This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bc7c39422fe [Python] Deprecate --enable_streaming_engine flag since it is always enabled. (#35917)
bc7c39422fe is described below

commit bc7c39422fed62e4a8401e77b9d270f10f752ed8
Author: Sam Whittle <[email protected]>
AuthorDate: Thu Aug 21 19:19:27 2025 +0200

    [Python] Deprecate --enable_streaming_engine flag since it is always enabled. (#35917)
---
 sdks/python/apache_beam/options/pipeline_options.py   |  5 +++--
 .../apache_beam/runners/dataflow/dataflow_runner.py   | 19 ++-----------------
 .../runners/dataflow/dataflow_runner_test.py          |  3 +--
 3 files changed, 6 insertions(+), 21 deletions(-)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index c3fbdf7c79c..bc2622e454a 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1021,9 +1021,10 @@ class GoogleCloudOptions(PipelineOptions):
         'updating-a-pipeline')
     parser.add_argument(
         '--enable_streaming_engine',
-        default=False,
+        default=True,
         action='store_true',
-        help='Enable Windmill Service for this Dataflow job. ')
+        help='Deprecated. All Python streaming pipelines on Dataflow '
+        'use Streaming Engine.')
     parser.add_argument(
         '--dataflow_kms_key',
         default=None,
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 19302923b1f..4893649b613 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -633,23 +633,8 @@ def _check_and_add_missing_streaming_options(options):
   # Runner v2 only supports using streaming engine (aka windmill service)
   if options.view_as(StandardOptions).streaming:
     debug_options = options.view_as(DebugOptions)
-    google_cloud_options = options.view_as(GoogleCloudOptions)
-    if (not google_cloud_options.enable_streaming_engine and
-        (debug_options.lookup_experiment("enable_windmill_service") or
-         debug_options.lookup_experiment("enable_streaming_engine"))):
-      raise ValueError(
-          """Streaming engine both disabled and enabled:
-          --enable_streaming_engine flag is not set, but
-          enable_windmill_service and/or enable_streaming_engine experiments
-          are present. It is recommended you only set the
-          --enable_streaming_engine flag.""")
-
-    # Ensure that if we detected a streaming pipeline that streaming specific
-    # options and experiments.
-    options.view_as(StandardOptions).streaming = True
-    google_cloud_options.enable_streaming_engine = True
-    debug_options.add_experiment("enable_streaming_engine")
-    debug_options.add_experiment("enable_windmill_service")
+    debug_options.add_experiment('enable_streaming_engine')
+    debug_options.add_experiment('enable_windmill_service')
 
 
 def _is_runner_v2_disabled(options):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index bb9132bdb96..178a75ec41d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -421,10 +421,9 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         'min_cpu_platform=Intel Haswell',
         remote_runner.job.options.view_as(DebugOptions).experiments)
 
-  def test_streaming_engine_flag_adds_windmill_experiments(self):
+  def test_streaming_adds_windmill_experiments(self):
     remote_runner = DataflowRunner()
     self.default_properties.append('--streaming')
-    self.default_properties.append('--enable_streaming_engine')
     self.default_properties.append('--experiment=some_other_experiment')
 
     with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:

Reply via email to