This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a88b2a109e1 Factors enable_prime flag in when checking
use_unified_worker conditions (#17271)
a88b2a109e1 is described below
commit a88b2a109e1647337a326d0d792024ee6661ba19
Author: Yichi Zhang <[email protected]>
AuthorDate: Thu Apr 7 09:43:51 2022 -0700
Factors enable_prime flag in when checking use_unified_worker conditions
(#17271)
* Factors enable_prime flag in when checking use_unified_worker conditions
* Address comments
---
.../apache_beam/runners/dataflow/internal/apiclient.py | 11 +++++++++++
.../runners/dataflow/internal/apiclient_test.py | 17 +++++++++++++++++
2 files changed, 28 insertions(+)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 771d98d7763..6ac8f778959 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -1174,11 +1174,22 @@ def _use_unified_worker(pipeline_options):
debug_options = pipeline_options.view_as(DebugOptions)
use_unified_worker_flag = 'use_unified_worker'
use_runner_v2_flag = 'use_runner_v2'
+ enable_prime_flag = 'enable_prime'
if (debug_options.lookup_experiment(use_runner_v2_flag) and
not debug_options.lookup_experiment(use_unified_worker_flag)):
debug_options.add_experiment(use_unified_worker_flag)
+ dataflow_service_options = pipeline_options.view_as(
+ GoogleCloudOptions).dataflow_service_options or []
+ if ((debug_options.lookup_experiment(enable_prime_flag) or
+ enable_prime_flag in dataflow_service_options) and
+ not any([debug_options.lookup_experiment('disable_prime_runner_v2'),
+ debug_options.lookup_experiment('disable_runner_v2')])):
+ debug_options.add_experiment(use_runner_v2_flag)
+ debug_options.add_experiment(use_unified_worker_flag)
+ debug_options.add_experiment(enable_prime_flag)
+
return debug_options.lookup_experiment(use_unified_worker_flag)
diff --git
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 0e0a9e02197..49180c77ad7 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -1003,6 +1003,23 @@ class UtilTest(unittest.TestCase):
['--experiments=use_runner_v2', '--experiments=beam_fn_api'])
self.assertTrue(apiclient._use_unified_worker(pipeline_options))
+ pipeline_options = PipelineOptions(['--experiments=enable_prime'])
+ self.assertTrue(apiclient._use_unified_worker(pipeline_options))
+
+ pipeline_options = PipelineOptions(
+ ['--dataflow_service_options=enable_prime'])
+ self.assertTrue(apiclient._use_unified_worker(pipeline_options))
+
+ pipeline_options = PipelineOptions([
+ '--dataflow_service_options=enable_prime',
+ '--experiments=disable_prime_runner_v2'
+ ])
+ self.assertFalse(apiclient._use_unified_worker(pipeline_options))
+
+ pipeline_options = PipelineOptions(
+ ['--experiments=enable_prime',
'--experiments=disable_prime_runner_v2'])
+ self.assertFalse(apiclient._use_unified_worker(pipeline_options))
+
pipeline_options = PipelineOptions([
'--experiments=use_unified_worker',
'--experiments=use_runner_v2',