This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 82e6226738 Fix Datafusion system tests (#32749)
82e6226738 is described below

commit 82e6226738bcf7f3981c5b8db714a849c26a6962
Author: VladaZakharova <[email protected]>
AuthorDate: Sun Jul 23 07:55:23 2023 +0200

    Fix Datafusion system tests (#32749)
---
 airflow/providers/google/cloud/hooks/datafusion.py | 23 ++++++--
 .../google/cloud/datafusion/example_datafusion.py  | 60 ++++++++++---------
 .../cloud/datafusion/example_datafusion_async.py   | 68 ++++++++++++----------
 3 files changed, 87 insertions(+), 64 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/datafusion.py b/airflow/providers/google/cloud/hooks/datafusion.py
index f74958cc16..770e3da391 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -319,7 +319,7 @@ class DataFusionHook(GoogleBaseHook):
         namespace: str = "default",
     ) -> None:
         """
-        Creates a Cloud Data Fusion pipeline.
+        Creates a batch Cloud Data Fusion pipeline.
 
         :param pipeline_name: Your pipeline name.
         :param pipeline: The pipeline definition. For more information check:
@@ -343,12 +343,12 @@ class DataFusionHook(GoogleBaseHook):
         namespace: str = "default",
     ) -> None:
         """
-        Deletes a Cloud Data Fusion pipeline.
+        Deletes a batch Cloud Data Fusion pipeline.
 
         :param pipeline_name: Your pipeline name.
         :param version_id: Version of pipeline to delete
         :param instance_url: Endpoint on which the REST APIs is accessible for the instance.
-        :param namespace: f your pipeline belongs to a Basic edition instance, the namespace ID
+        :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID
             is always default. If your pipeline belongs to an Enterprise edition instance, you
             can create a namespace.
         """
@@ -357,9 +357,20 @@ class DataFusionHook(GoogleBaseHook):
             url = os.path.join(url, "versions", version_id)
 
         response = self._cdap_request(url=url, method="DELETE", body=None)
-        self._check_response_status_and_data(
-            response, f"Deleting a pipeline failed with code {response.status}"
-        )
+        # Check for 409 error: the previous step of starting/stopping the pipeline could still be in progress.
+        # Wait some time before retrying.
+        for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
+            try:
+                self._check_response_status_and_data(
+                    response, f"Deleting a pipeline failed with code {response.status}: {response.data}"
+                )
+                break
+            except AirflowException as exc:
+                if "409" in str(exc):
+                    sleep(time_to_wait)
+                    response = self._cdap_request(url=url, method="DELETE", body=None)
+                else:
+                    raise
 
     def list_pipelines(
         self,
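
For reference, the retry pattern added in the hunk above can be read in isolation. This is
a minimal sketch rather than the hook itself: request() and check() are hypothetical
stand-ins for self._cdap_request and self._check_response_status_and_data, while
exponential_sleep_generator (google.api_core.retry) and time.sleep are the same utilities
the hook already relies on.

    from time import sleep

    from google.api_core.retry import exponential_sleep_generator

    def delete_with_retry_on_409(request, check):
        # Issue the DELETE once, then keep retrying while the server reports a
        # 409 conflict (the previous start/stop step may still be in progress).
        response = request()
        # Back off 10s, 20s, 40s, ... capped at 120s between attempts.
        for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
            try:
                check(response)  # expected to raise with the status code in the message
                break
            except Exception as exc:
                if "409" in str(exc):
                    sleep(time_to_wait)
                    response = request()
                else:
                    raise

Because exponential_sleep_generator yields delays indefinitely, the loop retries 409
conflicts until the delete succeeds or a different error surfaces.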
diff --git a/tests/system/providers/google/cloud/datafusion/example_datafusion.py b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
index 4ff221c928..4d3bb7418d 100644
--- a/tests/system/providers/google/cloud/datafusion/example_datafusion.py
+++ b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
@@ -23,7 +23,6 @@ import os
 from datetime import datetime
 
 from airflow import models
-from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.datafusion import (
     CloudDataFusionCreateInstanceOperator,
     CloudDataFusionCreatePipelineOperator,
@@ -36,34 +35,36 @@ from airflow.providers.google.cloud.operators.datafusion import (
     CloudDataFusionStopPipelineOperator,
     CloudDataFusionUpdateInstanceOperator,
 )
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
+from airflow.utils.trigger_rule import TriggerRule
 
 # [START howto_data_fusion_env_variables]
 SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 LOCATION = "europe-north1"
-INSTANCE_NAME = "airflow-test-instance"
+DAG_ID = "example_data_fusion"
+INSTANCE_NAME = "test-instance"
 INSTANCE = {
     "type": "BASIC",
     "displayName": INSTANCE_NAME,
     "dataprocServiceAccount": SERVICE_ACCOUNT,
 }
 
-BUCKET_1 = "test-datafusion-1"
-BUCKET_2 = "test-datafusion-2"
-BUCKET_1_URI = f"gs://{BUCKET_1}"
-BUCKET_2_URI = f"gs://{BUCKET_2}"
+BUCKET_NAME_1 = "test-datafusion-1"
+BUCKET_NAME_2 = "test-datafusion-2"
+BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
+BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"
 
 PIPELINE_NAME = "test-pipe"
 PIPELINE = {
     "artifact": {
         "name": "cdap-data-pipeline",
-        "version": "6.7.2",
+        "version": "6.8.3",
         "scope": "SYSTEM",
     },
     "description": "Data Pipeline Application",
-    "name": "test-pipe",
+    "name": PIPELINE_NAME,
     "config": {
         "resources": {"memoryMB": 2048, "virtualCores": 1},
         "driverResources": {"memoryMB": 2048, "virtualCores": 1},
@@ -80,7 +81,7 @@ PIPELINE = {
                     "name": "GCSFile",
                     "type": "batchsource",
                     "label": "GCS",
-                    "artifact": {"name": "google-cloud", "version": "0.20.3", 
"scope": "SYSTEM"},
+                    "artifact": {"name": "google-cloud", "version": "0.21.2", 
"scope": "SYSTEM"},
                     "properties": {
                         "project": "auto-detect",
                         "format": "text",
@@ -91,7 +92,7 @@ PIPELINE = {
                         "encrypted": "false",
                         "schema": 
'{"type":"record","name":"textfile","fields":[{"name"\
                             
:"offset","type":"long"},{"name":"body","type":"string"}]}',
-                        "path": BUCKET_1_URI,
+                        "path": BUCKET_NAME_1_URI,
                         "referenceName": "foo_bucket",
                         "useConnection": "false",
                         "serviceAccountType": "filePath",
@@ -109,7 +110,7 @@ PIPELINE = {
                     "name": "GCS",
                     "type": "batchsink",
                     "label": "GCS2",
-                    "artifact": {"name": "google-cloud", "version": "0.20.3", 
"scope": "SYSTEM"},
+                    "artifact": {"name": "google-cloud", "version": "0.21.2", 
"scope": "SYSTEM"},
                     "properties": {
                         "project": "auto-detect",
                         "suffix": "yyyy-MM-dd-HH-mm",
@@ -119,7 +120,7 @@ PIPELINE = {
                         "schema": 
'{"type":"record","name":"textfile","fields":[{"name"\
                             
:"offset","type":"long"},{"name":"body","type":"string"}]}',
                         "referenceName": "bar",
-                        "path": BUCKET_2_URI,
+                        "path": BUCKET_NAME_2_URI,
                         "serviceAccountType": "filePath",
                         "contentType": "application/octet-stream",
                     },
@@ -146,19 +147,20 @@ PIPELINE = {
 # [END howto_data_fusion_env_variables]
 
 with models.DAG(
-    "example_data_fusion",
+    DAG_ID,
     start_date=datetime(2021, 1, 1),
     catchup=False,
+    tags=["example", "datafusion"],
 ) as dag:
     create_bucket1 = GCSCreateBucketOperator(
         task_id="create_bucket1",
-        bucket_name=BUCKET_1,
+        bucket_name=BUCKET_NAME_1,
         project_id=PROJECT_ID,
     )
 
     create_bucket2 = GCSCreateBucketOperator(
         task_id="create_bucket2",
-        bucket_name=BUCKET_2,
+        bucket_name=BUCKET_NAME_2,
         project_id=PROJECT_ID,
     )
 
@@ -255,38 +257,44 @@ with models.DAG(
         pipeline_name=PIPELINE_NAME,
         instance_name=INSTANCE_NAME,
         task_id="delete_pipeline",
+        trigger_rule=TriggerRule.ALL_DONE,
     )
     # [END howto_cloud_data_fusion_delete_pipeline]
 
     # [START howto_cloud_data_fusion_delete_instance_operator]
     delete_instance = CloudDataFusionDeleteInstanceOperator(
-        location=LOCATION, instance_name=INSTANCE_NAME, task_id="delete_instance"
+        location=LOCATION,
+        instance_name=INSTANCE_NAME,
+        task_id="delete_instance",
+        trigger_rule=TriggerRule.ALL_DONE,
     )
     # [END howto_cloud_data_fusion_delete_instance_operator]
 
-    # Add sleep before creating pipeline
-    sleep = BashOperator(task_id="sleep", bash_command="sleep 60")
-
-    # Add sleep before creating pipeline
-    sleep_30 = BashOperator(task_id="sleep_30", bash_command="sleep 30")
+    delete_bucket1 = GCSDeleteBucketOperator(
+        task_id="delete_bucket1", bucket_name=BUCKET_NAME_1, 
trigger_rule=TriggerRule.ALL_DONE
+    )
+    delete_bucket2 = GCSDeleteBucketOperator(
+        task_id="delete_bucket2", bucket_name=BUCKET_NAME_1, 
trigger_rule=TriggerRule.ALL_DONE
+    )
 
     (
-        create_bucket1
-        >> create_bucket2
+        # TEST SETUP
+        [create_bucket1, create_bucket2]
+        # TEST BODY
         >> create_instance
         >> get_instance
         >> restart_instance
         >> update_instance
-        >> sleep
         >> create_pipeline
         >> list_pipelines
         >> start_pipeline_async
         >> start_pipeline_sensor
         >> start_pipeline
         >> stop_pipeline
-        >> sleep_30
         >> delete_pipeline
         >> delete_instance
+        # TEST TEARDOWN
+        >> [delete_bucket1, delete_bucket2]
     )
 
     from tests.system.utils.watcher import watcher
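
The teardown tasks introduced above rely on trigger_rule=TriggerRule.ALL_DONE so that
cleanup still runs when an earlier test step fails. A minimal sketch of that pattern,
using a hypothetical DAG id and bucket name but the same operators and trigger rule
imported in the example:

    from datetime import datetime

    from airflow import models
    from airflow.providers.google.cloud.operators.gcs import (
        GCSCreateBucketOperator,
        GCSDeleteBucketOperator,
    )
    from airflow.utils.trigger_rule import TriggerRule

    with models.DAG(
        "example_all_done_teardown",  # hypothetical DAG id
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        create_bucket = GCSCreateBucketOperator(
            task_id="create_bucket",
            bucket_name="example-teardown-bucket",  # hypothetical bucket name
        )
        # ALL_DONE fires the teardown whether the upstream tasks succeeded or
        # failed, so the system test does not leak the bucket it created.
        delete_bucket = GCSDeleteBucketOperator(
            task_id="delete_bucket",
            bucket_name="example-teardown-bucket",
            trigger_rule=TriggerRule.ALL_DONE,
        )
        create_bucket >> delete_bucket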
diff --git a/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py b/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
index f8c345c735..43b1241e59 100644
--- a/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
+++ b/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
@@ -23,7 +23,6 @@ import os
 from datetime import datetime
 
 from airflow import models
-from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.datafusion import (
     CloudDataFusionCreateInstanceOperator,
     CloudDataFusionCreatePipelineOperator,
@@ -36,33 +35,35 @@ from airflow.providers.google.cloud.operators.datafusion import (
     CloudDataFusionStopPipelineOperator,
     CloudDataFusionUpdateInstanceOperator,
 )
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
 
 # [START howto_data_fusion_env_variables]
 SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_data_fusion_async"
 LOCATION = "europe-north1"
-INSTANCE_NAME = "airflow-test-instance"
+INSTANCE_NAME = "test-instance-async"
 INSTANCE = {
     "type": "BASIC",
     "displayName": INSTANCE_NAME,
     "dataprocServiceAccount": SERVICE_ACCOUNT,
 }
 
-BUCKET_1 = "test-datafusion-1"
-BUCKET_2 = "test-datafusion-2"
-BUCKET_1_URI = f"gs://{BUCKET_1}"
-BUCKET_2_URI = f"gs://{BUCKET_2}"
+BUCKET_NAME_1 = "test-datafusion-async-1"
+BUCKET_NAME_2 = "test-datafusion-async-2"
+BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
+BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"
 
 PIPELINE_NAME = "test-pipe"
 PIPELINE = {
     "artifact": {
         "name": "cdap-data-pipeline",
-        "version": "6.7.2",
+        "version": "6.8.3",
         "scope": "SYSTEM",
     },
     "description": "Data Pipeline Application",
-    "name": "test-pipe",
+    "name": PIPELINE_NAME,
     "config": {
         "resources": {"memoryMB": 2048, "virtualCores": 1},
         "driverResources": {"memoryMB": 2048, "virtualCores": 1},
@@ -79,7 +80,7 @@ PIPELINE = {
                     "name": "GCSFile",
                     "type": "batchsource",
                     "label": "GCS",
-                    "artifact": {"name": "google-cloud", "version": "0.20.3", 
"scope": "SYSTEM"},
+                    "artifact": {"name": "google-cloud", "version": "0.21.2", 
"scope": "SYSTEM"},
                     "properties": {
                         "project": "auto-detect",
                         "format": "text",
@@ -90,7 +91,7 @@ PIPELINE = {
                         "encrypted": "false",
                         "schema": 
'{"type":"record","name":"textfile","fields":[{"name"\
                             
:"offset","type":"long"},{"name":"body","type":"string"}]}',
-                        "path": BUCKET_1_URI,
+                        "path": BUCKET_NAME_1_URI,
                         "referenceName": "foo_bucket",
                         "useConnection": "false",
                         "serviceAccountType": "filePath",
@@ -108,7 +109,7 @@ PIPELINE = {
                     "name": "GCS",
                     "type": "batchsink",
                     "label": "GCS2",
-                    "artifact": {"name": "google-cloud", "version": "0.20.3", 
"scope": "SYSTEM"},
+                    "artifact": {"name": "google-cloud", "version": "0.21.2", 
"scope": "SYSTEM"},
                     "properties": {
                         "project": "auto-detect",
                         "suffix": "yyyy-MM-dd-HH-mm",
@@ -118,7 +119,7 @@ PIPELINE = {
                         "schema": 
'{"type":"record","name":"textfile","fields":[{"name"\
                             
:"offset","type":"long"},{"name":"body","type":"string"}]}',
                         "referenceName": "bar",
-                        "path": BUCKET_2_URI,
+                        "path": BUCKET_NAME_2_URI,
                         "serviceAccountType": "filePath",
                         "contentType": "application/octet-stream",
                     },
@@ -145,19 +146,20 @@ PIPELINE = {
 # [END howto_data_fusion_env_variables]
 
 with models.DAG(
-    "example_data_fusion_async",
+    DAG_ID,
     start_date=datetime(2021, 1, 1),
     catchup=False,
+    tags=["example", "datafusion", "deferrable"],
 ) as dag:
     create_bucket1 = GCSCreateBucketOperator(
         task_id="create_bucket1",
-        bucket_name=BUCKET_1,
+        bucket_name=BUCKET_NAME_1,
         project_id=PROJECT_ID,
     )
 
     create_bucket2 = GCSCreateBucketOperator(
         task_id="create_bucket2",
-        bucket_name=BUCKET_2,
+        bucket_name=BUCKET_NAME_2,
         project_id=PROJECT_ID,
     )
 
@@ -209,7 +211,7 @@ with models.DAG(
     # [END howto_cloud_data_fusion_list_pipelines]
 
     # [START howto_cloud_data_fusion_start_pipeline_def]
-    start_pipeline_async = CloudDataFusionStartPipelineOperator(
+    start_pipeline_def = CloudDataFusionStartPipelineOperator(
         location=LOCATION,
         pipeline_name=PIPELINE_NAME,
         instance_name=INSTANCE_NAME,
@@ -233,40 +235,42 @@ with models.DAG(
         pipeline_name=PIPELINE_NAME,
         instance_name=INSTANCE_NAME,
         task_id="delete_pipeline",
+        trigger_rule=TriggerRule.ALL_DONE,
     )
     # [END howto_cloud_data_fusion_delete_pipeline]
 
     # [START howto_cloud_data_fusion_delete_instance_operator]
     delete_instance = CloudDataFusionDeleteInstanceOperator(
-        location=LOCATION, instance_name=INSTANCE_NAME, task_id="delete_instance"
+        location=LOCATION,
+        instance_name=INSTANCE_NAME,
+        task_id="delete_instance",
+        trigger_rule=TriggerRule.ALL_DONE,
     )
     # [END howto_cloud_data_fusion_delete_instance_operator]
-    #
-    # Add sleep before creating pipeline
-    sleep_30_1 = BashOperator(task_id="sleep_30_1", bash_command="sleep 30")
-
-    # Add sleep before deleting pipeline
-    sleep_30 = BashOperator(task_id="sleep_30", bash_command="sleep 30")
 
-    # Add sleep before starting pipeline
-    sleep_20 = BashOperator(task_id="sleep_20", bash_command="sleep 40")
+    delete_bucket1 = GCSDeleteBucketOperator(
+        task_id="delete_bucket1", bucket_name=BUCKET_NAME_1, 
trigger_rule=TriggerRule.ALL_DONE
+    )
+    delete_bucket2 = GCSDeleteBucketOperator(
+        task_id="delete_bucket2", bucket_name=BUCKET_NAME_1, 
trigger_rule=TriggerRule.ALL_DONE
+    )
 
     (
-        create_bucket1
-        >> create_bucket2
+        # TEST SETUP
+        [create_bucket1, create_bucket2]
+        # TEST BODY
         >> create_instance
         >> get_instance
         >> restart_instance
         >> update_instance
-        >> sleep_30_1
         >> create_pipeline
         >> list_pipelines
-        >> sleep_20
-        >> start_pipeline_async
+        >> start_pipeline_def
         >> stop_pipeline
-        >> sleep_30
         >> delete_pipeline
         >> delete_instance
+        # TEST TEARDOWN
+        >> [delete_bucket1, delete_bucket2]
     )
 
     from tests.system.utils.watcher import watcher
