This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/prism-revert
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 34fb5d54b2b4dec264d4ac0f5e64bbb068ae4b5e
Author: Danny Mccormick <[email protected]>
AuthorDate: Wed Jul 16 13:27:11 2025 -0400

    Temporarily move back to fnapi runner as default
---
 .../apache_beam/runners/direct/direct_runner.py    | 29 +++++++++++-----------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 564a6c7df20..628be01145c 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -195,9 +195,23 @@ class SwitchingDirectRunner(PipelineRunner):
     # Use BundleBasedDirectRunner if other runners are missing needed features.
     runner = BundleBasedDirectRunner()
 
+
+
+    # Check whether all transforms used in the pipeline are supported by the
+    # FnApiRunner, and the pipeline was not meant to be run as streaming.
+    if _FnApiRunnerSupportVisitor().accept(pipeline):
+      from apache_beam.portability.api import beam_provision_api_pb2
+      from apache_beam.runners.portability.fn_api_runner import fn_runner
+      from apache_beam.runners.portability.portable_runner import JobServiceHandle
+      all_options = options.get_all_options()
+      encoded_options = JobServiceHandle.encode_pipeline_options(all_options)
+      provision_info = fn_runner.ExtendedProvisionInfo(
+          beam_provision_api_pb2.ProvisionInfo(
+              pipeline_options=encoded_options))
+      runner = fn_runner.FnApiRunner(provision_info=provision_info)
     # Check whether all transforms used in the pipeline are supported by the
     # PrismRunner
-    if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
+    elif _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
       _LOGGER.info('Running pipeline with PrismRunner.')
       from apache_beam.runners.portability import prism_runner
       runner = prism_runner.PrismRunner()
@@ -221,19 +235,6 @@ class SwitchingDirectRunner(PipelineRunner):
         _LOGGER.info('Falling back to DirectRunner')
         runner = BundleBasedDirectRunner()
 
-    # Check whether all transforms used in the pipeline are supported by the
-    # FnApiRunner, and the pipeline was not meant to be run as streaming.
-    if _FnApiRunnerSupportVisitor().accept(pipeline):
-      from apache_beam.portability.api import beam_provision_api_pb2
-      from apache_beam.runners.portability.fn_api_runner import fn_runner
-      from apache_beam.runners.portability.portable_runner import JobServiceHandle
-      all_options = options.get_all_options()
-      encoded_options = JobServiceHandle.encode_pipeline_options(all_options)
-      provision_info = fn_runner.ExtendedProvisionInfo(
-          beam_provision_api_pb2.ProvisionInfo(
-              pipeline_options=encoded_options))
-      runner = fn_runner.FnApiRunner(provision_info=provision_info)
-
     return runner.run_pipeline(pipeline, options)
 
 

Reply via email to