This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 42465c5a94 Fix Dataproc system tests (#32807)
42465c5a94 is described below

commit 42465c5a9465fd77f3000117721e0ed1cc51c166
Author: max <[email protected]>
AuthorDate: Mon Jul 31 10:46:53 2023 +0200

    Fix Dataproc system tests (#32807)
---
 .../cloud/dataproc/example_dataproc_batch.py       | 24 +++++++++-------------
 .../dataproc/example_dataproc_batch_deferrable.py  | 13 +++++++++---
 .../dataproc/example_dataproc_batch_persistent.py  | 18 ++++++++++++----
 .../example_dataproc_cluster_deferrable.py         | 14 +++++++++----
 .../dataproc/example_dataproc_cluster_generator.py | 15 ++++++++++----
 .../dataproc/example_dataproc_cluster_update.py    | 12 ++++++++---
 .../google/cloud/dataproc/example_dataproc_gke.py  | 12 ++++++-----
 .../cloud/dataproc/example_dataproc_hadoop.py      |  5 +----
 .../google/cloud/dataproc/example_dataproc_hive.py |  6 +-----
 .../google/cloud/dataproc/example_dataproc_pig.py  |  5 +----
 .../cloud/dataproc/example_dataproc_pyspark.py     |  3 +--
 .../cloud/dataproc/example_dataproc_workflow.py    | 13 +++++++++---
 .../example_dataproc_workflow_deferrable.py        | 15 ++++++++++----
 13 files changed, 96 insertions(+), 59 deletions(-)

diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
index 160422d197..3e9aed3502 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
@@ -40,10 +40,10 @@ DAG_ID = "dataproc_batch"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 REGION = "europe-west1"
 
-BATCH_ID = f"test-batch-id-{ENV_ID}"
-BATCH_ID_2 = f"test-batch-id-{ENV_ID}-2"
-BATCH_ID_3 = f"test-batch-id-{ENV_ID}-3"
-BATCH_ID_4 = f"test-batch-id-{ENV_ID}-4"
+BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
+BATCH_ID_2 = f"batch-{ENV_ID}-{DAG_ID}-2".replace("_", "-")
+BATCH_ID_3 = f"batch-{ENV_ID}-{DAG_ID}-3".replace("_", "-")
+BATCH_ID_4 = f"batch-{ENV_ID}-{DAG_ID}-4".replace("_", "-")
 
 BATCH_CONFIG = {
     "spark_batch": {
@@ -154,19 +154,15 @@ with models.DAG(
     delete_batch_4.trigger_rule = TriggerRule.ALL_DONE
 
     (
-        create_batch
-        >> create_batch_2
-        >> create_batch_3
+        # TEST SETUP
+        [create_batch, create_batch_2, create_batch_3]
+        # TEST BODY
         >> batch_async_sensor
-        >> get_batch
-        >> get_batch_2
-        >> list_batches
+        >> [get_batch, get_batch_2, list_batches]
         >> create_batch_4
         >> cancel_operation
-        >> delete_batch
-        >> delete_batch_2
-        >> delete_batch_3
-        >> delete_batch_4
+        # TEST TEARDOWN
+        >> [delete_batch, delete_batch_2, delete_batch_3, delete_batch_4]
     )
 
     from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py
index 2d363328c4..8ed1893a5b 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py
@@ -36,7 +36,7 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "dataproc_batch_deferrable"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 REGION = "europe-west1"
-BATCH_ID = f"test-def-batch-id-{ENV_ID}"
+BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
 BATCH_CONFIG = {
     "spark_batch": {
         "jar_file_uris": 
["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
@@ -50,7 +50,7 @@ with models.DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "batch", "deferrable"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_batch_operator_async]
     create_batch = DataprocCreateBatchOperator(
@@ -75,7 +75,14 @@ with models.DAG(
     )
     delete_batch.trigger_rule = TriggerRule.ALL_DONE
 
-    (create_batch >> get_batch >> delete_batch)
+    (
+        # TEST SETUP
+        create_batch
+        # TEST BODY
+        >> get_batch
+        # TEST TEARDOWN
+        >> delete_batch
+    )
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py
index 011b91d245..2a8aaf4975 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py
@@ -37,8 +37,8 @@ DAG_ID = "dataproc_batch_ps"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
 REGION = "europe-west1"
-CLUSTER_NAME = f"dataproc-cluster-ps-{ENV_ID}"
-BATCH_ID = f"batch-ps-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
+BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
 
 CLUSTER_GENERATOR_CONFIG_FOR_PHS = ClusterGenerator(
     project_id=PROJECT_ID,
@@ -71,7 +71,7 @@ with models.DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "batch", "persistent"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
@@ -108,7 +108,17 @@ with models.DAG(
     delete_bucket = GCSDeleteBucketOperator(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
     )
-    create_bucket >> create_cluster >> create_batch >> delete_cluster >> 
delete_bucket
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> create_cluster
+        # TEST BODY
+        >> create_batch
+        # TEST TEARDOWN
+        >> delete_cluster
+        >> delete_bucket
+    )
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
index b86ae0f0c2..b14c5c00f1 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
@@ -35,9 +35,8 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "dataproc_cluster_def"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-CLUSTER_NAME = f"cluster-dataproc-def-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 
 # Cluster definition
@@ -82,7 +81,7 @@ with models.DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "deferrable"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_cluster_operator_async]
     create_cluster = DataprocCreateClusterOperator(
@@ -119,7 +118,14 @@ with models.DAG(
     )
     # [END how_to_cloud_dataproc_delete_cluster_operator_async]
 
-    create_cluster >> update_cluster >> delete_cluster
+    (
+        # TEST SETUP
+        create_cluster
+        # TEST BODY
+        >> update_cluster
+        # TEST TEARDOWN
+        >> delete_cluster
+    )
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
index f3f68291b2..f994247294 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
@@ -40,7 +40,7 @@ DAG_ID = "dataproc_cluster_generation"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"dataproc-cluster-gen-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
 ZONE = "europe-west1-b"
 INIT_FILE_SRC = str(Path(__file__).parent / "resources" / "pip-install.sh")
@@ -65,8 +65,6 @@ CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
 
 # [END how_to_cloud_dataproc_create_cluster_generate_cluster_config]
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
 
 with models.DAG(
     DAG_ID,
@@ -110,7 +108,16 @@ with models.DAG(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
     )
 
-    create_bucket >> upload_file >> create_dataproc_cluster >> 
[delete_cluster, delete_bucket]
+    (
+        # TEST SETUP
+        create_bucket
+        >> upload_file
+        >>
+        # TEST BODY
+        create_dataproc_cluster
+        # TEST TEARDOWN
+        >> [delete_cluster, delete_bucket]
+    )
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py
index 1607e714f5..ed8e953239 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py
@@ -35,9 +35,8 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "dataproc_update"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-CLUSTER_NAME = f"cluster-dataproc-update-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 
 # Cluster definition
@@ -102,7 +101,14 @@ with models.DAG(
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    create_cluster >> scale_cluster >> delete_cluster
+    (
+        # TEST SETUP
+        create_cluster
+        # TEST BODY
+        >> scale_cluster
+        # TEST TEARDOWN
+        >> delete_cluster
+    )
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
index d2e5f0fd3d..b5451d0d44 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
@@ -47,8 +47,8 @@ DAG_ID = "dataproc-gke"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 
 REGION = "us-central1"
-CLUSTER_NAME = f"cluster-test-build-in-gke{ENV_ID}"
-GKE_CLUSTER_NAME = f"test-dataproc-gke-cluster-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
+GKE_CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}-gke".replace("_", "-")
 WORKLOAD_POOL = f"{PROJECT_ID}.svc.id.goog"
 GKE_CLUSTER_CONFIG = {
     "name": GKE_CLUSTER_NAME,
@@ -89,7 +89,7 @@ with models.DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "dataproc", "gke"],
 ) as dag:
     create_gke_cluster = GKECreateClusterOperator(
         task_id="create_gke_cluster",
@@ -132,11 +132,13 @@ with models.DAG(
     )
 
     (
+        # TEST SETUP
         create_gke_cluster
         >> add_iam_policy_binding
+        # TEST BODY
         >> create_cluster_in_gke
-        >> delete_gke_cluster
-        >> delete_dataproc_cluster
+        # TEST TEARDOWN
+        >> [delete_gke_cluster, delete_dataproc_cluster]
     )
 
     from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
index 7af2654bfe..4cd612c8ea 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
@@ -37,9 +37,8 @@ DAG_ID = "dataproc_hadoop"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"dataproc-hadoop-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 OUTPUT_FOLDER = "wordcount"
 OUTPUT_PATH = f"gs://{BUCKET_NAME}/{OUTPUT_FOLDER}/"
@@ -58,8 +57,6 @@ CLUSTER_CONFIG = {
     },
 }
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
 # Jobs definitions
 # [START how_to_cloud_dataproc_hadoop_config]
 HADOOP_JOB = {
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
index 37ebe56dc0..99312b2011 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
@@ -35,10 +35,8 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "dataproc_hive"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-CLUSTER_NAME = f"cluster-dataproc-hive-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
-
 
 # Cluster definition
 # [START how_to_cloud_dataproc_create_cluster]
@@ -68,8 +66,6 @@ CLUSTER_CONFIG = {
 
 # [END how_to_cloud_dataproc_create_cluster]
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
 # [START how_to_cloud_dataproc_hive_config]
 HIVE_JOB = {
     "reference": {"project_id": PROJECT_ID},
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py
index a449437cb3..1a0299bd10 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py
@@ -35,9 +35,8 @@ DAG_ID = "dataproc_pig"
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-CLUSTER_NAME = f"cluster-dataproc-pig-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 
 # Cluster definition
@@ -54,8 +53,6 @@ CLUSTER_CONFIG = {
     },
 }
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
 # Jobs definitions
 # [START how_to_cloud_dataproc_pig_config]
 PIG_JOB = {
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
index ff3e619d7d..fe3884dadd 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
@@ -42,9 +42,8 @@ DAG_ID = "dataproc_pyspark"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"cluster-dataproc-pyspark-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
 
 # Cluster definition
 CLUSTER_CONFIG = {
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
index 52a9f3094f..35d3ee0ebe 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
@@ -34,7 +34,7 @@ DAG_ID = "dataproc_workflow"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 REGION = "europe-west1"
-CLUSTER_NAME = f"cluster-dataproc-workflow-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 CLUSTER_CONFIG = {
     "master_config": {
         "num_instances": 1,
@@ -66,7 +66,7 @@ with models.DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "workflow"],
 ) as dag:
     # [START how_to_cloud_dataproc_create_workflow_template]
     create_workflow_template = DataprocCreateWorkflowTemplateOperator(
@@ -89,7 +89,14 @@ with models.DAG(
     )
     # [END how_to_cloud_dataproc_instantiate_inline_workflow_template]
 
-    (create_workflow_template >> trigger_workflow >> 
instantiate_inline_workflow_template)
+    (
+        # TEST SETUP
+        create_workflow_template
+        # TEST BODY
+        >> trigger_workflow
+        # TEST TEARDOWN
+        >> instantiate_inline_workflow_template
+    )
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
index d8dfab2267..843148bf9b 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
@@ -30,11 +30,11 @@ from airflow.providers.google.cloud.operators.dataproc import (
 )
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "dataproc_workflow"
+DAG_ID = "dataproc_workflow_def"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 REGION = "europe-west1"
-CLUSTER_NAME = f"cluster-dataproc-workflow-{ENV_ID}"
+CLUSTER_NAME = f"{ENV_ID}-{DAG_ID}".replace("_", "-")
 CLUSTER_CONFIG = {
     "master_config": {
         "num_instances": 1,
@@ -66,7 +66,7 @@ with models.DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "workflow", "deferrable"],
 ) as dag:
     create_workflow_template = DataprocCreateWorkflowTemplateOperator(
         task_id="create_workflow_template",
@@ -94,7 +94,14 @@ with models.DAG(
     )
     # [END how_to_cloud_dataproc_instantiate_inline_workflow_template_async]
 
-    (create_workflow_template >> trigger_workflow_async >> 
instantiate_inline_workflow_template_async)
+    (
+        # TEST SETUP
+        create_workflow_template
+        # TEST BODY
+        >> trigger_workflow_async
+        # TEST TEARDOWN
+        >> instantiate_inline_workflow_template_async
+    )
 
     from tests.system.utils.watcher import watcher
 

Reply via email to