This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 923f1ef30e Google Cloud Storage to Google Drive Transfer Operators - system tests migrations (AIP-47)  (#26772)
923f1ef30e is described below

commit 923f1ef30e8f4c0df2845817b8f96373991ad3ce
Author: Beata Kossakowska <[email protected]>
AuthorDate: Mon Oct 10 06:33:22 2022 +0200

    Google Cloud Storage to Google Drive Transfer Operators - system tests migrations (AIP-47)  (#26772)
---
 .../google/cloud/gcs/example_gcs_to_gdrive.py      | 77 +++++++++++++++++++---
 1 file changed, 68 insertions(+), 9 deletions(-)

diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
index f43f2ab9cc..0947ff39db 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py
@@ -17,19 +17,37 @@
 # under the License.
 """
 Example DAG using GoogleCloudStorageToGoogleDriveOperator.
+
+Using this operator requires the following additional scopes:
+https://www.googleapis.com/auth/drive
 """
 from __future__ import annotations
 
 import os
 from datetime import datetime
+from pathlib import Path
 
 from airflow import models
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
+from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
 DAG_ID = "example_gcs_to_gdrive"
 
-GCS_TO_GDRIVE_BUCKET = os.environ.get("GCS_TO_DRIVE_BUCKET", "example-object")
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+
+TMP_PATH = "tmp"
+
+CURRENT_FOLDER = Path(__file__).parent
+LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources")
+
+FILE_LOCAL_PATH = str(Path(LOCAL_PATH))
+FILE_NAME = "example_upload.txt"
+
 
 with models.DAG(
     DAG_ID,
@@ -38,31 +56,72 @@ with models.DAG(
     catchup=False,
     tags=['example', 'gcs'],
 ) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+    )
+
+    upload_file = LocalFilesystemToGCSOperator(
+        task_id="upload_file",
+        src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
+        dst=f"{TMP_PATH}/{FILE_NAME}",
+        bucket=BUCKET_NAME,
+    )
+
+    upload_file_2 = LocalFilesystemToGCSOperator(
+        task_id="upload_fil_2",
+        src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
+        dst=f"{TMP_PATH}/2_{FILE_NAME}",
+        bucket=BUCKET_NAME,
+    )
     # [START howto_operator_gcs_to_gdrive_copy_single_file]
     copy_single_file = GCSToGoogleDriveOperator(
         task_id="copy_single_file",
-        source_bucket=GCS_TO_GDRIVE_BUCKET,
-        source_object="sales/january.avro",
-        destination_object="copied_sales/january-backup.avro",
+        source_bucket=BUCKET_NAME,
+        source_object=f"{TMP_PATH}/{FILE_NAME}",
+        destination_object=f"copied_tmp/copied_{FILE_NAME}",
     )
     # [END howto_operator_gcs_to_gdrive_copy_single_file]
+
     # [START howto_operator_gcs_to_gdrive_copy_files]
     copy_files = GCSToGoogleDriveOperator(
         task_id="copy_files",
-        source_bucket=GCS_TO_GDRIVE_BUCKET,
-        source_object="sales/*",
-        destination_object="copied_sales/",
+        source_bucket=BUCKET_NAME,
+        source_object=f"{TMP_PATH}/*",
+        destination_object="copied_tmp/",
     )
     # [END howto_operator_gcs_to_gdrive_copy_files]
+
     # [START howto_operator_gcs_to_gdrive_move_files]
     move_files = GCSToGoogleDriveOperator(
         task_id="move_files",
-        source_bucket=GCS_TO_GDRIVE_BUCKET,
-        source_object="sales/*.avro",
+        source_bucket=BUCKET_NAME,
+        source_object=f"{TMP_PATH}/*.txt",
         move_object=True,
     )
     # [END howto_operator_gcs_to_gdrive_move_files]
 
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> upload_file
+        >> upload_file_2
+        # TEST BODY
+        >> copy_single_file
+        >> copy_files
+        >> move_files
+        # TEST TEARDOWN
+        >> delete_bucket
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
 from tests.system.utils import get_test_run  # noqa: E402
 

Reply via email to