This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a7ca50160 Migrate Google example DAG s3_to_gcs to new design AIP-47 (#24641)
7a7ca50160 is described below

commit 7a7ca5016019f93ebee052a2bf99772145b7fc03
Author: Chenglong Yan <[email protected]>
AuthorDate: Tue Jun 28 20:49:47 2022 +0800

    Migrate Google example DAG s3_to_gcs to new design AIP-47 (#24641)
    
    related: #22447, #22430
---
 .../operators/transfer/s3_to_gcs.rst               |  2 +-
 .../cloud/transfers/test_s3_to_gcs_system.py       | 51 ----------------------
 .../google/cloud/gcs}/example_s3_to_gcs.py         | 51 ++++++++++++++++------
 3 files changed, 38 insertions(+), 66 deletions(-)

diff --git a/docs/apache-airflow-providers-google/operators/transfer/s3_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/s3_to_gcs.rst
index 64a93efa44..4cda423cea 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/s3_to_gcs.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/s3_to_gcs.rst
@@ -32,7 +32,7 @@ Prerequisite Tasks
 Use the :class:`~airflow.providers.google.cloud.transfers.s3_to_gcs.S3ToGCSOperator`
 to transfer data from Amazon S3 to Google Cloud Storage.
 
-.. exampleinclude::/../airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
+.. exampleinclude::/../tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
     :language: python
     :start-after: [START howto_transfer_s3togcs_operator]
     :end-before: [END howto_transfer_s3togcs_operator]
diff --git a/tests/providers/google/cloud/transfers/test_s3_to_gcs_system.py b/tests/providers/google/cloud/transfers/test_s3_to_gcs_system.py
deleted file mode 100644
index 6f25f64144..0000000000
--- a/tests/providers/google/cloud/transfers/test_s3_to_gcs_system.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pytest
-
-from airflow.providers.google.cloud.example_dags.example_s3_to_gcs import UPLOAD_FILE
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-FILENAME = UPLOAD_FILE.split('/')[-1]
-
-
-@pytest.mark.backend("mysql", "postgres")
-@pytest.mark.credential_file(GCP_GCS_KEY)
-class S3ToGCSSystemTest(GoogleSystemTest):
-    """System test for S3 to GCS transfer operator.
-
-    This test requires the following environment variables:
-
-    GCP_PROJECT_ID=your-gcp-project-id
-    GCP_GCS_BUCKET=unique-bucket-name-to-create
-    S3BUCKET_NAME=unique-s3-bucket-name
-    AWS_ACCESS_KEY_ID=your-aws-access-key
-    AWS_SECRET_ACCESS_KEY=your-aws-secret-access-key
-    """
-
-    def setUp(self) -> None:
-        super().setUp()
-        self.create_dummy_file(FILENAME)
-
-    def tearDown(self) -> None:
-        self.delete_dummy_file(FILENAME, dir_path='/tmp')
-        super().tearDown()
-
-    @provide_gcp_context(GCP_GCS_KEY)
-    def test_run_example_dag_s3_to_gcs(self):
-        self.run_dag('example_s3_to_gcs', CLOUD_DAG_FOLDER)
diff --git a/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
similarity index 60%
rename from airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
rename to tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
index e3948f390d..9242f8f0e6 100644
--- a/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
@@ -24,11 +24,14 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
 
-GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'gcp-project-id')
-S3BUCKET_NAME = os.environ.get('S3BUCKET_NAME', 'example-s3bucket-name')
-GCS_BUCKET = os.environ.get('GCP_GCS_BUCKET', 'example-gcsbucket-name')
-GCS_BUCKET_URL = f"gs://{GCS_BUCKET}/"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_s3_to_gcs"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+GCS_BUCKET_URL = f"gs://{BUCKET_NAME}/"
 UPLOAD_FILE = '/tmp/example-file.txt'
 PREFIX = 'TESTS'
 
@@ -37,42 +40,62 @@ PREFIX = 'TESTS'
 def upload_file():
     """A callable to upload file to AWS bucket"""
     s3_hook = S3Hook()
-    s3_hook.load_file(filename=UPLOAD_FILE, key=PREFIX, bucket_name=S3BUCKET_NAME)
+    s3_hook.load_file(filename=UPLOAD_FILE, key=PREFIX, bucket_name=BUCKET_NAME)
 
 
 with models.DAG(
-    'example_s3_to_gcs',
+    DAG_ID,
     schedule_interval='@once',
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=['example'],
+    tags=['example', 's3'],
 ) as dag:
     create_s3_bucket = S3CreateBucketOperator(
-        task_id="create_s3_bucket", bucket_name=S3BUCKET_NAME, 
region_name='us-east-1'
+        task_id="create_s3_bucket", bucket_name=BUCKET_NAME, 
region_name='us-east-1'
     )
 
     create_gcs_bucket = GCSCreateBucketOperator(
         task_id="create_bucket",
-        bucket_name=GCS_BUCKET,
+        bucket_name=BUCKET_NAME,
         project_id=GCP_PROJECT_ID,
     )
     # [START howto_transfer_s3togcs_operator]
     transfer_to_gcs = S3ToGCSOperator(
-        task_id='s3_to_gcs_task', bucket=S3BUCKET_NAME, prefix=PREFIX, dest_gcs=GCS_BUCKET_URL
+        task_id='s3_to_gcs_task', bucket=BUCKET_NAME, prefix=PREFIX, dest_gcs=GCS_BUCKET_URL
     )
     # [END howto_transfer_s3togcs_operator]
 
     delete_s3_bucket = S3DeleteBucketOperator(
-        task_id='delete_s3_bucket', bucket_name=S3BUCKET_NAME, force_delete=True
+        task_id='delete_s3_bucket',
+        bucket_name=BUCKET_NAME,
+        force_delete=True,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_gcs_bucket = GCSDeleteBucketOperator(task_id='delete_gcs_bucket', bucket_name=GCS_BUCKET)
+    delete_gcs_bucket = GCSDeleteBucketOperator(
+        task_id='delete_gcs_bucket', bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
 
     (
-        create_s3_bucket
+        # TEST SETUP
+        create_gcs_bucket
+        >> create_s3_bucket
         >> upload_file()
-        >> create_gcs_bucket
+        # TEST BODY
         >> transfer_to_gcs
+        # TEST TEARDOWN
         >> delete_s3_bucket
         >> delete_gcs_bucket
     )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

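For context on the AIP-47 layout used above: because both delete tasks run with trigger_rule=TriggerRule.ALL_DONE, they would succeed (and mark the DAG run green) even if the transfer task failed, which is why every task is wired into watcher(). The snippet below is a minimal sketch of what such a watcher task can look like; the real helper lives in tests/system/utils/watcher.py, and the body shown here is an assumption for illustration only.

    # Hypothetical stand-in for tests/system/utils/watcher.py (assumed, for illustration).
    # A task with trigger_rule=ONE_FAILED placed downstream of every other task:
    # it runs only when something upstream failed, and then raises, so the whole
    # DAG run is marked failed despite the always-green teardown tasks.
    from airflow.decorators import task
    from airflow.exceptions import AirflowException
    from airflow.utils.trigger_rule import TriggerRule

    @task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
    def watcher():
        """Fail the DAG run if any upstream task failed."""
        raise AirflowException("Failing task because one or more upstream tasks failed.")

With the trailing test_run = get_test_run(dag) line in place, the migrated DAG can be run as a system test with plain pytest by pointing it at tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py, as described in tests/system/README.md#run_via_pytest.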