This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 09d4fab  Default to Runner v2 for Python Streaming jobs. (#15140)
09d4fab is described below

commit 09d4fab4cba974edaecd6e9c603f1e2a2855e56b
Author: Robert Bradshaw <[email protected]>
AuthorDate: Wed Jul 21 20:01:12 2021 -0700

    Default to Runner v2 for Python Streaming jobs. (#15140)
    
    * Default to Runner v2 for Python Streaming jobs.
    
    * Fix test expectations.
    
    * yapf
---
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py   |  8 +++++++-
 .../apache_beam/runners/dataflow/dataflow_runner_test.py      | 11 +++++++++--
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index c112bdd..0ac8c5f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -594,9 +594,15 @@ class DataflowRunner(PipelineRunner):
     return result
 
   def _maybe_add_unified_worker_missing_options(self, options):
+    debug_options = options.view_as(DebugOptions)
+    # Streaming is always portable, default to runner v2.
+    if options.view_as(StandardOptions).streaming:
+      if not debug_options.lookup_experiment('disable_runner_v2'):
+        debug_options.add_experiment('beam_fn_api')
+        debug_options.add_experiment('use_runner_v2')
+        debug_options.add_experiment('use_portable_job_submission')
     # set default beam_fn_api experiment if use unified
     # worker experiment flag exists, no-op otherwise.
-    debug_options = options.view_as(DebugOptions)
     from apache_beam.runners.dataflow.internal import apiclient
     if apiclient._use_unified_worker(options):
       if not debug_options.lookup_experiment('beam_fn_api'):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index ec4edc9..9c7546d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -256,6 +256,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
   def test_streaming_create_translation(self):
     remote_runner = DataflowRunner()
     self.default_properties.append("--streaming")
+    self.default_properties.append("--experiments=disable_runner_v2")
     with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
       p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
     job_dict = json.loads(str(remote_runner.job))
@@ -838,7 +839,8 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True, ['--enable_streaming_engine'])
+          True,
+          ['--enable_streaming_engine', '--experiments=disable_runner_v2'])
 
     # JRH
     with self.assertRaisesRegex(
@@ -846,7 +848,12 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
+          True,
+          [
+              '--enable_streaming_engine',
+              '--experiments=beam_fn_api',
+              '--experiments=disable_runner_v2'
+          ])
 
   def test_pack_combiners(self):
     class PackableCombines(beam.PTransform):

Reply via email to