This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 09d4fab Default to Runner v2 for Python Streaming jobs. (#15140)
09d4fab is described below
commit 09d4fab4cba974edaecd6e9c603f1e2a2855e56b
Author: Robert Bradshaw <[email protected]>
AuthorDate: Wed Jul 21 20:01:12 2021 -0700
Default to Runner v2 for Python Streaming jobs. (#15140)
* Default to Runner v2 for Python Streaming jobs.
* Fix test expectations.
* yapf
---
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 8 +++++++-
.../apache_beam/runners/dataflow/dataflow_runner_test.py | 11 +++++++++--
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index c112bdd..0ac8c5f 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -594,9 +594,15 @@ class DataflowRunner(PipelineRunner):
return result
def _maybe_add_unified_worker_missing_options(self, options):
+ debug_options = options.view_as(DebugOptions)
+ # Streaming is always portable, default to runner v2.
+ if options.view_as(StandardOptions).streaming:
+ if not debug_options.lookup_experiment('disable_runner_v2'):
+ debug_options.add_experiment('beam_fn_api')
+ debug_options.add_experiment('use_runner_v2')
+ debug_options.add_experiment('use_portable_job_submission')
# set default beam_fn_api experiment if use unified
# worker experiment flag exists, no-op otherwise.
- debug_options = options.view_as(DebugOptions)
from apache_beam.runners.dataflow.internal import apiclient
if apiclient._use_unified_worker(options):
if not debug_options.lookup_experiment('beam_fn_api'):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index ec4edc9..9c7546d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -256,6 +256,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
def test_streaming_create_translation(self):
remote_runner = DataflowRunner()
self.default_properties.append("--streaming")
+ self.default_properties.append("--experiments=disable_runner_v2")
with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as
p:
p | ptransform.Create([1]) # pylint: disable=expression-not-assigned
job_dict = json.loads(str(remote_runner.job))
@@ -838,7 +839,8 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2'):
_ = self._run_group_into_batches_and_get_step_properties(
- True, ['--enable_streaming_engine'])
+ True,
+ ['--enable_streaming_engine', '--experiments=disable_runner_v2'])
# JRH
with self.assertRaisesRegex(
@@ -846,7 +848,12 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2'):
_ = self._run_group_into_batches_and_get_step_properties(
- True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
+ True,
+ [
+ '--enable_streaming_engine',
+ '--experiments=beam_fn_api',
+ '--experiments=disable_runner_v2'
+ ])
def test_pack_combiners(self):
class PackableCombines(beam.PTransform):