This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 440c9eb2b4 Fixup system test for DataprocSubmitJobOperator (PySpark 
job) (#32740)
440c9eb2b4 is described below

commit 440c9eb2b4c834d040584a492e23ff4f1212f7f0
Author: max <[email protected]>
AuthorDate: Fri Jul 21 15:35:14 2023 +0200

    Fixup system test for DataprocSubmitJobOperator (PySpark job) (#32740)
---
 .../providers/google/cloud/operators/dataproc.py   |  6 +-
 .../cloud/dataproc/example_dataproc_pyspark.py     | 65 +++++++++++++++++-----
 2 files changed, 56 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/dataproc.py 
b/airflow/providers/google/cloud/operators/dataproc.py
index 5ac1b1cb3d..bf420e76c0 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -1504,6 +1504,10 @@ class 
DataprocSubmitHadoopJobOperator(DataprocJobBaseOperator):
 class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator):
     """Start a PySpark Job on a Cloud DataProc cluster.
 
+    .. seealso::
+        This operator is deprecated, please use
+        
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`:
+
     :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the 
main
             Python file to use as the driver. Must be a .py file. (templated)
     :param arguments: Arguments for the job. (templated)
@@ -1940,7 +1944,7 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
     :param job: Required. The job resource.
         If a dict is provided, it must be of the same form as the protobuf 
message
         :class:`~google.cloud.dataproc_v1.types.Job`.
-        For the complete list of supported job types please take a look here
+        For the complete list of supported job types and their configurations 
please take a look here
         
https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs
     :param request_id: Optional. A unique id used to identify the request. If 
the server receives two
         ``SubmitJobRequest`` requests with the same id, then the second 
request will be ignored and the first
diff --git 
a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py 
b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
index b23482f5ac..ff3e619d7d 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
@@ -22,15 +22,18 @@ from __future__ import annotations
 
 import os
 from datetime import datetime
-from pathlib import Path
 
 from airflow import models
+from airflow.decorators import task
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
     DataprocDeleteClusterOperator,
     DataprocSubmitJobOperator,
 )
-from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+)
 from airflow.providers.google.cloud.transfers.local_to_gcs import 
LocalFilesystemToGCSOperator
 from airflow.utils.trigger_rule import TriggerRule
 
@@ -43,11 +46,7 @@ CLUSTER_NAME = f"cluster-dataproc-pyspark-{ENV_ID}"
 REGION = "europe-west1"
 ZONE = "europe-west1-b"
 
-PYSPARK_SRC = str(Path(__file__).parent / "resources" / "hello_world.py")
-PYSPARK_FILE = "hello_world.py"
-
 # Cluster definition
-
 CLUSTER_CONFIG = {
     "master_config": {
         "num_instances": 1,
@@ -61,14 +60,35 @@ CLUSTER_CONFIG = {
     },
 }
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
+JOB_FILE_NAME = "dataproc-pyspark-job.py"
+JOB_FILE_LOCAL_PATH = f"/tmp/{JOB_FILE_NAME}"
+JOB_FILE_CONTENT = """from operator import add
+from random import random
+
+from pyspark.sql import SparkSession
+
+
+def f(_: int) -> float:
+    x = random() * 2 - 1
+    y = random() * 2 - 1
+    return 1 if x**2 + y**2 <= 1 else 0
+
+
+spark = SparkSession.builder.appName("PythonPi").getOrCreate()
+partitions = 2
+n = 100000 * partitions
+count = spark.sparkContext.parallelize(range(1, n + 1), 
partitions).map(f).reduce(add)
+print(f"Pi is roughly {4.0 * count / n:f}")
+
+spark.stop()
+"""
 
 # Jobs definitions
 # [START how_to_cloud_dataproc_pyspark_config]
 PYSPARK_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
-    "pyspark_job": {"main_python_file_uri": 
f"gs://{BUCKET_NAME}/{PYSPARK_FILE}"},
+    "pyspark_job": {"main_python_file_uri": 
f"gs://{BUCKET_NAME}/{JOB_FILE_NAME}"},
 }
 # [END how_to_cloud_dataproc_pyspark_config]
 
@@ -78,15 +98,23 @@ with models.DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "pyspark"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
     )
+
+    @task
+    def create_job_file():
+        with open(JOB_FILE_LOCAL_PATH, "w") as job_file:
+            job_file.write(JOB_FILE_CONTENT)
+
+    create_job_file_task = create_job_file()
+
     upload_file = LocalFilesystemToGCSOperator(
         task_id="upload_file",
-        src=PYSPARK_SRC,
-        dst=PYSPARK_FILE,
+        src=JOB_FILE_LOCAL_PATH,
+        dst=JOB_FILE_NAME,
         bucket=BUCKET_NAME,
     )
 
@@ -116,14 +144,23 @@ with models.DAG(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
     )
 
+    @task(trigger_rule=TriggerRule.ALL_DONE)
+    def delete_job_file():
+        try:
+            os.remove(JOB_FILE_LOCAL_PATH)
+        except FileNotFoundError:
+            pass
+        return 0
+
+    delete_job_file_task = delete_job_file()
+
     (
         # TEST SETUP
-        create_bucket
-        >> [upload_file, create_cluster]
+        [[create_job_file_task, create_bucket] >> upload_file, create_cluster]
         # TEST BODY
         >> pyspark_task
         # TEST TEARDOWN
-        >> [delete_cluster, delete_bucket]
+        >> [delete_cluster, delete_bucket, delete_job_file_task]
     )
 
     from tests.system.utils.watcher import watcher

Reply via email to