This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a88b2a109e1 Factors enable_prime flag in when checking use_unified_worker conditions (#17271)
a88b2a109e1 is described below

commit a88b2a109e1647337a326d0d792024ee6661ba19
Author: Yichi Zhang <[email protected]>
AuthorDate: Thu Apr 7 09:43:51 2022 -0700

    Factors enable_prime flag in when checking use_unified_worker conditions (#17271)
    
    * Factors enable_prime flag in when checking use_unified_worker conditions
    
    * Address comments
---
 .../apache_beam/runners/dataflow/internal/apiclient.py  | 11 +++++++++++
 .../runners/dataflow/internal/apiclient_test.py         | 17 +++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 771d98d7763..6ac8f778959 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -1174,11 +1174,22 @@ def _use_unified_worker(pipeline_options):
   debug_options = pipeline_options.view_as(DebugOptions)
   use_unified_worker_flag = 'use_unified_worker'
   use_runner_v2_flag = 'use_runner_v2'
+  enable_prime_flag = 'enable_prime'
 
   if (debug_options.lookup_experiment(use_runner_v2_flag) and
       not debug_options.lookup_experiment(use_unified_worker_flag)):
     debug_options.add_experiment(use_unified_worker_flag)
 
+  dataflow_service_options = pipeline_options.view_as(
+      GoogleCloudOptions).dataflow_service_options or []
+  if ((debug_options.lookup_experiment(enable_prime_flag) or
+       enable_prime_flag in dataflow_service_options) and
+      not any([debug_options.lookup_experiment('disable_prime_runner_v2'),
+               debug_options.lookup_experiment('disable_runner_v2')])):
+    debug_options.add_experiment(use_runner_v2_flag)
+    debug_options.add_experiment(use_unified_worker_flag)
+    debug_options.add_experiment(enable_prime_flag)
+
   return debug_options.lookup_experiment(use_unified_worker_flag)
 
 
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 0e0a9e02197..49180c77ad7 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -1003,6 +1003,23 @@ class UtilTest(unittest.TestCase):
         ['--experiments=use_runner_v2', '--experiments=beam_fn_api'])
     self.assertTrue(apiclient._use_unified_worker(pipeline_options))
 
+    pipeline_options = PipelineOptions(['--experiments=enable_prime'])
+    self.assertTrue(apiclient._use_unified_worker(pipeline_options))
+
+    pipeline_options = PipelineOptions(
+        ['--dataflow_service_options=enable_prime'])
+    self.assertTrue(apiclient._use_unified_worker(pipeline_options))
+
+    pipeline_options = PipelineOptions([
+        '--dataflow_service_options=enable_prime',
+        '--experiments=disable_prime_runner_v2'
+    ])
+    self.assertFalse(apiclient._use_unified_worker(pipeline_options))
+
+    pipeline_options = PipelineOptions(
+        ['--experiments=enable_prime', '--experiments=disable_prime_runner_v2'])
+    self.assertFalse(apiclient._use_unified_worker(pipeline_options))
+
     pipeline_options = PipelineOptions([
         '--experiments=use_unified_worker',
         '--experiments=use_runner_v2',

Reply via email to