This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 923f1ef30e Google Cloud Storage to Google Drive Transfer Operators - system tests migrations (AIP-47) (#26772)
923f1ef30e is described below
commit 923f1ef30e8f4c0df2845817b8f96373991ad3ce
Author: Beata Kossakowska <[email protected]>
AuthorDate: Mon Oct 10 06:33:22 2022 +0200
Google Cloud Storage to Google Drive Transfer Operators - system tests migrations (AIP-47) (#26772)
---
.../google/cloud/gcs/example_gcs_to_gdrive.py | 77 +++++++++++++++++++---
1 file changed, 68 insertions(+), 9 deletions(-)
diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
index f43f2ab9cc..0947ff39db 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
@@ -17,19 +17,37 @@
# under the License.
"""
Example DAG using GoogleCloudStorageToGoogleDriveOperator.
+
+Using this operator requires the following additional scopes:
+https://www.googleapis.com/auth/drive
"""
from __future__ import annotations
import os
from datetime import datetime
+from pathlib import Path
from airflow import models
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
+from airflow.utils.trigger_rule import TriggerRule
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
DAG_ID = "example_gcs_to_gdrive"
-GCS_TO_GDRIVE_BUCKET = os.environ.get("GCS_TO_DRIVE_BUCKET", "example-object")
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+
+TMP_PATH = "tmp"
+
+CURRENT_FOLDER = Path(__file__).parent
+LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources")
+
+FILE_LOCAL_PATH = str(Path(LOCAL_PATH))
+FILE_NAME = "example_upload.txt"
+
with models.DAG(
    DAG_ID,
@@ -38,31 +56,72 @@ with models.DAG(
    catchup=False,
    tags=['example', 'gcs'],
) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+    )
+
+    upload_file = LocalFilesystemToGCSOperator(
+        task_id="upload_file",
+        src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
+        dst=f"{TMP_PATH}/{FILE_NAME}",
+        bucket=BUCKET_NAME,
+    )
+
+    upload_file_2 = LocalFilesystemToGCSOperator(
+        task_id="upload_file_2",
+        src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
+        dst=f"{TMP_PATH}/2_{FILE_NAME}",
+        bucket=BUCKET_NAME,
+    )
    # [START howto_operator_gcs_to_gdrive_copy_single_file]
    copy_single_file = GCSToGoogleDriveOperator(
        task_id="copy_single_file",
-        source_bucket=GCS_TO_GDRIVE_BUCKET,
-        source_object="sales/january.avro",
-        destination_object="copied_sales/january-backup.avro",
+        source_bucket=BUCKET_NAME,
+        source_object=f"{TMP_PATH}/{FILE_NAME}",
+        destination_object=f"copied_tmp/copied_{FILE_NAME}",
    )
    # [END howto_operator_gcs_to_gdrive_copy_single_file]
+
    # [START howto_operator_gcs_to_gdrive_copy_files]
    copy_files = GCSToGoogleDriveOperator(
        task_id="copy_files",
-        source_bucket=GCS_TO_GDRIVE_BUCKET,
-        source_object="sales/*",
-        destination_object="copied_sales/",
+        source_bucket=BUCKET_NAME,
+        source_object=f"{TMP_PATH}/*",
+        destination_object="copied_tmp/",
    )
    # [END howto_operator_gcs_to_gdrive_copy_files]
+
    # [START howto_operator_gcs_to_gdrive_move_files]
    move_files = GCSToGoogleDriveOperator(
        task_id="move_files",
-        source_bucket=GCS_TO_GDRIVE_BUCKET,
-        source_object="sales/*.avro",
+        source_bucket=BUCKET_NAME,
+        source_object=f"{TMP_PATH}/*.txt",
        move_object=True,
    )
    # [END howto_operator_gcs_to_gdrive_move_files]
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> upload_file
+        >> upload_file_2
+        # TEST BODY
+        >> copy_single_file
+        >> copy_files
+        >> move_files
+        # TEST TEARDOWN
+        >> delete_bucket
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
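
A note on the scopes mentioned in the module docstring: GCSToGoogleDriveOperator
needs the https://www.googleapis.com/auth/drive scope on top of the provider's
defaults. Below is a minimal sketch of one way to grant it, assuming the
google_cloud_default connection is supplied through an environment variable and
that the Google provider's "extra__google_cloud_platform__scope" extra accepts a
comma-separated scope list (the field name is an assumption of this sketch, not
part of the commit):

    import os
    from urllib.parse import quote

    # Comma-separated OAuth scopes: cloud-platform covers the GCS operators,
    # the Drive scope is what GCSToGoogleDriveOperator additionally needs.
    # NOTE: the extras field name below is an assumption, not from this commit.
    SCOPES = ",".join(
        [
            "https://www.googleapis.com/auth/cloud-platform",
            "https://www.googleapis.com/auth/drive",
        ]
    )

    # Airflow reads connections from AIRFLOW_CONN_<CONN_ID> environment
    # variables; the scope list is URL-encoded into the connection extras.
    os.environ["AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT"] = (
        "google-cloud-platform://?extra__google_cloud_platform__scope="
        + quote(SCOPES, safe="")
    )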
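
A note on the watcher wiring at the end of the DAG: because delete_bucket uses
trigger_rule=ALL_DONE, it runs (and normally succeeds) even when a test-body
task fails, and since a DAG run's state follows its leaf tasks, that failure
would otherwise be masked. Here is a minimal sketch of such a watcher task,
reconstructed from the pattern visible in the diff (the actual
tests.system.utils.watcher module is not shown in this commit):

    from airflow.decorators import task
    from airflow.exceptions import AirflowException
    from airflow.utils.trigger_rule import TriggerRule

    @task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
    def watcher():
        # ONE_FAILED runs this task only when at least one directly upstream
        # task has failed; raising here fails the whole DAG run even though
        # the ALL_DONE teardown task itself succeeded.
        raise AirflowException("Failing task because one or more upstream tasks failed.")

With list(dag.tasks) >> watcher(), every task in the DAG, teardown included,
becomes upstream of the watcher, so any single failure propagates to the run's
final state.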