This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4950c62743 Fix the system test: dataflow_native_java (#42099)
4950c62743 is described below
commit 4950c627431c77e2db935a56db3f2f44b1f52fa3
Author: M. Olcay Tercanlı <[email protected]>
AuthorDate: Mon Sep 9 15:06:25 2024 +0200
Fix the system test: dataflow_native_java (#42099)
---
.../cloud/dataflow/example_dataflow_native_java.py | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py
index 9ef2efe3d7..df30a06e6f 100644
--- a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py
+++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py
@@ -52,9 +52,15 @@ BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
PUBLIC_BUCKET = "airflow-system-tests-resources"
JAR_FILE_NAME = "word-count-beam-bundled-0.1.jar"
+# For the distributed system, we need to store the JAR file in a folder that can be accessed by multiple
+# workers.
+# For example, in Composer the correct path is gcs/data/word-count-beam-bundled-0.1.jar,
+# because gcs/data/ is a shared folder for Airflow's workers.
+IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", ""))
+LOCAL_JAR = f"gcs/data/{JAR_FILE_NAME}" if IS_COMPOSER else JAR_FILE_NAME
REMOTE_JAR_FILE_PATH = f"dataflow/java/{JAR_FILE_NAME}"
-GCS_OUTPUT = f"gs://{BUCKET_NAME}"
GCS_JAR = f"gs://{PUBLIC_BUCKET}/dataflow/java/{JAR_FILE_NAME}"
+GCS_OUTPUT = f"gs://{BUCKET_NAME}"
LOCATION = "europe-west3"
with DAG(
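The new IS_COMPOSER / LOCAL_JAR logic in the hunk above relies on Cloud Composer exporting the COMPOSER_ENVIRONMENT variable and mounting the environment's GCS bucket under gcs/data/, which every Airflow worker can read. A minimal standalone sketch of that pattern (the helper name and the print call are illustrative, not part of this commit):

import os

JAR_FILE_NAME = "word-count-beam-bundled-0.1.jar"


def shared_worker_path(filename: str) -> str:
    # Cloud Composer sets COMPOSER_ENVIRONMENT for every Airflow process;
    # files under gcs/data/ live in the environment's GCS bucket and are
    # therefore visible to all workers, not just the one that downloaded them.
    if os.environ.get("COMPOSER_ENVIRONMENT"):
        return f"gcs/data/{filename}"
    # Outside Composer (e.g. a single-machine test run) a plain relative
    # path is enough, because the same worker downloads and consumes the jar.
    return filename


print(shared_worker_path(JAR_FILE_NAME))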
@@ -70,13 +76,13 @@ with DAG(
task_id="download_file",
object_name=REMOTE_JAR_FILE_PATH,
bucket=PUBLIC_BUCKET,
- filename=JAR_FILE_NAME,
+ filename=LOCAL_JAR,
)
# [START howto_operator_start_java_job_local_jar]
start_java_job_local = BeamRunJavaPipelineOperator(
task_id="start_java_job_local",
- jar=JAR_FILE_NAME,
+ jar=LOCAL_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
@@ -92,16 +98,18 @@ with DAG(
# [START howto_operator_start_java_job_jar_on_gcs]
start_java_job = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
- task_id="start-java-job",
+ task_id="start_java_job",
jar=GCS_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
+ "job_name": "test-java-pipeline-job",
"check_if_running": CheckJobRunning.IgnoreJob,
"location": LOCATION,
"poll_sleep": 10,
+ "append_job_name": False,
},
)
# [END howto_operator_start_java_job_jar_on_gcs]
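The new dataflow_config entries give this task a fixed Dataflow job name: with "append_job_name": False the configured name is used verbatim, whereas the default behaviour appends a unique suffix so repeated runs do not collide. A rough sketch of that naming behaviour (the suffix format below is an assumption for illustration, not the hook's exact code):

import uuid


def effective_job_name(job_name: str, append_job_name: bool = True) -> str:
    # Illustrative only: with the default append_job_name=True a unique
    # suffix is added so every run gets a distinct Dataflow job name;
    # with False the configured name is kept as-is, which keeps the two
    # system-test jobs easy to tell apart in the Dataflow console.
    if not append_job_name:
        return job_name
    return f"{job_name}-{uuid.uuid4().hex[:8]}"


print(effective_job_name("test-java-pipeline-job", append_job_name=False))
print(effective_job_name("test-deferrable-java-pipeline-job"))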
@@ -109,14 +117,14 @@ with DAG(
# [START howto_operator_start_java_job_jar_on_gcs_deferrable]
start_java_deferrable = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
- task_id="start-java-job-deferrable",
+ task_id="start_java_job_deferrable",
jar=GCS_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
- "job_name": "test-java-pipeline-job",
+ "job_name": "test-deferrable-java-pipeline-job",
"check_if_running": CheckJobRunning.WaitForRun,
"location": LOCATION,
"poll_sleep": 10,