This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 05699c3 [BEAM-12590] Automatically upgrading Dataflow Python
pipelines that use cross-language transforms to Runner v2 (#15079)
05699c3 is described below
commit 05699c3410dfce3c62013140a57c266a2839bccb
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Mon Jul 12 18:39:41 2021 -0700
[BEAM-12590] Automatically upgrading Dataflow Python pipelines that use
cross-language transforms to Runner v2 (#15079)
* Automatically upgrading Dataflow Python pipelines that use cross-language
transforms to Runner v2.
* Revert change to a test
---
.../apache_beam/runners/dataflow/dataflow_runner.py | 18 ++++++++++++------
1 file changed, 12 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 2b8aa19..c112bdd 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -408,6 +408,18 @@ class DataflowRunner(PipelineRunner):
'Google Cloud Dataflow runner not available, '
'please install apache_beam[gcp]')
+ debug_options = options.view_as(DebugOptions)
+ if pipeline.contains_external_transforms:
+ if not apiclient._use_unified_worker(options):
+ _LOGGER.info(
+ 'Automatically enabling Dataflow Runner v2 since the '
+ 'pipeline used cross-language transforms.')
+ # This has to be done before any Fn API specific setup.
+ debug_options.add_experiment("use_runner_v2")
+ # Dataflow multi-language pipelines require portable job submission.
+ if not debug_options.lookup_experiment('use_portable_job_submission'):
+ debug_options.add_experiment("use_portable_job_submission")
+
self._maybe_add_unified_worker_missing_options(options)
use_fnapi = apiclient._use_fnapi(options)
@@ -511,12 +523,6 @@ class DataflowRunner(PipelineRunner):
debug_options.add_experiment(
'min_cpu_platform=' + worker_options.min_cpu_platform)
- if (apiclient._use_unified_worker(options) and
- pipeline.contains_external_transforms):
- # All Dataflow multi-language pipelines (supported by Runner v2 only) use
- # portable job submission by default.
- debug_options.add_experiment("use_portable_job_submission")
-
# Elevate "enable_streaming_engine" to pipeline option, but using the
# existing experiment.
google_cloud_options = options.view_as(GoogleCloudOptions)