This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7a7ca50160 Migrate Google example DAG s3_to_gcs to new design AIP-47 (#24641)
7a7ca50160 is described below
commit 7a7ca5016019f93ebee052a2bf99772145b7fc03
Author: Chenglong Yan <[email protected]>
AuthorDate: Tue Jun 28 20:49:47 2022 +0800
Migrate Google example DAG s3_to_gcs to new design AIP-47 (#24641)
related: #22447, #22430
---
.../operators/transfer/s3_to_gcs.rst | 2 +-
.../cloud/transfers/test_s3_to_gcs_system.py | 51 ----------------------
.../google/cloud/gcs}/example_s3_to_gcs.py | 51 ++++++++++++++++------
3 files changed, 38 insertions(+), 66 deletions(-)
diff --git a/docs/apache-airflow-providers-google/operators/transfer/s3_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/s3_to_gcs.rst
index 64a93efa44..4cda423cea 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/s3_to_gcs.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/s3_to_gcs.rst
@@ -32,7 +32,7 @@ Prerequisite Tasks
Use the
:class:`~airflow.providers.google.cloud.transfers.s3_to_gcs.S3ToGCSOperator`
to transfer data from Amazon S3 to Google Cloud Storage.
-.. exampleinclude::/../airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
+.. exampleinclude::/../tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
:language: python
:start-after: [START howto_transfer_s3togcs_operator]
:end-before: [END howto_transfer_s3togcs_operator]
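
For readers skimming this change without the rendered docs: the snippet that the updated exampleinclude pulls in boils down to the sketch below. The DAG name, schedule, and bucket names are illustrative placeholders and not part of this commit; only the S3ToGCSOperator arguments (task_id, bucket, prefix, dest_gcs) mirror the documented example.

# Illustrative sketch only -- DAG scaffolding and bucket names are placeholders.
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

with models.DAG(
    "s3_to_gcs_sketch",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    # Copy every object under the TESTS prefix of the source S3 bucket
    # into the destination GCS bucket.
    transfer_to_gcs = S3ToGCSOperator(
        task_id="s3_to_gcs_task",
        bucket="my-source-s3-bucket",
        prefix="TESTS",
        dest_gcs="gs://my-destination-gcs-bucket/",
    )
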
diff --git a/tests/providers/google/cloud/transfers/test_s3_to_gcs_system.py b/tests/providers/google/cloud/transfers/test_s3_to_gcs_system.py
deleted file mode 100644
index 6f25f64144..0000000000
--- a/tests/providers/google/cloud/transfers/test_s3_to_gcs_system.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pytest
-
-from airflow.providers.google.cloud.example_dags.example_s3_to_gcs import UPLOAD_FILE
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-FILENAME = UPLOAD_FILE.split('/')[-1]
-
-
-@pytest.mark.backend("mysql", "postgres")
-@pytest.mark.credential_file(GCP_GCS_KEY)
-class S3ToGCSSystemTest(GoogleSystemTest):
-    """System test for S3 to GCS transfer operator.
-
-    This test requires the following environment variables:
-
-    GCP_PROJECT_ID=your-gcp-project-id
-    GCP_GCS_BUCKET=unique-bucket-name-to-create
-    S3BUCKET_NAME=unique-s3-bucket-name
-    AWS_ACCESS_KEY_ID=your-aws-access-key
-    AWS_SECRET_ACCESS_KEY=your-aws-secret-access-key
-    """
-
-    def setUp(self) -> None:
-        super().setUp()
-        self.create_dummy_file(FILENAME)
-
-    def tearDown(self) -> None:
-        self.delete_dummy_file(FILENAME, dir_path='/tmp')
-        super().tearDown()
-
-    @provide_gcp_context(GCP_GCS_KEY)
-    def test_run_example_dag_s3_to_gcs(self):
-        self.run_dag('example_s3_to_gcs', CLOUD_DAG_FOLDER)
diff --git a/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
similarity index 60%
rename from airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
rename to tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
index e3948f390d..9242f8f0e6 100644
--- a/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
@@ -24,11 +24,14 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule

-GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'gcp-project-id')
-S3BUCKET_NAME = os.environ.get('S3BUCKET_NAME', 'example-s3bucket-name')
-GCS_BUCKET = os.environ.get('GCP_GCS_BUCKET', 'example-gcsbucket-name')
-GCS_BUCKET_URL = f"gs://{GCS_BUCKET}/"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_s3_to_gcs"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+GCS_BUCKET_URL = f"gs://{BUCKET_NAME}/"

UPLOAD_FILE = '/tmp/example-file.txt'
PREFIX = 'TESTS'
@@ -37,42 +40,62 @@ PREFIX = 'TESTS'
def upload_file():
    """A callable to upload file to AWS bucket"""
    s3_hook = S3Hook()
-    s3_hook.load_file(filename=UPLOAD_FILE, key=PREFIX, bucket_name=S3BUCKET_NAME)
+    s3_hook.load_file(filename=UPLOAD_FILE, key=PREFIX, bucket_name=BUCKET_NAME)


with models.DAG(
-    'example_s3_to_gcs',
+    DAG_ID,
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1),
    catchup=False,
-    tags=['example'],
+    tags=['example', 's3'],
) as dag:
    create_s3_bucket = S3CreateBucketOperator(
-        task_id="create_s3_bucket", bucket_name=S3BUCKET_NAME, region_name='us-east-1'
+        task_id="create_s3_bucket", bucket_name=BUCKET_NAME, region_name='us-east-1'
    )

    create_gcs_bucket = GCSCreateBucketOperator(
        task_id="create_bucket",
-        bucket_name=GCS_BUCKET,
+        bucket_name=BUCKET_NAME,
        project_id=GCP_PROJECT_ID,
    )
    # [START howto_transfer_s3togcs_operator]
    transfer_to_gcs = S3ToGCSOperator(
-        task_id='s3_to_gcs_task', bucket=S3BUCKET_NAME, prefix=PREFIX, dest_gcs=GCS_BUCKET_URL
+        task_id='s3_to_gcs_task', bucket=BUCKET_NAME, prefix=PREFIX, dest_gcs=GCS_BUCKET_URL
    )
    # [END howto_transfer_s3togcs_operator]

    delete_s3_bucket = S3DeleteBucketOperator(
-        task_id='delete_s3_bucket', bucket_name=S3BUCKET_NAME, force_delete=True
+        task_id='delete_s3_bucket',
+        bucket_name=BUCKET_NAME,
+        force_delete=True,
+        trigger_rule=TriggerRule.ALL_DONE,
    )

-    delete_gcs_bucket = GCSDeleteBucketOperator(task_id='delete_gcs_bucket', bucket_name=GCS_BUCKET)
+    delete_gcs_bucket = GCSDeleteBucketOperator(
+        task_id='delete_gcs_bucket', bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )

    (
-        create_s3_bucket
+        # TEST SETUP
+        create_gcs_bucket
+        >> create_s3_bucket
        >> upload_file()
-        >> create_gcs_bucket
+        # TEST BODY
        >> transfer_to_gcs
+        # TEST TEARDOWN
        >> delete_s3_bucket
        >> delete_gcs_bucket
    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
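
As a closing note for anyone migrating other example DAGs to AIP-47: the shape of the file above (setup, body, teardown with TriggerRule.ALL_DONE, watcher wiring, and a pytest entrypoint) is the recurring pattern. The sketch below shows that skeleton in isolation; the EmptyOperator tasks and the DAG id are placeholders for illustration and not part of this commit.

# Hedged sketch of the AIP-47 system-test skeleton used above; the EmptyOperator
# tasks stand in for real setup/body/teardown work.
from datetime import datetime

from airflow import models
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with models.DAG(
    "aip47_skeleton_sketch",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    setup = EmptyOperator(task_id="setup")
    body = EmptyOperator(task_id="body")
    # ALL_DONE makes teardown run even when the body fails, so cloud resources
    # created during setup are always cleaned up.
    teardown = EmptyOperator(task_id="teardown", trigger_rule=TriggerRule.ALL_DONE)

    setup >> body >> teardown

    # Without the watcher, a failed body followed by a successful ALL_DONE
    # teardown would leave the whole DAG run marked as success.
    from tests.system.utils.watcher import watcher

    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Lets pytest collect and run this file directly
# (see tests/system/README.md#run_via_pytest).
test_run = get_test_run(dag)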