This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 82e6226738 Fix Datafusion system tests (#32749)
82e6226738 is described below
commit 82e6226738bcf7f3981c5b8db714a849c26a6962
Author: VladaZakharova <[email protected]>
AuthorDate: Sun Jul 23 07:55:23 2023 +0200
Fix Datafusion system tests (#32749)
---
airflow/providers/google/cloud/hooks/datafusion.py | 23 ++++++--
.../google/cloud/datafusion/example_datafusion.py | 60 ++++++++++---------
.../cloud/datafusion/example_datafusion_async.py | 68 ++++++++++++----------
3 files changed, 87 insertions(+), 64 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/datafusion.py b/airflow/providers/google/cloud/hooks/datafusion.py
index f74958cc16..770e3da391 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -319,7 +319,7 @@ class DataFusionHook(GoogleBaseHook):
namespace: str = "default",
) -> None:
"""
- Creates a Cloud Data Fusion pipeline.
+ Creates a batch Cloud Data Fusion pipeline.
:param pipeline_name: Your pipeline name.
:param pipeline: The pipeline definition. For more information check:
@@ -343,12 +343,12 @@ class DataFusionHook(GoogleBaseHook):
namespace: str = "default",
) -> None:
"""
- Deletes a Cloud Data Fusion pipeline.
+ Deletes a batch Cloud Data Fusion pipeline.
:param pipeline_name: Your pipeline name.
:param version_id: Version of pipeline to delete
:param instance_url: Endpoint on which the REST APIs is accessible for
the instance.
- :param namespace: f your pipeline belongs to a Basic edition instance, the namespace ID
+ :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID
is always default. If your pipeline belongs to an Enterprise edition instance, you
can create a namespace.
"""
@@ -357,9 +357,20 @@ class DataFusionHook(GoogleBaseHook):
url = os.path.join(url, "versions", version_id)
response = self._cdap_request(url=url, method="DELETE", body=None)
- self._check_response_status_and_data(
- response, f"Deleting a pipeline failed with code {response.status}"
- )
+ # Check for a 409 error: the previous step for starting/stopping the pipeline could still be in progress.
+ # Wait some time before retrying.
+ for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
+ try:
+ self._check_response_status_and_data(
+ response, f"Deleting a pipeline failed with code {response.status}: {response.data}"
+ )
+ break
+ except AirflowException as exc:
+ if "409" in str(exc):
+ sleep(time_to_wait)
+ response = self._cdap_request(url=url, method="DELETE", body=None)
+ else:
+ raise
+ else:
+ raise
def list_pipelines(
self,
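For reference, the retry added above is a generic wait-and-retry-on-conflict loop around the CDAP DELETE call. Below is a minimal standalone sketch of the same pattern, assuming a hypothetical zero-argument delete_once() callable that raises an exception whose message contains "409" while an earlier start/stop operation is still running:

from time import sleep

from google.api_core.retry import exponential_sleep_generator


def delete_with_conflict_retry(delete_once) -> None:
    # Back off between attempts: 10s initially, capped at 120s, like the hook above.
    for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
        try:
            delete_once()  # hypothetical callable performing the DELETE request
            return
        except Exception as exc:
            if "409" in str(exc):
                # Conflict: a previous pipeline start/stop is still in progress,
                # so wait and retry (unbounded, mirroring the hook's loop).
                sleep(time_to_wait)
            else:
                raise

As in the hook, the loop has no attempt cap; it exits only on success or on a non-409 error.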
diff --git a/tests/system/providers/google/cloud/datafusion/example_datafusion.py b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
index 4ff221c928..4d3bb7418d 100644
--- a/tests/system/providers/google/cloud/datafusion/example_datafusion.py
+++ b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
@@ -23,7 +23,6 @@ import os
from datetime import datetime
from airflow import models
-from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.datafusion import (
CloudDataFusionCreateInstanceOperator,
CloudDataFusionCreatePipelineOperator,
@@ -36,34 +35,36 @@ from airflow.providers.google.cloud.operators.datafusion import (
CloudDataFusionStopPipelineOperator,
CloudDataFusionUpdateInstanceOperator,
)
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
+from airflow.utils.trigger_rule import TriggerRule
# [START howto_data_fusion_env_variables]
SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
LOCATION = "europe-north1"
-INSTANCE_NAME = "airflow-test-instance"
+DAG_ID = "example_data_fusion"
+INSTANCE_NAME = "test-instance"
INSTANCE = {
"type": "BASIC",
"displayName": INSTANCE_NAME,
"dataprocServiceAccount": SERVICE_ACCOUNT,
}
-BUCKET_1 = "test-datafusion-1"
-BUCKET_2 = "test-datafusion-2"
-BUCKET_1_URI = f"gs://{BUCKET_1}"
-BUCKET_2_URI = f"gs://{BUCKET_2}"
+BUCKET_NAME_1 = "test-datafusion-1"
+BUCKET_NAME_2 = "test-datafusion-2"
+BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
+BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"
PIPELINE_NAME = "test-pipe"
PIPELINE = {
"artifact": {
"name": "cdap-data-pipeline",
- "version": "6.7.2",
+ "version": "6.8.3",
"scope": "SYSTEM",
},
"description": "Data Pipeline Application",
- "name": "test-pipe",
+ "name": PIPELINE_NAME,
"config": {
"resources": {"memoryMB": 2048, "virtualCores": 1},
"driverResources": {"memoryMB": 2048, "virtualCores": 1},
@@ -80,7 +81,7 @@ PIPELINE = {
"name": "GCSFile",
"type": "batchsource",
"label": "GCS",
- "artifact": {"name": "google-cloud", "version": "0.20.3",
"scope": "SYSTEM"},
+ "artifact": {"name": "google-cloud", "version": "0.21.2",
"scope": "SYSTEM"},
"properties": {
"project": "auto-detect",
"format": "text",
@@ -91,7 +92,7 @@ PIPELINE = {
"encrypted": "false",
"schema":
'{"type":"record","name":"textfile","fields":[{"name"\
:"offset","type":"long"},{"name":"body","type":"string"}]}',
- "path": BUCKET_1_URI,
+ "path": BUCKET_NAME_1_URI,
"referenceName": "foo_bucket",
"useConnection": "false",
"serviceAccountType": "filePath",
@@ -109,7 +110,7 @@ PIPELINE = {
"name": "GCS",
"type": "batchsink",
"label": "GCS2",
- "artifact": {"name": "google-cloud", "version": "0.20.3",
"scope": "SYSTEM"},
+ "artifact": {"name": "google-cloud", "version": "0.21.2",
"scope": "SYSTEM"},
"properties": {
"project": "auto-detect",
"suffix": "yyyy-MM-dd-HH-mm",
@@ -119,7 +120,7 @@ PIPELINE = {
"schema":
'{"type":"record","name":"textfile","fields":[{"name"\
:"offset","type":"long"},{"name":"body","type":"string"}]}',
"referenceName": "bar",
- "path": BUCKET_2_URI,
+ "path": BUCKET_NAME_2_URI,
"serviceAccountType": "filePath",
"contentType": "application/octet-stream",
},
@@ -146,19 +147,20 @@ PIPELINE = {
# [END howto_data_fusion_env_variables]
with models.DAG(
- "example_data_fusion",
+ DAG_ID,
start_date=datetime(2021, 1, 1),
catchup=False,
+ tags=["example", "datafusion"],
) as dag:
create_bucket1 = GCSCreateBucketOperator(
task_id="create_bucket1",
- bucket_name=BUCKET_1,
+ bucket_name=BUCKET_NAME_1,
project_id=PROJECT_ID,
)
create_bucket2 = GCSCreateBucketOperator(
task_id="create_bucket2",
- bucket_name=BUCKET_2,
+ bucket_name=BUCKET_NAME_2,
project_id=PROJECT_ID,
)
@@ -255,38 +257,44 @@ with models.DAG(
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="delete_pipeline",
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_cloud_data_fusion_delete_pipeline]
# [START howto_cloud_data_fusion_delete_instance_operator]
delete_instance = CloudDataFusionDeleteInstanceOperator(
- location=LOCATION, instance_name=INSTANCE_NAME, task_id="delete_instance"
+ location=LOCATION,
+ instance_name=INSTANCE_NAME,
+ task_id="delete_instance",
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_cloud_data_fusion_delete_instance_operator]
- # Add sleep before creating pipeline
- sleep = BashOperator(task_id="sleep", bash_command="sleep 60")
-
- # Add sleep before creating pipeline
- sleep_30 = BashOperator(task_id="sleep_30", bash_command="sleep 30")
+ delete_bucket1 = GCSDeleteBucketOperator(
+ task_id="delete_bucket1", bucket_name=BUCKET_NAME_1,
trigger_rule=TriggerRule.ALL_DONE
+ )
+ delete_bucket2 = GCSDeleteBucketOperator(
+ task_id="delete_bucket2", bucket_name=BUCKET_NAME_1,
trigger_rule=TriggerRule.ALL_DONE
+ )
(
- create_bucket1
- >> create_bucket2
+ # TEST SETUP
+ [create_bucket1, create_bucket2]
+ # TEST BODY
>> create_instance
>> get_instance
>> restart_instance
>> update_instance
- >> sleep
>> create_pipeline
>> list_pipelines
>> start_pipeline_async
>> start_pipeline_sensor
>> start_pipeline
>> stop_pipeline
- >> sleep_30
>> delete_pipeline
>> delete_instance
+ # TEST TEARDOWN
+ >> [delete_bucket1, delete_bucket2]
)
from tests.system.utils.watcher import watcher
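The DAG changes above drop the fixed BashOperator sleeps and add bucket teardown tasks that run under TriggerRule.ALL_DONE, so cleanup happens even when a test-body task fails. A minimal sketch of that setup/body/teardown shape, using hypothetical EmptyOperator placeholders rather than the real Data Fusion tasks:

from datetime import datetime

from airflow import models
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with models.DAG(
    "teardown_trigger_rule_sketch",  # illustrative DAG id, not from this commit
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    setup = EmptyOperator(task_id="setup")
    body = EmptyOperator(task_id="body")
    # ALL_DONE lets the teardown run regardless of whether "body" succeeded,
    # so buckets, pipelines and instances are always cleaned up.
    teardown = EmptyOperator(task_id="teardown", trigger_rule=TriggerRule.ALL_DONE)

    setup >> body >> teardown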
diff --git a/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py b/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
index f8c345c735..43b1241e59 100644
--- a/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
+++ b/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
@@ -23,7 +23,6 @@ import os
from datetime import datetime
from airflow import models
-from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.datafusion import (
CloudDataFusionCreateInstanceOperator,
CloudDataFusionCreatePipelineOperator,
@@ -36,33 +35,35 @@ from airflow.providers.google.cloud.operators.datafusion import (
CloudDataFusionStopPipelineOperator,
CloudDataFusionUpdateInstanceOperator,
)
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
# [START howto_data_fusion_env_variables]
SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_data_fusion_async"
LOCATION = "europe-north1"
-INSTANCE_NAME = "airflow-test-instance"
+INSTANCE_NAME = "test-instance-async"
INSTANCE = {
"type": "BASIC",
"displayName": INSTANCE_NAME,
"dataprocServiceAccount": SERVICE_ACCOUNT,
}
-BUCKET_1 = "test-datafusion-1"
-BUCKET_2 = "test-datafusion-2"
-BUCKET_1_URI = f"gs://{BUCKET_1}"
-BUCKET_2_URI = f"gs://{BUCKET_2}"
+BUCKET_NAME_1 = "test-datafusion-async-1"
+BUCKET_NAME_2 = "test-datafusion-async-2"
+BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}"
+BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}"
PIPELINE_NAME = "test-pipe"
PIPELINE = {
"artifact": {
"name": "cdap-data-pipeline",
- "version": "6.7.2",
+ "version": "6.8.3",
"scope": "SYSTEM",
},
"description": "Data Pipeline Application",
- "name": "test-pipe",
+ "name": PIPELINE_NAME,
"config": {
"resources": {"memoryMB": 2048, "virtualCores": 1},
"driverResources": {"memoryMB": 2048, "virtualCores": 1},
@@ -79,7 +80,7 @@ PIPELINE = {
"name": "GCSFile",
"type": "batchsource",
"label": "GCS",
- "artifact": {"name": "google-cloud", "version": "0.20.3",
"scope": "SYSTEM"},
+ "artifact": {"name": "google-cloud", "version": "0.21.2",
"scope": "SYSTEM"},
"properties": {
"project": "auto-detect",
"format": "text",
@@ -90,7 +91,7 @@ PIPELINE = {
"encrypted": "false",
"schema":
'{"type":"record","name":"textfile","fields":[{"name"\
:"offset","type":"long"},{"name":"body","type":"string"}]}',
- "path": BUCKET_1_URI,
+ "path": BUCKET_NAME_1_URI,
"referenceName": "foo_bucket",
"useConnection": "false",
"serviceAccountType": "filePath",
@@ -108,7 +109,7 @@ PIPELINE = {
"name": "GCS",
"type": "batchsink",
"label": "GCS2",
- "artifact": {"name": "google-cloud", "version": "0.20.3",
"scope": "SYSTEM"},
+ "artifact": {"name": "google-cloud", "version": "0.21.2",
"scope": "SYSTEM"},
"properties": {
"project": "auto-detect",
"suffix": "yyyy-MM-dd-HH-mm",
@@ -118,7 +119,7 @@ PIPELINE = {
"schema":
'{"type":"record","name":"textfile","fields":[{"name"\
:"offset","type":"long"},{"name":"body","type":"string"}]}',
"referenceName": "bar",
- "path": BUCKET_2_URI,
+ "path": BUCKET_NAME_2_URI,
"serviceAccountType": "filePath",
"contentType": "application/octet-stream",
},
@@ -145,19 +146,20 @@ PIPELINE = {
# [END howto_data_fusion_env_variables]
with models.DAG(
- "example_data_fusion_async",
+ DAG_ID,
start_date=datetime(2021, 1, 1),
catchup=False,
+ tags=["example", "datafusion", "deferrable"],
) as dag:
create_bucket1 = GCSCreateBucketOperator(
task_id="create_bucket1",
- bucket_name=BUCKET_1,
+ bucket_name=BUCKET_NAME_1,
project_id=PROJECT_ID,
)
create_bucket2 = GCSCreateBucketOperator(
task_id="create_bucket2",
- bucket_name=BUCKET_2,
+ bucket_name=BUCKET_NAME_2,
project_id=PROJECT_ID,
)
@@ -209,7 +211,7 @@ with models.DAG(
# [END howto_cloud_data_fusion_list_pipelines]
# [START howto_cloud_data_fusion_start_pipeline_def]
- start_pipeline_async = CloudDataFusionStartPipelineOperator(
+ start_pipeline_def = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
@@ -233,40 +235,42 @@ with models.DAG(
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="delete_pipeline",
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_cloud_data_fusion_delete_pipeline]
# [START howto_cloud_data_fusion_delete_instance_operator]
delete_instance = CloudDataFusionDeleteInstanceOperator(
- location=LOCATION, instance_name=INSTANCE_NAME, task_id="delete_instance"
+ location=LOCATION,
+ instance_name=INSTANCE_NAME,
+ task_id="delete_instance",
+ trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_cloud_data_fusion_delete_instance_operator]
- #
- # Add sleep before creating pipeline
- sleep_30_1 = BashOperator(task_id="sleep_30_1", bash_command="sleep 30")
-
- # Add sleep before deleting pipeline
- sleep_30 = BashOperator(task_id="sleep_30", bash_command="sleep 30")
- # Add sleep before starting pipeline
- sleep_20 = BashOperator(task_id="sleep_20", bash_command="sleep 40")
+ delete_bucket1 = GCSDeleteBucketOperator(
+ task_id="delete_bucket1", bucket_name=BUCKET_NAME_1,
trigger_rule=TriggerRule.ALL_DONE
+ )
+ delete_bucket2 = GCSDeleteBucketOperator(
+ task_id="delete_bucket2", bucket_name=BUCKET_NAME_1,
trigger_rule=TriggerRule.ALL_DONE
+ )
(
- create_bucket1
- >> create_bucket2
+ # TEST SETUP
+ [create_bucket1, create_bucket2]
+ # TEST BODY
>> create_instance
>> get_instance
>> restart_instance
>> update_instance
- >> sleep_30_1
>> create_pipeline
>> list_pipelines
- >> sleep_20
- >> start_pipeline_async
+ >> start_pipeline_def
>> stop_pipeline
- >> sleep_30
>> delete_pipeline
>> delete_instance
+ # TEST TEARDOWN
+ >> [delete_bucket1, delete_bucket2]
)
from tests.system.utils.watcher import watcher
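Both example files keep the system-test watcher import shown in the truncated context above. In Airflow system tests this is conventionally wired up as below; a sketch of the usual pattern, not part of this diff:

from tests.system.utils.watcher import watcher

# Teardown tasks with TriggerRule.ALL_DONE can succeed even when the test body
# fails, so the watcher task is attached to every task in the DAG to propagate
# a body failure to the DAG run and mark the system test as failed.
list(dag.tasks) >> watcher()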