This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 44dd6999a13 Remove tasks which use DirectRunner from Dataflow system tests (#57803)
44dd6999a13 is described below
commit 44dd6999a13457473ceb39b7023ebc5d561e02a1
Author: Maksim <[email protected]>
AuthorDate: Tue Nov 4 19:01:08 2025 +0100
Remove tasks which use DirectRunner from Dataflow system tests (#57803)
---
providers/google/docs/operators/cloud/dataflow.rst | 8 ----
.../cloud/dataflow/example_dataflow_native_java.py | 49 ----------------------
.../dataflow/example_dataflow_native_python.py | 27 ------------
3 files changed, 84 deletions(-)
diff --git a/providers/google/docs/operators/cloud/dataflow.rst b/providers/google/docs/operators/cloud/dataflow.rst
index 6483912381e..393fb614c22 100644
--- a/providers/google/docs/operators/cloud/dataflow.rst
+++ b/providers/google/docs/operators/cloud/dataflow.rst
@@ -144,14 +144,6 @@ Here is an example of creating and running a pipeline in Java with jar stored on
     :start-after: [START howto_operator_start_java_job_jar_on_gcs_deferrable]
     :end-before: [END howto_operator_start_java_job_jar_on_gcs_deferrable]
-Here is an example of creating and running a pipeline in Java with jar stored on local file system:
-
-.. exampleinclude:: /../../google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_start_java_job_local_jar]
-    :end-before: [END howto_operator_start_java_job_local_jar]
-
 Here is an example of creating and running a streaming pipeline in Java with jar stored on GCS:
 .. exampleinclude:: /../../google/tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
diff --git a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py
index ed74ede79d4..a88366bd3eb 100644
--- a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py
+++ b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_java.py
@@ -42,7 +42,6 @@ from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
 from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
 from airflow.providers.google.cloud.operators.dataflow import CheckJobRunning
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
-from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 try:
     from airflow.sdk import TriggerRule
@@ -57,13 +56,6 @@ BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
 PUBLIC_BUCKET = "airflow-system-tests-resources"
 JAR_FILE_NAME = "word-count-beam-bundled-0.1.jar"
-# For the distributed system, we need to store the JAR file in a folder that can be accessed by multiple
-# worker.
-# For example in Composer the correct path is gcs/data/word-count-beam-bundled-0.1.jar.
-# Because gcs/data/ is shared folder for Airflow's workers.
-IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", ""))
-LOCAL_JAR = f"gcs/data/{JAR_FILE_NAME}" if IS_COMPOSER else JAR_FILE_NAME
-REMOTE_JAR_FILE_PATH = f"dataflow/java/{JAR_FILE_NAME}"
 GCS_JAR = f"gs://{PUBLIC_BUCKET}/dataflow/java/{JAR_FILE_NAME}"
 GCS_OUTPUT = f"gs://{BUCKET_NAME}"
 LOCATION = "europe-west3"
@@ -77,44 +69,6 @@ with DAG(
 ) as dag:
     create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
-    download_file = GCSToLocalFilesystemOperator(
-        task_id="download_file",
-        object_name=REMOTE_JAR_FILE_PATH,
-        bucket=PUBLIC_BUCKET,
-        filename=LOCAL_JAR,
-    )
-
-    # [START howto_operator_start_java_job_local_jar]
-    start_java_job_direct = BeamRunJavaPipelineOperator(
-        task_id="start_java_job_direct",
-        jar=GCS_JAR,
-        pipeline_options={
-            "output": GCS_OUTPUT,
-        },
-        job_class="org.apache.beam.examples.WordCount",
-        dataflow_config={
-            "check_if_running": CheckJobRunning.WaitForRun,
-            "location": LOCATION,
-            "poll_sleep": 10,
-        },
-    )
-    # [END howto_operator_start_java_job_local_jar]
-
-    start_java_job_direct_deferrable = BeamRunJavaPipelineOperator(
-        task_id="start_java_job_direct_deferrable",
-        jar=LOCAL_JAR,
-        pipeline_options={
-            "output": GCS_OUTPUT,
-        },
-        job_class="org.apache.beam.examples.WordCount",
-        dataflow_config={
-            "check_if_running": CheckJobRunning.WaitForRun,
-            "location": LOCATION,
-            "poll_sleep": 10,
-        },
-        deferrable=True,
-    )
-
     # [START howto_operator_start_java_job_jar_on_gcs]
     start_java_job_dataflow = BeamRunJavaPipelineOperator(
         runner=BeamRunnerType.DataflowRunner,
@@ -161,11 +115,8 @@ with DAG(
     (
         # TEST SETUP
         create_bucket
-        >> download_file
         # TEST BODY
         >> [
-            start_java_job_direct,
-            start_java_job_direct_deferrable,
             start_java_job_dataflow,
             start_java_job_dataflow_deferrable,
         ]
diff --git a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python.py b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python.py
index 05d4511663a..93980bf9ff7 100644
--- a/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python.py
+++ b/providers/google/tests/system/google/cloud/dataflow/example_dataflow_native_python.py
@@ -80,31 +80,6 @@ with DAG(
     )
     # [END howto_operator_start_python_job]
-    start_python_job_direct = BeamRunPythonPipelineOperator(
-        task_id="start_python_job_direct",
-        py_file="apache_beam.examples.wordcount",
-        py_options=["-m"],
-        pipeline_options={
-            "output": GCS_OUTPUT,
-        },
-        py_requirements=["apache-beam[gcp]==2.67.0"],
-        py_interpreter="python3",
-        py_system_site_packages=False,
-    )
-
-    start_python_job_direct_deferrable = BeamRunPythonPipelineOperator(
-        task_id="start_python_job_direct_deferrable",
-        py_file="apache_beam.examples.wordcount",
-        py_options=["-m"],
-        pipeline_options={
-            "output": GCS_OUTPUT,
-        },
-        py_requirements=["apache-beam[gcp]==2.67.0"],
-        py_interpreter="python3",
-        py_system_site_packages=False,
-        deferrable=True,
-    )
-
     start_python_job_dataflow_deferrable = BeamRunPythonPipelineOperator(
         runner=BeamRunnerType.DataflowRunner,
         task_id="start_python_job_dataflow_deferrable",
@@ -138,8 +113,6 @@ with DAG(
         # TEST BODY
         >> [
             start_python_job_dataflow,
-            start_python_job_direct,
-            start_python_job_direct_deferrable,
             start_python_job_dataflow_deferrable,
         ]
         >> stop_dataflow_job