This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new a69b031 Add S3ToRedshift example dag and system test (#8877)
a69b031 is described below
commit a69b031f20c5a1cd032f9873394374f661811e8f
Author: Felix Uellendall <[email protected]>
AuthorDate: Wed Jun 10 17:20:08 2020 +0200
Add S3ToRedshift example dag and system test (#8877)
- add howto docs for S3ToRedshift example dag
- add a terraform wrapper script which runs terraform CLI commands in an isolated docker container
NOTE: This system test uses terraform to provide the infrastructure needed
to run this example dag.
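For reference, the shell sketch below shows how the pieces added in this
commit fit together when driven by hand; the terraform commands mirror what
tests/test_utils/terraform.py executes, while the variable values are
placeholders and not part of this change:

    # Terraform reads TF_VAR_* variables; the docker wrapper added below
    # forwards any environment variable matching TF into the container.
    export TF_VAR_aws_region=eu-central-1      # placeholder
    export TF_VAR_s3_bucket=my-example-bucket  # placeholder
    # ...the remaining redshift_* variables from variables.tf need values too.

    TF_DIR=tests/providers/amazon/aws/infrastructure/example_s3_to_redshift
    terraform init -input=false "${TF_DIR}"
    terraform apply -input=false -auto-approve "${TF_DIR}"

    # The example dag reads S3_BUCKET (S3_KEY and REDSHIFT_TABLE have defaults).
    export S3_BUCKET=my-example-bucket         # placeholder

    # Tear the infrastructure down again afterwards.
    terraform destroy -input=false -auto-approve "${TF_DIR}"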
---
.gitignore | 10 +++
.pre-commit-config.yaml | 2 +-
.../aws/example_dags/example_s3_to_redshift.py | 91 ++++++++++++++++++++++
docs/howto/operator/amazon/aws/s3_to_redshift.rst | 79 +++++++++++++++++++
docs/operators-and-hooks-ref.rst | 2 +-
.../run_test_package_import_all_classes.sh | 2 +-
scripts/ci/kubernetes/docker/bootstrap.sh | 2 +-
scripts/ci/prepare_tool_scripts.sh | 43 ++++++++++
.../example_s3_to_redshift/outputs.tf | 38 +++++++++
.../example_s3_to_redshift/resources.tf | 35 +++++++++
.../example_s3_to_redshift/variables.tf | 25 ++++++
.../aws/operators/test_s3_to_redshift_system.py | 50 ++++++++++++
tests/test_utils/terraform.py | 34 ++++++++
13 files changed, 409 insertions(+), 4 deletions(-)
diff --git a/.gitignore b/.gitignore
index 6aea114..b3314d2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -187,3 +187,13 @@ dmypy.json
/.inputrc
log.txt*
/backport_packages/CHANGELOG.txt
+
+# Local .terraform directories
+**/.terraform/*
+
+# .tfstate files
+*.tfstate
+*.tfstate.*
+
+# Terraform variables
+*.tfvars
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0bec834..dc30879 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -46,7 +46,7 @@ repos:
- license-templates/LICENSE.txt
- --fuzzy-match-generates-todo
files: >
- \.properties$|\.cfg$|\.conf$|\.ini$|\.ldif$|\.readthedocs$|\.service$|^Dockerfile.*$
+ \.properties$|\.cfg$|\.conf$|\.ini$|\.ldif$|\.readthedocs$|\.service$|\.tf$|^Dockerfile.*$
- id: insert-license
name: Add license for all rst files
exclude: ^\.github/.*$
diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
new file mode 100644
index 0000000..c36d443
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This is an example dag for using `S3ToRedshiftTransferOperator` to copy an S3 key into a Redshift table.
+"""
+
+from os import getenv
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.s3_to_redshift import S3ToRedshiftTransferOperator
+from airflow.providers.postgres.operators.postgres import PostgresOperator
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_s3_to_redshift_env_variables]
+S3_BUCKET = getenv("S3_BUCKET")
+S3_KEY = getenv("S3_KEY", "key")
+REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "test_table")
+# [END howto_operator_s3_to_redshift_env_variables]
+
+default_args = {"start_date": days_ago(1)}
+
+
+def _add_sample_data_to_s3():
+    s3_hook = S3Hook()
+    s3_hook.load_string("0,Airflow", f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET, replace=True)
+
+
+def _remove_sample_data_from_s3():
+    s3_hook = S3Hook()
+    if s3_hook.check_for_key(f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET):
+        s3_hook.delete_objects(S3_BUCKET, f'{S3_KEY}/{REDSHIFT_TABLE}')
+
+
+with DAG(
+    dag_id="example_s3_to_redshift",
+    default_args=default_args,
+    schedule_interval=None,
+    tags=['example']
+) as dag:
+    setup__task_add_sample_data_to_s3 = PythonOperator(
+        python_callable=_add_sample_data_to_s3,
+        task_id='setup__add_sample_data_to_s3'
+    )
+    setup__task_create_table = PostgresOperator(
+        sql=f'CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE}(Id int, Name varchar)',
+        postgres_conn_id='redshift_default',
+        task_id='setup__create_table'
+    )
+    # [START howto_operator_s3_to_redshift_task_1]
+    task_transfer_s3_to_redshift = S3ToRedshiftTransferOperator(
+        s3_bucket=S3_BUCKET,
+        s3_key=S3_KEY,
+        schema="PUBLIC",
+        table=REDSHIFT_TABLE,
+        copy_options=['csv'],
+        task_id='transfer_s3_to_redshift'
+    )
+    # [END howto_operator_s3_to_redshift_task_1]
+    teardown__task_drop_table = PostgresOperator(
+        sql=f'DROP TABLE IF EXISTS {REDSHIFT_TABLE}',
+        postgres_conn_id='redshift_default',
+        task_id='teardown__drop_table'
+    )
+    teardown__task_remove_sample_data_from_s3 = PythonOperator(
+        python_callable=_remove_sample_data_from_s3,
+        task_id='teardown__remove_sample_data_from_s3'
+    )
+    [
+        setup__task_add_sample_data_to_s3,
+        setup__task_create_table
+    ] >> task_transfer_s3_to_redshift >> [
+        teardown__task_drop_table,
+        teardown__task_remove_sample_data_from_s3
+    ]
diff --git a/docs/howto/operator/amazon/aws/s3_to_redshift.rst b/docs/howto/operator/amazon/aws/s3_to_redshift.rst
new file mode 100644
index 0000000..7a634da
--- /dev/null
+++ b/docs/howto/operator/amazon/aws/s3_to_redshift.rst
@@ -0,0 +1,79 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+.. _howto/operator:S3ToRedshiftTransferOperator:
+
+S3 To Redshift Transfer Operator
+================================
+
+.. contents::
+ :depth: 1
+ :local:
+
+Overview
+--------
+
+The ``S3ToRedshiftTransferOperator`` copies data from an S3 bucket into a Redshift table.
+
+The example dag provided showcases the
+:class:`~airflow.providers.amazon.aws.operators.s3_to_redshift.S3ToRedshiftTransferOperator`
+in action.
+
+ - example_s3_to_redshift.py
+
+example_s3_to_redshift.py
+-------------------------
+
+Purpose
+"""""""
+
+This is a basic example dag for using ``S3ToRedshiftTransferOperator`` to copy data from an S3 bucket into a Redshift table.
+
+Environment variables
+"""""""""""""""""""""
+
+This example relies on the following variables, which can be passed via OS environment variables.
+
+.. exampleinclude:: ../../../../../airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+ :language: python
+ :start-after: [START howto_operator_s3_to_redshift_env_variables]
+ :end-before: [END howto_operator_s3_to_redshift_env_variables]
+
+You need to set at least the ``S3_BUCKET``.
+
+Copy S3 key into Redshift table
+"""""""""""""""""""""""""""""""
+
+In the following code we are copying the S3 key ``s3://{S3_BUCKET}/{S3_KEY}/{REDSHIFT_TABLE}`` into the Redshift table
+``PUBLIC.{REDSHIFT_TABLE}``.
+
+.. exampleinclude:: ../../../../../airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+ :language: python
+ :start-after: [START howto_operator_s3_to_redshift_task_1]
+ :end-before: [END howto_operator_s3_to_redshift_task_1]
+
+You can find more information about the ``COPY`` command used
+`here <https://docs.aws.amazon.com/us_en/redshift/latest/dg/copy-parameters-data-source-s3.html>`__.
+
+Reference
+---------
+
+For further information, look at:
+
+* `AWS COPY from Amazon S3 Documentation <https://docs.aws.amazon.com/us_en/redshift/latest/dg/copy-parameters-data-source-s3.html>`__
+* `AWS boto3 Library Documentation for S3 <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html>`__
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 6f9377b..46f5f1d 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -539,7 +539,7 @@ These integrations allow you to copy data from/to Amazon Web Services.
* - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`_
- `Amazon Redshift <https://aws.amazon.com/redshift/>`__
- -
+ - :doc:`How to use <howto/operator/amazon/aws/s3_to_redshift>`
- :mod:`airflow.providers.amazon.aws.operators.s3_to_redshift`
* - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`_
diff --git a/scripts/ci/in_container/run_test_package_import_all_classes.sh b/scripts/ci/in_container/run_test_package_import_all_classes.sh
index 4ad80c8..863e48b 100755
--- a/scripts/ci/in_container/run_test_package_import_all_classes.sh
+++ b/scripts/ci/in_container/run_test_package_import_all_classes.sh
@@ -48,7 +48,7 @@ else
fi
echo
-echo Installing all packages at once in Airlfow 1.10
+echo Installing all packages at once in Airflow 1.10
echo
# Install all packages at once
diff --git a/scripts/ci/kubernetes/docker/bootstrap.sh b/scripts/ci/kubernetes/docker/bootstrap.sh
index 9099b6b..d8d99c2 100755
--- a/scripts/ci/kubernetes/docker/bootstrap.sh
+++ b/scripts/ci/kubernetes/docker/bootstrap.sh
@@ -35,7 +35,7 @@ echo
echo "Uninstalling pre-installed airflow"
echo
-# Uninstall preinstalled Apache Airlfow
+# Uninstall preinstalled Apache Airflow
pip uninstall -y apache-airflow
diff --git a/scripts/ci/prepare_tool_scripts.sh b/scripts/ci/prepare_tool_scripts.sh
index 7a98c50..ecdde3c 100755
--- a/scripts/ci/prepare_tool_scripts.sh
+++ b/scripts/ci/prepare_tool_scripts.sh
@@ -62,3 +62,46 @@ prepare_tool_script "mcr.microsoft.com/azure-cli:latest" ".azure" az az
prepare_tool_script "${GCLOUD_IMAGE}" ".config/gcloud" bq bq
prepare_tool_script "${GCLOUD_IMAGE}" ".config/gcloud" gcloud gcloud
prepare_tool_script "${GCLOUD_IMAGE}" ".config/gcloud" gsutil gsutil
+
+function prepare_terraform_script() {
+ TOOL="terraform"
+ IMAGE="hashicorp/terraform:latest"
+
+ TARGET_TOOL_PATH="/usr/bin/${TOOL}"
+ TARGET_TOOL_UPDATE_PATH="/usr/bin/${TOOL}-update"
+
+ cat >"${TARGET_TOOL_PATH}" <<EOF
+#!/usr/bin/env bash
+docker run --rm -it \
+ -v "\${HOST_AIRFLOW_SOURCES}/tmp:/tmp" \
+ -v "\${HOST_AIRFLOW_SOURCES}/files:/files" \
+ -v "\${HOST_AIRFLOW_SOURCES}:/opt/airflow" \
+ -v "\${HOST_HOME}/.aws:/root/.aws" \
+ -v "\${HOST_HOME}/.azure:/root/.azure" \
+ -v "\${HOST_HOME}/.config/gcloud:/root/.config/gcloud" \
+ -w /opt/airflow \
+ --env-file <(env | grep TF) \
+ "${IMAGE}" "\$@"
+RES=\$?
+if [[ \${HOST_OS} == "Linux" ]]; then
+ docker run --rm \
+ -v "\${HOST_AIRFLOW_SOURCES}/tmp:/tmp" \
+ -v "\${HOST_AIRFLOW_SOURCES}/files:/files" \
+ -v "\${HOST_HOME}/.aws:/root/.aws" \
+ -v "\${HOST_HOME}/.azure:/root/.azure" \
+ -v "\${HOST_HOME}/.config/gcloud:/root/.config/gcloud" \
+ "\${AIRFLOW_CI_IMAGE}" bash -c \
+ "find '/tmp/' '/files/' '/root/.aws' '/root/.azure'
'/root/.config/gcloud' -user root -print0 | xargs --null chown
'\${HOST_USER_ID}.\${HOST_GROUP_ID}' --no-dereference" >/dev/null 2>&1
+fi
+exit \${RES}
+EOF
+
+ cat >"${TARGET_TOOL_UPDATE_PATH}" <<EOF
+#!/usr/bin/env bash
+docker pull "${IMAGE}"
+EOF
+
+ chmod a+x "${TARGET_TOOL_PATH}" "${TARGET_TOOL_UPDATE_PATH}"
+}
+
+prepare_terraform_script
diff --git a/tests/providers/amazon/aws/infrastructure/example_s3_to_redshift/outputs.tf b/tests/providers/amazon/aws/infrastructure/example_s3_to_redshift/outputs.tf
new file mode 100644
index 0000000..18a9f71
--- /dev/null
+++ b/tests/providers/amazon/aws/infrastructure/example_s3_to_redshift/outputs.tf
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+output "redshift_endpoint" {
+ value = aws_redshift_cluster.redshift.endpoint
+ description = "The redshift endpoint which is needed to create an airflow
connection."
+}
+
+output "redshift_database_name" {
+ value = aws_redshift_cluster.redshift.database_name
+ description = "The redshift database name which is needed to create an
airflow connection."
+}
+
+output "redshift_master_username" {
+ value = aws_redshift_cluster.redshift.master_username
+ description = "The redshift username which is needed to create an airflow
connection."
+ sensitive = true
+}
+
+output "redshift_master_password" {
+ value = aws_redshift_cluster.redshift.master_password
+ description = "The redshift password which is needed to create an airflow
connection."
+ sensitive = true
+}
diff --git a/tests/providers/amazon/aws/infrastructure/example_s3_to_redshift/resources.tf b/tests/providers/amazon/aws/infrastructure/example_s3_to_redshift/resources.tf
new file mode 100644
index 0000000..4c5c019
--- /dev/null
+++ b/tests/providers/amazon/aws/infrastructure/example_s3_to_redshift/resources.tf
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+provider "aws" {
+ region = var.aws_region
+}
+
+resource "aws_s3_bucket" "s3" {
+ bucket = var.s3_bucket
+ force_destroy = true
+}
+
+resource "aws_redshift_cluster" "redshift" {
+ cluster_identifier = var.redshift_cluster_identifier
+ database_name = var.redshift_database_name
+ master_username = var.redshift_master_username
+ master_password = var.redshift_master_password
+ node_type = "dc1.large"
+ cluster_type = "single-node"
+ skip_final_snapshot = true
+}
diff --git a/tests/providers/amazon/aws/infrastructure/example_s3_to_redshift/variables.tf b/tests/providers/amazon/aws/infrastructure/example_s3_to_redshift/variables.tf
new file mode 100644
index 0000000..72ee8a9
--- /dev/null
+++ b/tests/providers/amazon/aws/infrastructure/example_s3_to_redshift/variables.tf
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+variable "aws_region" {}
+
+variable "s3_bucket" {}
+
+variable "redshift_cluster_identifier" {}
+variable "redshift_database_name" {}
+variable "redshift_master_username" {}
+variable "redshift_master_password" {}
diff --git a/tests/providers/amazon/aws/operators/test_s3_to_redshift_system.py b/tests/providers/amazon/aws/operators/test_s3_to_redshift_system.py
new file mode 100644
index 0000000..ae73167
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_s3_to_redshift_system.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+import pytest
+
+from airflow.models import Connection
+from airflow.utils import db
+from airflow.utils.session import create_session
+from tests.test_utils import AIRFLOW_MAIN_FOLDER
+from tests.test_utils.amazon_system_helpers import AWS_DAG_FOLDER, AmazonSystemTest
+from tests.test_utils.terraform import Terraform
+
+
+@pytest.mark.backend("mysql", "postgres")
+class TestS3ToRedshiftExampleDags(AmazonSystemTest, Terraform):
+    TERRAFORM_DIR = os.path.join(
+        AIRFLOW_MAIN_FOLDER, "tests", "providers", "amazon", "aws", "infrastructure", "example_s3_to_redshift"
+    )
+
+    def setUp(self) -> None:
+        super().setUp()
+        host, port = self.get_tf_output("redshift_endpoint").split(':')
+        schema = self.get_tf_output("redshift_database_name")
+        login = self.get_tf_output("redshift_master_username")
+        password = self.get_tf_output("redshift_master_password")
+        db.merge_conn(Connection("redshift_default", "postgres", host, login, password, schema, port))
+
+    def test_run_example_dag_s3_to_redshift(self):
+        self.run_dag('example_s3_to_redshift', AWS_DAG_FOLDER)
+
+    def tearDown(self) -> None:
+        super().tearDown()
+        with create_session() as session:
+            session.query(Connection).filter(Connection.conn_id == "redshift_default").delete()
diff --git a/tests/test_utils/terraform.py b/tests/test_utils/terraform.py
new file mode 100644
index 0000000..a85d4fc
--- /dev/null
+++ b/tests/test_utils/terraform.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.test_utils.system_tests_class import SystemTest
+
+
+class Terraform(SystemTest):
+    TERRAFORM_DIR: str
+
+    def setUp(self) -> None:
+        self.execute_cmd(["terraform", "init", "-input=false", self.TERRAFORM_DIR])
+        self.execute_cmd(["terraform", "plan", "-input=false", self.TERRAFORM_DIR])
+        self.execute_cmd(["terraform", "apply", "-input=false", "-auto-approve", self.TERRAFORM_DIR])
+
+    def get_tf_output(self, name):
+        return self.check_output(["terraform", "output", name]).decode('utf-8').replace("\r\n", "")
+
+    def tearDown(self) -> None:
+        self.execute_cmd(["terraform", "plan", "-destroy", "-input=false", self.TERRAFORM_DIR])
+        self.execute_cmd(["terraform", "destroy", "-input=false", "-auto-approve", self.TERRAFORM_DIR])