This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 843a3b8c3e Fix system test example_cloud_storage_transfer_service_aws (#33429)
843a3b8c3e is described below

commit 843a3b8c3ecca9582a269cf780144738c9c45d15
Author: max <[email protected]>
AuthorDate: Wed Aug 16 12:27:28 2023 +0000

    Fix system test example_cloud_storage_transfer_service_aws (#33429)
---
 .../cloud/cloud_storage_transfer_service.rst       |  18 +--
 .../example_cloud_storage_transfer_service_aws.py  | 168 +++++++++++++++------
 2 files changed, 128 insertions(+), 58 deletions(-)

diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst b/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst
index 4c7309cf4c..bd6a3fde3c 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst
@@ -67,12 +67,12 @@ Using the operator
       :start-after: [START howto_operator_gcp_transfer_create_job_body_gcp]
       :end-before: [END howto_operator_gcp_transfer_create_job_body_gcp]
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
       :language: python
       :start-after: [START howto_operator_gcp_transfer_create_job_body_aws]
       :end-before: [END howto_operator_gcp_transfer_create_job_body_aws]
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
       :language: python
       :dedent: 4
       :start-after: [START howto_operator_gcp_transfer_create_job]
@@ -107,7 +107,7 @@ For parameter definition, take a look at
 Using the operator
 """"""""""""""""""
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
       :language: python
       :dedent: 4
       :start-after: [START howto_operator_gcp_transfer_delete_job]
@@ -181,7 +181,7 @@ For parameter definition, take a look at
 Using the operator
 """"""""""""""""""
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
       :language: python
       :dedent: 4
       :start-after: [START howto_operator_gcp_transfer_cancel_operation]
@@ -217,7 +217,7 @@ For parameter definition, take a look at
 Using the operator
 """"""""""""""""""
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
       :language: python
       :dedent: 4
       :start-after: [START howto_operator_gcp_transfer_get_operation]
@@ -252,7 +252,7 @@ For parameter definition, take a look at
 Using the operator
 """"""""""""""""""
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
       :language: python
       :dedent: 4
       :start-after: [START howto_operator_gcp_transfer_list_operations]
@@ -286,7 +286,7 @@ For parameter definition, take a look at
 Using the operator
 """"""""""""""""""
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
       :language: python
       :dedent: 4
       :start-after: [START howto_operator_gcp_transfer_pause_operation]
@@ -320,7 +320,7 @@ For parameter definition, take a look at
 Using the operator
 """"""""""""""""""
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
       :language: python
       :dedent: 4
       :start-after: [START howto_operator_gcp_transfer_resume_operation]
@@ -355,7 +355,7 @@ For parameter definition, take a look at
 Using the operator
 """"""""""""""""""
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
       :language: python
       :dedent: 4
       :start-after: [START howto_operator_gcp_transfer_wait_operation]
diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
similarity index 54%
rename from airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
rename to tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
index 6a5f93bffc..9855f1ceb5 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+++ b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
@@ -16,29 +16,18 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG that demonstrates interactions with Google Cloud Transfer. This DAG relies on
-the following OS environment variables
-
-Note that you need to provide a large enough set of data so that operations do not execute too quickly.
-Otherwise, DAG will fail.
-
-* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer Service.
-* GCP_DESCRIPTION - Description of transfer job
-* GCP_TRANSFER_SOURCE_AWS_BUCKET - Amazon Web Services Storage bucket from which files are copied.
-* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied
-* WAIT_FOR_OPERATION_POKE_INTERVAL - interval of what to check the status of the operation
-  A smaller value than the default value accelerates the system test and ensures its correct execution with
-  smaller quantities of files in the source bucket
-  Look at documentation of :class:`~airflow.operators.sensors.BaseSensorOperator` for more information
-
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
 """
 from __future__ import annotations
 
 import os
 from datetime import datetime, timedelta
 
+from copy import deepcopy
+
 from airflow import models
-from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
 from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
     ALREADY_EXISTING_IN_SINK,
     AWS_S3_DATA_SOURCE,
@@ -68,21 +57,26 @@ from airflow.providers.google.cloud.operators.cloud_storage_transfer_service imp
     CloudDataTransferServicePauseOperationOperator,
     CloudDataTransferServiceResumeOperationOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.sensors.cloud_storage_transfer_service import (
     CloudDataTransferServiceJobStatusSensor,
 )
+from airflow.utils.trigger_rule import TriggerRule
 
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-GCP_DESCRIPTION = os.environ.get("GCP_DESCRIPTION", "description")
-GCP_TRANSFER_TARGET_BUCKET = os.environ.get("GCP_TRANSFER_TARGET_BUCKET")
-WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get("WAIT_FOR_OPERATION_POKE_INTERVAL", 5))
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get("GCP_TRANSFER_SOURCE_AWS_BUCKET")
-GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
-    "GCP_TRANSFER_FIRST_TARGET_BUCKET", "gcp-transfer-first-target"
-)
+DAG_ID = "example_gcp_transfer_aws"
+
+EXAMPLE_BUCKET = "airflow-system-tests-resources"
+EXAMPLE_FILE = "storage-transfer/big_file.dat"
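+# The IDs below may contain underscores, which are not allowed in S3 bucket
+# names, so they are replaced with hyphens when building the bucket names.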
+BUCKET_SOURCE_AWS = f"bucket-aws-{DAG_ID}-{ENV_ID}".replace("_", "-")
+BUCKET_TARGET_GCS = f"bucket-gcs-{DAG_ID}-{ENV_ID}".replace("_", "-")
+WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get("WAIT_FOR_OPERATION_POKE_INTERVAL", 5))
 
-GCP_TRANSFER_JOB_NAME = os.environ.get("GCP_TRANSFER_JOB_NAME", "transferJobs/sampleJob")
+GCP_DESCRIPTION = "description"
+GCP_TRANSFER_JOB_NAME = f"transferJobs/sampleJob-{DAG_ID}-{ENV_ID}".replace("-", "_")
+GCP_TRANSFER_JOB_2_NAME = f"transferJobs/sampleJob2-{DAG_ID}-{ENV_ID}".replace("-", "_")
 
 # [START howto_operator_gcp_transfer_create_job_body_aws]
 aws_to_gcs_transfer_body = {
@@ -93,33 +87,53 @@ aws_to_gcs_transfer_body = {
     SCHEDULE: {
         SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
         SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
-        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
+        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=1)).time(),
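+        # Start the transfer about one minute after job creation (UTC); the
+        # schedule dates above bound the job's active window.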
     },
     TRANSFER_SPEC: {
-        AWS_S3_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_SOURCE_AWS_BUCKET},
-        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
+        AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
+        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
         TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
     },
 }
 # [END howto_operator_gcp_transfer_create_job_body_aws]
 
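+# The second job reuses the same transfer spec; deepcopy keeps the original
+# body intact when JOB_NAME is overridden for the second job.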
+aws_to_gcs_transfer_body_2 = deepcopy(aws_to_gcs_transfer_body)
+aws_to_gcs_transfer_body_2[JOB_NAME] = GCP_TRANSFER_JOB_2_NAME
 
 with models.DAG(
-    "example_gcp_transfer_aws",
+    dag_id=DAG_ID,
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "aws", "gcs", "transfer"],
 ) as dag:
+    create_bucket_s3 = S3CreateBucketOperator(
+        task_id="create_bucket_s3", bucket_name=BUCKET_SOURCE_AWS, region_name="us-east-1"
+    )
+
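+    # Stage the shared example file from GCS into the new S3 bucket so the
+    # transfer job has source data to copy.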
+    upload_file_to_s3 = GCSToS3Operator(
+        task_id="upload_file_to_s3",
+        gcp_user_project=GCP_PROJECT_ID,
+        bucket=EXAMPLE_BUCKET,
+        prefix=EXAMPLE_FILE,
+        dest_s3_key=f"s3://{BUCKET_SOURCE_AWS}",
+        replace=True,
+    )
+
+    create_bucket_gcs = GCSCreateBucketOperator(
+        task_id="create_bucket_gcs",
+        bucket_name=BUCKET_TARGET_GCS,
+        project_id=GCP_PROJECT_ID,
+    )
 
     # [START howto_operator_gcp_transfer_create_job]
-    create_transfer_job_from_aws = CloudDataTransferServiceCreateJobOperator(
-        task_id="create_transfer_job_from_aws", body=aws_to_gcs_transfer_body
+    create_transfer_job_s3_to_gcs = CloudDataTransferServiceCreateJobOperator(
+        task_id="create_transfer_job_s3_to_gcs", body=aws_to_gcs_transfer_body
     )
     # [END howto_operator_gcp_transfer_create_job]
 
     wait_for_operation_to_start = CloudDataTransferServiceJobStatusSensor(
         task_id="wait_for_operation_to_start",
-        job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
+        job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
         project_id=GCP_PROJECT_ID,
         expected_statuses={GcpTransferOperationStatus.IN_PROGRESS},
         poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
@@ -138,7 +152,7 @@ with models.DAG(
         task_id="list_operations",
         request_filter={
             FILTER_PROJECT_ID: GCP_PROJECT_ID,
-            FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}"],
+            FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}"],
         },
     )
     # [END howto_operator_gcp_transfer_list_operations]
@@ -158,37 +172,93 @@ with models.DAG(
     # [START howto_operator_gcp_transfer_wait_operation]
     wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
         task_id="wait_for_operation_to_end",
-        job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
+        job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
         project_id=GCP_PROJECT_ID,
         expected_statuses={GcpTransferOperationStatus.SUCCESS},
         poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
     )
     # [END howto_operator_gcp_transfer_wait_operation]
 
+    create_second_transfer_job_from_aws = CloudDataTransferServiceCreateJobOperator(
+        task_id="create_transfer_job_s3_to_gcs_2", body=aws_to_gcs_transfer_body_2
+    )
+
+    wait_for_operation_to_start_2 = CloudDataTransferServiceJobStatusSensor(
+        task_id="wait_for_operation_to_start_2",
+        job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs_2')['name']}}",
+        project_id=GCP_PROJECT_ID,
+        expected_statuses={GcpTransferOperationStatus.IN_PROGRESS},
+        poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
+    )
+
     # [START howto_operator_gcp_transfer_cancel_operation]
     cancel_operation = CloudDataTransferServiceCancelOperationOperator(
         task_id="cancel_operation",
         operation_name="{{task_instance.xcom_pull("
-        "'wait_for_second_operation_to_start', key='sensed_operations')[0]['name']}}",
+        "'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']}}",
     )
     # [END howto_operator_gcp_transfer_cancel_operation]
 
     # [START howto_operator_gcp_transfer_delete_job]
-    delete_transfer_from_aws_job = CloudDataTransferServiceDeleteJobOperator(
-        task_id="delete_transfer_from_aws_job",
-        job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
+    delete_transfer_job_s3_to_gcs = CloudDataTransferServiceDeleteJobOperator(
+        task_id="delete_transfer_job_s3_to_gcs",
+        job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
         project_id=GCP_PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
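+        # ALL_DONE runs this cleanup once all upstream tasks finish,
+        # regardless of whether they succeeded.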
     )
     # [END howto_operator_gcp_transfer_delete_job]
 
-    chain(
-        create_transfer_job_from_aws,
-        wait_for_operation_to_start,
-        pause_operation,
-        list_operations,
-        get_operation,
-        resume_operation,
-        wait_for_operation_to_end,
-        cancel_operation,
-        delete_transfer_from_aws_job,
+    delete_transfer_job_s3_to_gcs_2 = CloudDataTransferServiceDeleteJobOperator(
+        task_id="delete_transfer_job_s3_to_gcs_2",
+        job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs_2')['name']}}",
+        project_id=GCP_PROJECT_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket_s3 = S3DeleteBucketOperator(
+        task_id="delete_bucket_s3",
+        bucket_name=BUCKET_SOURCE_AWS,
+        force_delete=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket_gcs = GCSDeleteBucketOperator(
+        task_id="delete_bucket_gcs",
+        bucket_name=BUCKET_TARGET_GCS,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        [create_bucket_s3 >> upload_file_to_s3, create_bucket_gcs]
+        # TEST BODY
+        >> create_transfer_job_s3_to_gcs
+        >> wait_for_operation_to_start
+        >> pause_operation
+        >> list_operations
+        >> get_operation
+        >> resume_operation
+        >> wait_for_operation_to_end
+        >> create_second_transfer_job_from_aws
+        >> wait_for_operation_to_start_2
+        >> cancel_operation
+        # TEST TEARDOWN
+        >> [
+            delete_transfer_job_s3_to_gcs,
+            delete_transfer_job_s3_to_gcs_2,
+            delete_bucket_gcs,
+            delete_bucket_s3,
+        ]
     )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs the watcher in order to properly mark success/failure
+    # when a "tearDown" task with a trigger rule is part of the DAG.
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

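For reference: per tests/system/README.md#run_via_pytest, a system test laid
out this way is typically executed with pytest, e.g. (assuming a configured
GCP/AWS system-test environment):

    pytest tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py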