This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/reenablePrism
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/users/damccorm/reenablePrism 
by this push:
     new e0421d0466a Reenable prism as default
e0421d0466a is described below

commit e0421d0466a4f2f2c17f7d2c81e5a7306510e2e8
Author: Danny Mccormick <dannymccorm...@google.com>
AuthorDate: Thu Jul 17 14:24:35 2025 -0400

    Reenable prism as default
---
 .../beam_PostCommit_Python_Xlang_Gcp_Direct.json   |  2 +-
 .../apache_beam/runners/direct/direct_runner.py    | 27 +++++++++++-----------
 2 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index afdc7f7012a..2504db607e4 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 11
+  "modification": 12
 }
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index fcc13ae1024..564a6c7df20 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -195,21 +195,9 @@ class SwitchingDirectRunner(PipelineRunner):
     # Use BundleBasedDirectRunner if other runners are missing needed features.
     runner = BundleBasedDirectRunner()
 
-    # Check whether all transforms used in the pipeline are supported by the
-    # FnApiRunner, and the pipeline was not meant to be run as streaming.
-    if _FnApiRunnerSupportVisitor().accept(pipeline):
-      from apache_beam.portability.api import beam_provision_api_pb2
-      from apache_beam.runners.portability.fn_api_runner import fn_runner
-      from apache_beam.runners.portability.portable_runner import JobServiceHandle
-      all_options = options.get_all_options()
-      encoded_options = JobServiceHandle.encode_pipeline_options(all_options)
-      provision_info = fn_runner.ExtendedProvisionInfo(
-          beam_provision_api_pb2.ProvisionInfo(
-              pipeline_options=encoded_options))
-      runner = fn_runner.FnApiRunner(provision_info=provision_info)
     # Check whether all transforms used in the pipeline are supported by the
     # PrismRunner
-    elif _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
+    if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
       _LOGGER.info('Running pipeline with PrismRunner.')
       from apache_beam.runners.portability import prism_runner
       runner = prism_runner.PrismRunner()
@@ -233,6 +221,19 @@ class SwitchingDirectRunner(PipelineRunner):
         _LOGGER.info('Falling back to DirectRunner')
         runner = BundleBasedDirectRunner()
 
+    # Check whether all transforms used in the pipeline are supported by the
+    # FnApiRunner, and the pipeline was not meant to be run as streaming.
+    if _FnApiRunnerSupportVisitor().accept(pipeline):
+      from apache_beam.portability.api import beam_provision_api_pb2
+      from apache_beam.runners.portability.fn_api_runner import fn_runner
+      from apache_beam.runners.portability.portable_runner import JobServiceHandle
+      all_options = options.get_all_options()
+      encoded_options = JobServiceHandle.encode_pipeline_options(all_options)
+      provision_info = fn_runner.ExtendedProvisionInfo(
+          beam_provision_api_pb2.ProvisionInfo(
+              pipeline_options=encoded_options))
+      runner = fn_runner.FnApiRunner(provision_info=provision_info)
+
     return runner.run_pipeline(pipeline, options)
 
 

Reply via email to