This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 843a3b8c3e  Fix system test example_cloud_storage_transfer_service_aws (#33429)
843a3b8c3e is described below
commit 843a3b8c3ecca9582a269cf780144738c9c45d15
Author: max <[email protected]>
AuthorDate: Wed Aug 16 12:27:28 2023 +0000
Fix system test example_cloud_storage_transfer_service_aws (#33429)
---
.../cloud/cloud_storage_transfer_service.rst | 18 +--
.../example_cloud_storage_transfer_service_aws.py | 168 +++++++++++++++------
2 files changed, 128 insertions(+), 58 deletions(-)
diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst b/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst
index 4c7309cf4c..bd6a3fde3c 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst
@@ -67,12 +67,12 @@ Using the operator
:start-after: [START howto_operator_gcp_transfer_create_job_body_gcp]
:end-before: [END howto_operator_gcp_transfer_create_job_body_gcp]
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
:language: python
:start-after: [START howto_operator_gcp_transfer_create_job_body_aws]
:end-before: [END howto_operator_gcp_transfer_create_job_body_aws]
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_transfer_create_job]
@@ -107,7 +107,7 @@ For parameter definition, take a look at
Using the operator
""""""""""""""""""
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_transfer_delete_job]
@@ -181,7 +181,7 @@ For parameter definition, take a look at
Using the operator
""""""""""""""""""
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_transfer_cancel_operation]
@@ -217,7 +217,7 @@ For parameter definition, take a look at
Using the operator
""""""""""""""""""
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_transfer_get_operation]
@@ -252,7 +252,7 @@ For parameter definition, take a look at
Using the operator
""""""""""""""""""
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_transfer_list_operations]
@@ -286,7 +286,7 @@ For parameter definition, take a look at
Using the operator
""""""""""""""""""
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_transfer_pause_operation]
@@ -320,7 +320,7 @@ For parameter definition, take a look at
Using the operator
""""""""""""""""""
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_transfer_resume_operation]
@@ -355,7 +355,7 @@ For parameter definition, take a look at
Using the operator
""""""""""""""""""
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_transfer_wait_operation]
diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
similarity index 54%
rename from airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
rename to tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
index 6a5f93bffc..9855f1ceb5 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py
+++ b/tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
@@ -16,29 +16,18 @@
# specific language governing permissions and limitations
# under the License.
"""
-Example Airflow DAG that demonstrates interactions with Google Cloud Transfer. This DAG relies on
-the following OS environment variables
-
-Note that you need to provide a large enough set of data so that operations do not execute too quickly.
-Otherwise, DAG will fail.
-
-* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer Service.
-* GCP_DESCRIPTION - Description of transfer job
-* GCP_TRANSFER_SOURCE_AWS_BUCKET - Amazon Web Services Storage bucket from which files are copied.
-* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied
-* WAIT_FOR_OPERATION_POKE_INTERVAL - interval of what to check the status of the operation
-  A smaller value than the default value accelerates the system test and ensures its correct execution with
-  smaller quantities of files in the source bucket
-  Look at documentation of :class:`~airflow.operators.sensors.BaseSensorOperator` for more information
-
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
"""
from __future__ import annotations
import os
from datetime import datetime, timedelta
+from pydantic.main import deepcopy
+
from airflow import models
-from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
from airflow.providers.google.cloud.hooks.cloud_storage_transfer_service import (
ALREADY_EXISTING_IN_SINK,
AWS_S3_DATA_SOURCE,
@@ -68,21 +57,26 @@ from airflow.providers.google.cloud.operators.cloud_storage_transfer_service imp
CloudDataTransferServicePauseOperationOperator,
CloudDataTransferServiceResumeOperationOperator,
)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.sensors.cloud_storage_transfer_service import (
CloudDataTransferServiceJobStatusSensor,
)
+from airflow.utils.trigger_rule import TriggerRule
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-GCP_DESCRIPTION = os.environ.get("GCP_DESCRIPTION", "description")
-GCP_TRANSFER_TARGET_BUCKET = os.environ.get("GCP_TRANSFER_TARGET_BUCKET")
-WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get("WAIT_FOR_OPERATION_POKE_INTERVAL", 5))
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get("GCP_TRANSFER_SOURCE_AWS_BUCKET")
-GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
- "GCP_TRANSFER_FIRST_TARGET_BUCKET", "gcp-transfer-first-target"
-)
+DAG_ID = "example_gcp_transfer_aws"
+
+EXAMPLE_BUCKET = "airflow-system-tests-resources"
+EXAMPLE_FILE = "storage-transfer/big_file.dat"
+BUCKET_SOURCE_AWS = f"bucket-aws-{DAG_ID}-{ENV_ID}".replace("_", "-")
+BUCKET_TARGET_GCS = f"bucket-gcs-{DAG_ID}-{ENV_ID}".replace("_", "-")
+WAIT_FOR_OPERATION_POKE_INTERVAL = int(os.environ.get("WAIT_FOR_OPERATION_POKE_INTERVAL", 5))
-GCP_TRANSFER_JOB_NAME = os.environ.get("GCP_TRANSFER_JOB_NAME", "transferJobs/sampleJob")
+GCP_DESCRIPTION = "description"
+GCP_TRANSFER_JOB_NAME = f"transferJobs/sampleJob-{DAG_ID}-{ENV_ID}".replace("-", "_")
+GCP_TRANSFER_JOB_2_NAME = f"transferJobs/sampleJob2-{DAG_ID}-{ENV_ID}".replace("-", "_")
# [START howto_operator_gcp_transfer_create_job_body_aws]
aws_to_gcs_transfer_body = {
@@ -93,33 +87,53 @@ aws_to_gcs_transfer_body = {
SCHEDULE: {
SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
- START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
+ START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=1)).time(),
},
TRANSFER_SPEC: {
- AWS_S3_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_SOURCE_AWS_BUCKET},
- GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
+ AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
+ GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
},
}
# [END howto_operator_gcp_transfer_create_job_body_aws]
+aws_to_gcs_transfer_body_2 = deepcopy(aws_to_gcs_transfer_body)
+aws_to_gcs_transfer_body_2[JOB_NAME] = GCP_TRANSFER_JOB_2_NAME
with models.DAG(
- "example_gcp_transfer_aws",
+ dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example"],
+ tags=["example", "aws", "gcs", "transfer"],
) as dag:
+ create_bucket_s3 = S3CreateBucketOperator(
+        task_id="create_bucket_s3", bucket_name=BUCKET_SOURCE_AWS, region_name="us-east-1"
+ )
+
+ upload_file_to_s3 = GCSToS3Operator(
+ task_id="upload_file_to_s3",
+ gcp_user_project=GCP_PROJECT_ID,
+ bucket=EXAMPLE_BUCKET,
+ prefix=EXAMPLE_FILE,
+ dest_s3_key=f"s3://{BUCKET_SOURCE_AWS}",
+ replace=True,
+ )
+ #
+ create_bucket_gcs = GCSCreateBucketOperator(
+ task_id="create_bucket_gcs",
+ bucket_name=BUCKET_TARGET_GCS,
+ project_id=GCP_PROJECT_ID,
+ )
# [START howto_operator_gcp_transfer_create_job]
- create_transfer_job_from_aws = CloudDataTransferServiceCreateJobOperator(
- task_id="create_transfer_job_from_aws", body=aws_to_gcs_transfer_body
+ create_transfer_job_s3_to_gcs = CloudDataTransferServiceCreateJobOperator(
+ task_id="create_transfer_job_s3_to_gcs", body=aws_to_gcs_transfer_body
)
# [END howto_operator_gcp_transfer_create_job]
wait_for_operation_to_start = CloudDataTransferServiceJobStatusSensor(
task_id="wait_for_operation_to_start",
-        job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
+        job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
project_id=GCP_PROJECT_ID,
expected_statuses={GcpTransferOperationStatus.IN_PROGRESS},
poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
@@ -138,7 +152,7 @@ with models.DAG(
task_id="list_operations",
request_filter={
FILTER_PROJECT_ID: GCP_PROJECT_ID,
-            FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}"],
+            FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}"],
},
)
# [END howto_operator_gcp_transfer_list_operations]
@@ -158,37 +172,93 @@ with models.DAG(
# [START howto_operator_gcp_transfer_wait_operation]
wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
task_id="wait_for_operation_to_end",
-        job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
+        job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
project_id=GCP_PROJECT_ID,
expected_statuses={GcpTransferOperationStatus.SUCCESS},
poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
# [END howto_operator_gcp_transfer_wait_operation]
+    create_second_transfer_job_from_aws = CloudDataTransferServiceCreateJobOperator(
+        task_id="create_transfer_job_s3_to_gcs_2", body=aws_to_gcs_transfer_body_2
+ )
+
+ wait_for_operation_to_start_2 = CloudDataTransferServiceJobStatusSensor(
+ task_id="wait_for_operation_to_start_2",
+        job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs_2')['name']}}",
+ project_id=GCP_PROJECT_ID,
+ expected_statuses={GcpTransferOperationStatus.IN_PROGRESS},
+ poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
+ )
+
# [START howto_operator_gcp_transfer_cancel_operation]
cancel_operation = CloudDataTransferServiceCancelOperationOperator(
task_id="cancel_operation",
operation_name="{{task_instance.xcom_pull("
-        "'wait_for_second_operation_to_start', key='sensed_operations')[0]['name']}}",
+        "'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']}}",
)
# [END howto_operator_gcp_transfer_cancel_operation]
# [START howto_operator_gcp_transfer_delete_job]
- delete_transfer_from_aws_job = CloudDataTransferServiceDeleteJobOperator(
- task_id="delete_transfer_from_aws_job",
-        job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
+ delete_transfer_job_s3_to_gcs = CloudDataTransferServiceDeleteJobOperator(
+ task_id="delete_transfer_job_s3_to_gcs",
+        job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
project_id=GCP_PROJECT_ID,
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_gcp_transfer_delete_job]
- chain(
- create_transfer_job_from_aws,
- wait_for_operation_to_start,
- pause_operation,
- list_operations,
- get_operation,
- resume_operation,
- wait_for_operation_to_end,
- cancel_operation,
- delete_transfer_from_aws_job,
+    delete_transfer_job_s3_to_gcs_2 = CloudDataTransferServiceDeleteJobOperator(
+ task_id="delete_transfer_job_s3_to_gcs_2",
+        job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs_2')['name']}}",
+ project_id=GCP_PROJECT_ID,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ delete_bucket_s3 = S3DeleteBucketOperator(
+ task_id="delete_bucket_s3",
+ bucket_name=BUCKET_SOURCE_AWS,
+ force_delete=True,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ delete_bucket_gcs = GCSDeleteBucketOperator(
+ task_id="delete_bucket_gcs",
+ bucket_name=BUCKET_TARGET_GCS,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ (
+ # TEST SETUP
+ [create_bucket_s3 >> upload_file_to_s3, create_bucket_gcs]
+ # TEST BODY
+ >> create_transfer_job_s3_to_gcs
+ >> wait_for_operation_to_start
+ >> pause_operation
+ >> list_operations
+ >> get_operation
+ >> resume_operation
+ >> wait_for_operation_to_end
+ >> create_second_transfer_job_from_aws
+ >> wait_for_operation_to_start_2
+ >> cancel_operation
+ # TEST TEARDOWN
+ >> [
+ delete_transfer_job_s3_to_gcs,
+ delete_transfer_job_s3_to_gcs_2,
+ delete_bucket_gcs,
+ delete_bucket_s3,
+ ]
)
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)