This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bc7c39422fe [Python] Deprecate --enable_streaming_engine flag since it
is always enabled. (#35917)
bc7c39422fe is described below
commit bc7c39422fed62e4a8401e77b9d270f10f752ed8
Author: Sam Whittle <[email protected]>
AuthorDate: Thu Aug 21 19:19:27 2025 +0200
[Python] Deprecate --enable_streaming_engine flag since it is always
enabled. (#35917)
---
sdks/python/apache_beam/options/pipeline_options.py | 5 +++--
.../apache_beam/runners/dataflow/dataflow_runner.py | 19 ++-----------------
.../runners/dataflow/dataflow_runner_test.py | 3 +--
3 files changed, 6 insertions(+), 21 deletions(-)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index c3fbdf7c79c..bc2622e454a 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1021,9 +1021,10 @@ class GoogleCloudOptions(PipelineOptions):
'updating-a-pipeline')
parser.add_argument(
'--enable_streaming_engine',
- default=False,
+ default=True,
action='store_true',
- help='Enable Windmill Service for this Dataflow job. ')
+ help='Deprecated. All Python streaming pipelines on Dataflow '
+ 'use Streaming Engine.')
parser.add_argument(
'--dataflow_kms_key',
default=None,
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 19302923b1f..4893649b613 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -633,23 +633,8 @@ def _check_and_add_missing_streaming_options(options):
# Runner v2 only supports using streaming engine (aka windmill service)
if options.view_as(StandardOptions).streaming:
debug_options = options.view_as(DebugOptions)
- google_cloud_options = options.view_as(GoogleCloudOptions)
- if (not google_cloud_options.enable_streaming_engine and
- (debug_options.lookup_experiment("enable_windmill_service") or
- debug_options.lookup_experiment("enable_streaming_engine"))):
- raise ValueError(
- """Streaming engine both disabled and enabled:
- --enable_streaming_engine flag is not set, but
- enable_windmill_service and/or enable_streaming_engine experiments
- are present. It is recommended you only set the
- --enable_streaming_engine flag.""")
-
- # Ensure that if we detected a streaming pipeline that streaming specific
- # options and experiments.
- options.view_as(StandardOptions).streaming = True
- google_cloud_options.enable_streaming_engine = True
- debug_options.add_experiment("enable_streaming_engine")
- debug_options.add_experiment("enable_windmill_service")
+ debug_options.add_experiment('enable_streaming_engine')
+ debug_options.add_experiment('enable_windmill_service')
def _is_runner_v2_disabled(options):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index bb9132bdb96..178a75ec41d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -421,10 +421,9 @@ class DataflowRunnerTest(unittest.TestCase,
ExtraAssertionsMixin):
'min_cpu_platform=Intel Haswell',
remote_runner.job.options.view_as(DebugOptions).experiments)
- def test_streaming_engine_flag_adds_windmill_experiments(self):
+ def test_streaming_adds_windmill_experiments(self):
remote_runner = DataflowRunner()
self.default_properties.append('--streaming')
- self.default_properties.append('--enable_streaming_engine')
self.default_properties.append('--experiment=some_other_experiment')
with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as
p: