This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b7ae76026 Fixup docs and optimize system test for DataprocSubmitJobOperator (Hadoop job) (#32722)
8b7ae76026 is described below

commit 8b7ae760261109f1bfa6c60abbbc9803bd93bb74
Author: max <[email protected]>
AuthorDate: Thu Jul 20 19:26:40 2023 +0200

    Fixup docs and optimize system test for DataprocSubmitJobOperator (Hadoop job) (#32722)
---
 .../providers/google/cloud/operators/dataproc.py   |  8 +++++-
 .../cloud/dataproc/example_dataproc_hadoop.py      | 32 +++++++---------------
 2 files changed, 17 insertions(+), 23 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index bac0c4eb9b..3fa6ae1e77 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -1424,6 +1424,10 @@ class DataprocSubmitSparkJobOperator(DataprocJobBaseOperator):
 class DataprocSubmitHadoopJobOperator(DataprocJobBaseOperator):
     """Start a Hadoop Job on a Cloud DataProc cluster.
 
+    .. seealso::
+        This operator is deprecated, please use
+        :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`:
+
     :param main_jar: The HCFS URI of the jar file containing the main class
         (use this or the main_class, not both together).
     :param main_class: Name of the job class. (use this or the main_jar, not 
both
@@ -1931,7 +1935,9 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
     :param region: Required. The Cloud Dataproc region in which to handle the 
request.
     :param job: Required. The job resource.
         If a dict is provided, it must be of the same form as the protobuf 
message
-        :class:`~google.cloud.dataproc_v1.types.Job`
+        :class:`~google.cloud.dataproc_v1.types.Job`.
+        For the complete list of supported job types please take a look here
+        https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs
     :param request_id: Optional. A unique id used to identify the request. If 
the server receives two
         ``SubmitJobRequest`` requests with the same id, then the second 
request will be ignored and the first
         ``Job`` created and stored in the backend is returned.
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
index 1eb0307178..7af2654bfe 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py
@@ -28,7 +28,6 @@ from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
     DataprocDeleteClusterOperator,
     DataprocSubmitJobOperator,
-    DataprocUpdateClusterOperator,
 )
 from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.utils.trigger_rule import TriggerRule
@@ -53,20 +52,12 @@ CLUSTER_CONFIG = {
         "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 
1024},
     },
     "worker_config": {
-        "num_instances": 2,
+        "num_instances": 3,
         "machine_type_uri": "n1-standard-4",
         "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 
1024},
     },
 }
 
-# Update options
-CLUSTER_UPDATE = {
-    "config": {"worker_config": {"num_instances": 3}, 
"secondary_worker_config": {"num_instances": 3}}
-}
-UPDATE_MASK = {
-    "paths": ["config.worker_config.num_instances", 
"config.secondary_worker_config.num_instances"]
-}
-
 TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
 
 # Jobs definitions
@@ -87,7 +78,7 @@ with models.DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "hadoop"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
@@ -101,16 +92,6 @@ with models.DAG(
         cluster_name=CLUSTER_NAME,
     )
 
-    scale_cluster = DataprocUpdateClusterOperator(
-        task_id="scale_cluster",
-        cluster_name=CLUSTER_NAME,
-        cluster=CLUSTER_UPDATE,
-        update_mask=UPDATE_MASK,
-        graceful_decommission_timeout=TIMEOUT,
-        project_id=PROJECT_ID,
-        region=REGION,
-    )
-
     hadoop_task = DataprocSubmitJobOperator(
         task_id="hadoop_task", job=HADOOP_JOB, region=REGION, 
project_id=PROJECT_ID
     )
@@ -127,7 +108,14 @@ with models.DAG(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
     )
 
-    create_bucket >> create_cluster >> scale_cluster >> hadoop_task >> delete_cluster >> delete_bucket
+    (
+        # TEST SETUP
+        [create_bucket, create_cluster]
+        # TEST BODY
+        >> hadoop_task
+        # TEST TEARDOWN
+        >> [delete_cluster, delete_bucket]
+    )
 
     from tests.system.utils.watcher import watcher
 

Reply via email to