This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8b7ae76026 Fixup docs and optimize system test for
DataprocSubmitJobOperator (Hadoop job) (#32722)
8b7ae76026 is described below
commit 8b7ae760261109f1bfa6c60abbbc9803bd93bb74
Author: max <[email protected]>
AuthorDate: Thu Jul 20 19:26:40 2023 +0200
Fixup docs and optimize system test for DataprocSubmitJobOperator (Hadoop
job) (#32722)
---
.../providers/google/cloud/operators/dataproc.py | 8 +++++-
.../cloud/dataproc/example_dataproc_hadoop.py | 32 +++++++---------------
2 files changed, 17 insertions(+), 23 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/dataproc.py
b/airflow/providers/google/cloud/operators/dataproc.py
index bac0c4eb9b..3fa6ae1e77 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -1424,6 +1424,10 @@ class
DataprocSubmitSparkJobOperator(DataprocJobBaseOperator):
class DataprocSubmitHadoopJobOperator(DataprocJobBaseOperator):
"""Start a Hadoop Job on a Cloud DataProc cluster.
+ .. seealso::
+ This operator is deprecated; please use
+ :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`.
+
:param main_jar: The HCFS URI of the jar file containing the main class
(use this or the main_class, not both together).
:param main_class: Name of the job class. (use this or the main_jar, not
both
@@ -1931,7 +1935,9 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
:param region: Required. The Cloud Dataproc region in which to handle the
request.
:param job: Required. The job resource.
If a dict is provided, it must be of the same form as the protobuf
message
- :class:`~google.cloud.dataproc_v1.types.Job`
+ :class:`~google.cloud.dataproc_v1.types.Job`.
+ For the complete list of supported job types, please take a look here:
+ https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs
:param request_id: Optional. A unique id used to identify the request. If
the server receives two
``SubmitJobRequest`` requests with the same id, then the second
request will be ignored and the first
``Job`` created and stored in the backend is returned.
diff --git
a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
index 1eb0307178..7af2654bfe 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
@@ -28,7 +28,6 @@ from airflow.providers.google.cloud.operators.dataproc import
(
DataprocCreateClusterOperator,
DataprocDeleteClusterOperator,
DataprocSubmitJobOperator,
- DataprocUpdateClusterOperator,
)
from airflow.providers.google.cloud.operators.gcs import
GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.utils.trigger_rule import TriggerRule
@@ -53,20 +52,12 @@ CLUSTER_CONFIG = {
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb":
1024},
},
"worker_config": {
- "num_instances": 2,
+ "num_instances": 3,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb":
1024},
},
}
-# Update options
-CLUSTER_UPDATE = {
- "config": {"worker_config": {"num_instances": 3},
"secondary_worker_config": {"num_instances": 3}}
-}
-UPDATE_MASK = {
- "paths": ["config.worker_config.num_instances",
"config.secondary_worker_config.num_instances"]
-}
-
TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
# Jobs definitions
@@ -87,7 +78,7 @@ with models.DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example", "dataproc"],
+ tags=["example", "dataproc", "hadoop"],
) as dag:
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
@@ -101,16 +92,6 @@ with models.DAG(
cluster_name=CLUSTER_NAME,
)
- scale_cluster = DataprocUpdateClusterOperator(
- task_id="scale_cluster",
- cluster_name=CLUSTER_NAME,
- cluster=CLUSTER_UPDATE,
- update_mask=UPDATE_MASK,
- graceful_decommission_timeout=TIMEOUT,
- project_id=PROJECT_ID,
- region=REGION,
- )
-
hadoop_task = DataprocSubmitJobOperator(
task_id="hadoop_task", job=HADOOP_JOB, region=REGION,
project_id=PROJECT_ID
)
@@ -127,7 +108,14 @@ with models.DAG(
task_id="delete_bucket", bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE
)
- create_bucket >> create_cluster >> scale_cluster >> hadoop_task >>
delete_cluster >> delete_bucket
+ (
+ # TEST SETUP
+ [create_bucket, create_cluster]
+ # TEST BODY
+ >> hadoop_task
+ # TEST TEARDOWN
+ >> [delete_cluster, delete_bucket]
+ )
from tests.system.utils.watcher import watcher