This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 440c9eb2b4 Fixup system test for DataprocSubmitJobOperator (PySpark job) (#32740)
440c9eb2b4 is described below
commit 440c9eb2b4c834d040584a492e23ff4f1212f7f0
Author: max <[email protected]>
AuthorDate: Fri Jul 21 15:35:14 2023 +0200
Fixup system test for DataprocSubmitJobOperator (PySpark job) (#32740)
---
.../providers/google/cloud/operators/dataproc.py | 6 +-
.../cloud/dataproc/example_dataproc_pyspark.py | 65 +++++++++++++++++-----
2 files changed, 56 insertions(+), 15 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 5ac1b1cb3d..bf420e76c0 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -1504,6 +1504,10 @@ class DataprocSubmitHadoopJobOperator(DataprocJobBaseOperator):
class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator):
"""Start a PySpark Job on a Cloud DataProc cluster.
+ .. seealso::
+ This operator is deprecated, please use
+ :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`:
+
:param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
Python file to use as the driver. Must be a .py file. (templated)
:param arguments: Arguments for the job. (templated)
@@ -1940,7 +1944,7 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
:param job: Required. The job resource.
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.dataproc_v1.types.Job`.
- For the complete list of supported job types please take a look here
+ For the complete list of supported job types and their configurations please take a look here
https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs
:param request_id: Optional. A unique id used to identify the request. If the server receives two
``SubmitJobRequest`` requests with the same id, then the second request will be ignored and the first
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
index b23482f5ac..ff3e619d7d 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py
@@ -22,15 +22,18 @@ from __future__ import annotations
import os
from datetime import datetime
-from pathlib import Path
from airflow import models
+from airflow.decorators import task
from airflow.providers.google.cloud.operators.dataproc import (
DataprocCreateClusterOperator,
DataprocDeleteClusterOperator,
DataprocSubmitJobOperator,
)
-from airflow.providers.google.cloud.operators.gcs import
GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.gcs import (
+ GCSCreateBucketOperator,
+ GCSDeleteBucketOperator,
+)
from airflow.providers.google.cloud.transfers.local_to_gcs import
LocalFilesystemToGCSOperator
from airflow.utils.trigger_rule import TriggerRule
@@ -43,11 +46,7 @@ CLUSTER_NAME = f"cluster-dataproc-pyspark-{ENV_ID}"
REGION = "europe-west1"
ZONE = "europe-west1-b"
-PYSPARK_SRC = str(Path(__file__).parent / "resources" / "hello_world.py")
-PYSPARK_FILE = "hello_world.py"
-
# Cluster definition
-
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
@@ -61,14 +60,35 @@ CLUSTER_CONFIG = {
},
}
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
+JOB_FILE_NAME = "dataproc-pyspark-job.py"
+JOB_FILE_LOCAL_PATH = f"/tmp/{JOB_FILE_NAME}"
+JOB_FILE_CONTENT = """from operator import add
+from random import random
+
+from pyspark.sql import SparkSession
+
+
+def f(_: int) -> float:
+ x = random() * 2 - 1
+ y = random() * 2 - 1
+ return 1 if x**2 + y**2 <= 1 else 0
+
+
+spark = SparkSession.builder.appName("PythonPi").getOrCreate()
+partitions = 2
+n = 100000 * partitions
+count = spark.sparkContext.parallelize(range(1, n + 1),
partitions).map(f).reduce(add)
+print(f"Pi is roughly {4.0 * count / n:f}")
+
+spark.stop()
+"""
# Jobs definitions
# [START how_to_cloud_dataproc_pyspark_config]
PYSPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
- "pyspark_job": {"main_python_file_uri":
f"gs://{BUCKET_NAME}/{PYSPARK_FILE}"},
+ "pyspark_job": {"main_python_file_uri":
f"gs://{BUCKET_NAME}/{JOB_FILE_NAME}"},
}
# [END how_to_cloud_dataproc_pyspark_config]
@@ -78,15 +98,23 @@ with models.DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example", "dataproc"],
+ tags=["example", "dataproc", "pyspark"],
) as dag:
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
)
+
+ @task
+ def create_job_file():
+ with open(JOB_FILE_LOCAL_PATH, "w") as job_file:
+ job_file.write(JOB_FILE_CONTENT)
+
+ create_job_file_task = create_job_file()
+
upload_file = LocalFilesystemToGCSOperator(
task_id="upload_file",
- src=PYSPARK_SRC,
- dst=PYSPARK_FILE,
+ src=JOB_FILE_LOCAL_PATH,
+ dst=JOB_FILE_NAME,
bucket=BUCKET_NAME,
)
@@ -116,14 +144,23 @@ with models.DAG(
task_id="delete_bucket", bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE
)
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def delete_job_file():
+ try:
+ os.remove(JOB_FILE_LOCAL_PATH)
+ except FileNotFoundError:
+ pass
+ return 0
+
+ delete_job_file_task = delete_job_file()
+
(
# TEST SETUP
- create_bucket
- >> [upload_file, create_cluster]
+ [[create_job_file_task, create_bucket] >> upload_file, create_cluster]
# TEST BODY
>> pyspark_task
# TEST TEARDOWN
- >> [delete_cluster, delete_bucket]
+ >> [delete_cluster, delete_bucket, delete_job_file_task]
)
from tests.system.utils.watcher import watcher