This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 42465c5a94 Fix Dataproc system tests (#32807)
42465c5a94 is described below
commit 42465c5a9465fd77f3000117721e0ed1cc51c166
Author: max <[email protected]>
AuthorDate: Mon Jul 31 10:46:53 2023 +0200
Fix Dataproc system tests (#32807)
---
.../cloud/dataproc/example_dataproc_batch.py | 24 +++++++++-------------
.../dataproc/example_dataproc_batch_deferrable.py | 13 +++++++++---
.../dataproc/example_dataproc_batch_persistent.py | 18 ++++++++++++----
.../example_dataproc_cluster_deferrable.py | 14 +++++++++----
.../dataproc/example_dataproc_cluster_generator.py | 15 ++++++++++----
.../dataproc/example_dataproc_cluster_update.py | 12 ++++++++---
.../google/cloud/dataproc/example_dataproc_gke.py | 12 ++++++-----
.../cloud/dataproc/example_dataproc_hadoop.py | 5 +----
.../google/cloud/dataproc/example_dataproc_hive.py | 6 +-----
.../google/cloud/dataproc/example_dataproc_pig.py | 5 +----
.../cloud/dataproc/example_dataproc_pyspark.py | 3 +--
.../cloud/dataproc/example_dataproc_workflow.py | 13 +++++++++---
.../example_dataproc_workflow_deferrable.py | 15 ++++++++++----
13 files changed, 96 insertions(+), 59 deletions(-)
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
index 160422d197..3e9aed3502 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py
@@ -40,10 +40,10 @@ DAG_ID = "dataproc_batch"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
REGION = "europe-west1"
-BATCH_ID = f"test-batch-id-{ENV_ID}"
-BATCH_ID_2 = f"test-batch-id-{ENV_ID}-2"
-BATCH_ID_3 = f"test-batch-id-{ENV_ID}-3"
-BATCH_ID_4 = f"test-batch-id-{ENV_ID}-4"
+BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
+BATCH_ID_2 = f"batch-{ENV_ID}-{DAG_ID}-2".replace("_", "-")
+BATCH_ID_3 = f"batch-{ENV_ID}-{DAG_ID}-3".replace("_", "-")
+BATCH_ID_4 = f"batch-{ENV_ID}-{DAG_ID}-4".replace("_", "-")
BATCH_CONFIG = {
"spark_batch": {
@@ -154,19 +154,15 @@ with models.DAG(
delete_batch_4.trigger_rule = TriggerRule.ALL_DONE
(
- create_batch
- >> create_batch_2
- >> create_batch_3
+ # TEST SETUP
+ [create_batch, create_batch_2, create_batch_3]
+ # TEST BODY
>> batch_async_sensor
- >> get_batch
- >> get_batch_2
- >> list_batches
+ >> [get_batch, get_batch_2, list_batches]
>> create_batch_4
>> cancel_operation
- >> delete_batch
- >> delete_batch_2
- >> delete_batch_3
- >> delete_batch_4
+ # TEST TEARDOWN
+ >> [delete_batch, delete_batch_2, delete_batch_3, delete_batch_4]
)
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py
index 2d363328c4..8ed1893a5b 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py
@@ -36,7 +36,7 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "dataproc_batch_deferrable"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
REGION = "europe-west1"
-BATCH_ID = f"test-def-batch-id-{ENV_ID}"
+BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
BATCH_CONFIG = {
"spark_batch": {
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
@@ -50,7 +50,7 @@ with models.DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example", "dataproc"],
+ tags=["example", "dataproc", "batch", "deferrable"],
) as dag:
# [START how_to_cloud_dataproc_create_batch_operator_async]
create_batch = DataprocCreateBatchOperator(
@@ -75,7 +75,14 @@ with models.DAG(
)
delete_batch.trigger_rule = TriggerRule.ALL_DONE
- (create_batch >> get_batch >> delete_batch)
+ (
+ # TEST SETUP
+ create_batch
+ # TEST BODY
+ >> get_batch
+ # TEST TEARDOWN
+ >> delete_batch
+ )
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py
index 011b91d245..2a8aaf4975 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py
@@ -37,8 +37,8 @@ DAG_ID = "dataproc_batch_ps"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
REGION = "europe-west1"
-CLUSTER_NAME = f"dataproc-cluster-ps-{ENV_ID}"
-BATCH_ID = f"batch-ps-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
+BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
CLUSTER_GENERATOR_CONFIG_FOR_PHS = ClusterGenerator(
project_id=PROJECT_ID,
@@ -71,7 +71,7 @@ with models.DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example", "dataproc"],
+ tags=["example", "dataproc", "batch", "persistent"],
) as dag:
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
@@ -108,7 +108,17 @@ with models.DAG(
delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)
- create_bucket >> create_cluster >> create_batch >> delete_cluster >> delete_bucket
+
+ (
+ # TEST SETUP
+ create_bucket
+ >> create_cluster
+ # TEST BODY
+ >> create_batch
+ # TEST TEARDOWN
+ >> delete_cluster
+ >> delete_bucket
+ )
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
index b86ae0f0c2..b14c5c00f1 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
@@ -35,9 +35,8 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "dataproc_cluster_def"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-CLUSTER_NAME = f"cluster-dataproc-def-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
REGION = "europe-west1"
-ZONE = "europe-west1-b"
# Cluster definition
@@ -82,7 +81,7 @@ with models.DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example", "dataproc"],
+ tags=["example", "dataproc", "deferrable"],
) as dag:
# [START how_to_cloud_dataproc_create_cluster_operator_async]
create_cluster = DataprocCreateClusterOperator(
@@ -119,7 +118,14 @@ with models.DAG(
)
# [END how_to_cloud_dataproc_delete_cluster_operator_async]
- create_cluster >> update_cluster >> delete_cluster
+ (
+ # TEST SETUP
+ create_cluster
+ # TEST BODY
+ >> update_cluster
+ # TEST TEARDOWN
+ >> delete_cluster
+ )
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
index f3f68291b2..f994247294 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py
@@ -40,7 +40,7 @@ DAG_ID = "dataproc_cluster_generation"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"dataproc-cluster-gen-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
REGION = "europe-west1"
ZONE = "europe-west1-b"
INIT_FILE_SRC = str(Path(__file__).parent / "resources" / "pip-install.sh")
@@ -65,8 +65,6 @@ CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
# [END how_to_cloud_dataproc_create_cluster_generate_cluster_config]
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
with models.DAG(
DAG_ID,
@@ -110,7 +108,16 @@ with models.DAG(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)
- create_bucket >> upload_file >> create_dataproc_cluster >> [delete_cluster, delete_bucket]
+ (
+ # TEST SETUP
+ create_bucket
+ >> upload_file
+ >>
+ # TEST BODY
+ create_dataproc_cluster
+ # TEST TEARDOWN
+ >> [delete_cluster, delete_bucket]
+ )
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py
index 1607e714f5..ed8e953239 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py
@@ -35,9 +35,8 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "dataproc_update"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-CLUSTER_NAME = f"cluster-dataproc-update-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
REGION = "europe-west1"
-ZONE = "europe-west1-b"
# Cluster definition
@@ -102,7 +101,14 @@ with models.DAG(
trigger_rule=TriggerRule.ALL_DONE,
)
- create_cluster >> scale_cluster >> delete_cluster
+ (
+ # TEST SETUP
+ create_cluster
+ # TEST BODY
+ >> scale_cluster
+ # TEST TEARDOWN
+ >> delete_cluster
+ )
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
index d2e5f0fd3d..b5451d0d44 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py
@@ -47,8 +47,8 @@ DAG_ID = "dataproc-gke"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
REGION = "us-central1"
-CLUSTER_NAME = f"cluster-test-build-in-gke{ENV_ID}"
-GKE_CLUSTER_NAME = f"test-dataproc-gke-cluster-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
+GKE_CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}-gke".replace("_", "-")
WORKLOAD_POOL = f"{PROJECT_ID}.svc.id.goog"
GKE_CLUSTER_CONFIG = {
"name": GKE_CLUSTER_NAME,
@@ -89,7 +89,7 @@ with models.DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example"],
+ tags=["example", "dataproc", "gke"],
) as dag:
create_gke_cluster = GKECreateClusterOperator(
task_id="create_gke_cluster",
@@ -132,11 +132,13 @@ with models.DAG(
)
(
+ # TEST SETUP
create_gke_cluster
>> add_iam_policy_binding
+ # TEST BODY
>> create_cluster_in_gke
- >> delete_gke_cluster
- >> delete_dataproc_cluster
+ # TEST TEARDOWN
+ >> [delete_gke_cluster, delete_dataproc_cluster]
)
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
index 7af2654bfe..4cd612c8ea 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
@@ -37,9 +37,8 @@ DAG_ID = "dataproc_hadoop"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"dataproc-hadoop-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
REGION = "europe-west1"
-ZONE = "europe-west1-b"
OUTPUT_FOLDER = "wordcount"
OUTPUT_PATH = f"gs://{BUCKET_NAME}/{OUTPUT_FOLDER}/"
@@ -58,8 +57,6 @@ CLUSTER_CONFIG = {
},
}
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
# Jobs definitions
# [START how_to_cloud_dataproc_hadoop_config]
HADOOP_JOB = {
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
index 37ebe56dc0..99312b2011 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py
@@ -35,10 +35,8 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "dataproc_hive"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-CLUSTER_NAME = f"cluster-dataproc-hive-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
REGION = "europe-west1"
-ZONE = "europe-west1-b"
-
# Cluster definition
# [START how_to_cloud_dataproc_create_cluster]
@@ -68,8 +66,6 @@ CLUSTER_CONFIG = {
# [END how_to_cloud_dataproc_create_cluster]
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
# [START how_to_cloud_dataproc_hive_config]
HIVE_JOB = {
"reference": {"project_id": PROJECT_ID},
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py
index a449437cb3..1a0299bd10 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py
@@ -35,9 +35,8 @@ DAG_ID = "dataproc_pig"
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-CLUSTER_NAME = f"cluster-dataproc-pig-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
REGION = "europe-west1"
-ZONE = "europe-west1-b"
# Cluster definition
@@ -54,8 +53,6 @@ CLUSTER_CONFIG = {
},
}
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
-
# Jobs definitions
# [START how_to_cloud_dataproc_pig_config]
PIG_JOB = {
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
index ff3e619d7d..fe3884dadd 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
@@ -42,9 +42,8 @@ DAG_ID = "dataproc_pyspark"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"cluster-dataproc-pyspark-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
REGION = "europe-west1"
-ZONE = "europe-west1-b"
# Cluster definition
CLUSTER_CONFIG = {
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
index 52a9f3094f..35d3ee0ebe 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow.py
@@ -34,7 +34,7 @@ DAG_ID = "dataproc_workflow"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
REGION = "europe-west1"
-CLUSTER_NAME = f"cluster-dataproc-workflow-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
@@ -66,7 +66,7 @@ with models.DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example", "dataproc"],
+ tags=["example", "dataproc", "workflow"],
) as dag:
# [START how_to_cloud_dataproc_create_workflow_template]
create_workflow_template = DataprocCreateWorkflowTemplateOperator(
@@ -89,7 +89,14 @@ with models.DAG(
)
# [END how_to_cloud_dataproc_instantiate_inline_workflow_template]
- (create_workflow_template >> trigger_workflow >> instantiate_inline_workflow_template)
+ (
+ # TEST SETUP
+ create_workflow_template
+ # TEST BODY
+ >> trigger_workflow
+ # TEST TEARDOWN
+ >> instantiate_inline_workflow_template
+ )
from tests.system.utils.watcher import watcher
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
index d8dfab2267..843148bf9b 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
@@ -30,11 +30,11 @@ from airflow.providers.google.cloud.operators.dataproc import (
)
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "dataproc_workflow"
+DAG_ID = "dataproc_workflow_def"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
REGION = "europe-west1"
-CLUSTER_NAME = f"cluster-dataproc-workflow-{ENV_ID}"
+CLUSTER_NAME = f"{ENV_ID}-{DAG_ID}".replace("_", "-")
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
@@ -66,7 +66,7 @@ with models.DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example", "dataproc"],
+ tags=["example", "dataproc", "workflow", "deferrable"],
) as dag:
create_workflow_template = DataprocCreateWorkflowTemplateOperator(
task_id="create_workflow_template",
@@ -94,7 +94,14 @@ with models.DAG(
)
# [END how_to_cloud_dataproc_instantiate_inline_workflow_template_async]
- (create_workflow_template >> trigger_workflow_async >> instantiate_inline_workflow_template_async)
+ (
+ # TEST SETUP
+ create_workflow_template
+ # TEST BODY
+ >> trigger_workflow_async
+ # TEST TEARDOWN
+ >> instantiate_inline_workflow_template_async
+ )
from tests.system.utils.watcher import watcher