This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4950c62743 Fix the system test: dataflow_native_java (#42099)
4950c62743 is described below

commit 4950c627431c77e2db935a56db3f2f44b1f52fa3
Author: M. Olcay Tercanlı <[email protected]>
AuthorDate: Mon Sep 9 15:06:25 2024 +0200

    Fix the system test: dataflow_native_java (#42099)
---
 .../cloud/dataflow/example_dataflow_native_java.py   | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py
index 9ef2efe3d7..df30a06e6f 100644
--- a/tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py
+++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py
@@ -52,9 +52,15 @@ BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
 PUBLIC_BUCKET = "airflow-system-tests-resources"
 
 JAR_FILE_NAME = "word-count-beam-bundled-0.1.jar"
+# For the distributed system, we need to store the JAR file in a folder that can be accessed by multiple
+# workers.
+# For example, in Composer the correct path is gcs/data/word-count-beam-bundled-0.1.jar,
+# because gcs/data/ is a shared folder for Airflow's workers.
+IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", ""))
+LOCAL_JAR = f"gcs/data/{JAR_FILE_NAME}" if IS_COMPOSER else JAR_FILE_NAME
 REMOTE_JAR_FILE_PATH = f"dataflow/java/{JAR_FILE_NAME}"
-GCS_OUTPUT = f"gs://{BUCKET_NAME}"
 GCS_JAR = f"gs://{PUBLIC_BUCKET}/dataflow/java/{JAR_FILE_NAME}"
+GCS_OUTPUT = f"gs://{BUCKET_NAME}"
 LOCATION = "europe-west3"
 
 with DAG(
@@ -70,13 +76,13 @@ with DAG(
         task_id="download_file",
         object_name=REMOTE_JAR_FILE_PATH,
         bucket=PUBLIC_BUCKET,
-        filename=JAR_FILE_NAME,
+        filename=LOCAL_JAR,
     )
 
     # [START howto_operator_start_java_job_local_jar]
     start_java_job_local = BeamRunJavaPipelineOperator(
         task_id="start_java_job_local",
-        jar=JAR_FILE_NAME,
+        jar=LOCAL_JAR,
         pipeline_options={
             "output": GCS_OUTPUT,
         },
@@ -92,16 +98,18 @@ with DAG(
     # [START howto_operator_start_java_job_jar_on_gcs]
     start_java_job = BeamRunJavaPipelineOperator(
         runner=BeamRunnerType.DataflowRunner,
-        task_id="start-java-job",
+        task_id="start_java_job",
         jar=GCS_JAR,
         pipeline_options={
             "output": GCS_OUTPUT,
         },
         job_class="org.apache.beam.examples.WordCount",
         dataflow_config={
+            "job_name": "test-java-pipeline-job",
             "check_if_running": CheckJobRunning.IgnoreJob,
             "location": LOCATION,
             "poll_sleep": 10,
+            "append_job_name": False,
         },
     )
     # [END howto_operator_start_java_job_jar_on_gcs]
@@ -109,14 +117,14 @@ with DAG(
     # [START howto_operator_start_java_job_jar_on_gcs_deferrable]
     start_java_deferrable = BeamRunJavaPipelineOperator(
         runner=BeamRunnerType.DataflowRunner,
-        task_id="start-java-job-deferrable",
+        task_id="start_java_job_deferrable",
         jar=GCS_JAR,
         pipeline_options={
             "output": GCS_OUTPUT,
         },
         job_class="org.apache.beam.examples.WordCount",
         dataflow_config={
-            "job_name": "test-java-pipeline-job",
+            "job_name": "test-deferrable-java-pipeline-job",
             "check_if_running": CheckJobRunning.WaitForRun,
             "location": LOCATION,
             "poll_sleep": 10,

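For readers unfamiliar with the Composer detection in this patch: the new LOCAL_JAR value is derived from the COMPOSER_ENVIRONMENT environment variable, which Cloud Composer sets inside its Airflow environments. A minimal standalone sketch of that selection logic follows; the resolve_local_jar helper name is illustrative and not part of the committed file.

import os

JAR_FILE_NAME = "word-count-beam-bundled-0.1.jar"


def resolve_local_jar(jar_file_name: str = JAR_FILE_NAME) -> str:
    # Cloud Composer sets COMPOSER_ENVIRONMENT; any non-empty value means the
    # DAG is running inside Composer, where gcs/data/ is the folder shared
    # between Airflow workers, so the JAR is placed there.
    is_composer = bool(os.environ.get("COMPOSER_ENVIRONMENT", ""))
    return f"gcs/data/{jar_file_name}" if is_composer else jar_file_name


# Outside Composer this prints "word-count-beam-bundled-0.1.jar";
# inside Composer it prints "gcs/data/word-count-beam-bundled-0.1.jar".
print(resolve_local_jar())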