This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/prismByDefault
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4d9c497900e47830b5cc8b651c7b5d14feec8519
Author: Danny Mccormick <[email protected]>
AuthorDate: Fri Apr 11 15:50:55 2025 -0400

    Enable prism by default
---
 .../apache_beam/runners/direct/direct_runner.py       | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 8b893765368..ce82eb10336 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -52,6 +52,7 @@ from apache_beam.transforms.core import ParDo
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.timeutil import TimeDomain
 from apache_beam.typehints import trivial_inference
+from apache_beam.utils.interactive_utils import is_in_ipython
 
 __all__ = ['BundleBasedDirectRunner', 'DirectRunner', 'SwitchingDirectRunner']
 
@@ -114,7 +115,11 @@ class SwitchingDirectRunner(PipelineRunner):
       """Visitor determining if a Pipeline can be run on the PrismRunner."""
       def accept(self, pipeline):
         self.supported_by_prism_runner = True
-        pipeline.visit(self)
+        # TODO(https://github.com/apache/beam/issues/33623): Prism currently does not support interactive mode
+        if is_in_ipython():
+          self.supported_by_prism_runner = False
+        else:
+          pipeline.visit(self)
         return self.supported_by_prism_runner
 
       def visit_transform(self, applied_ptransform):
@@ -144,7 +149,12 @@ class SwitchingDirectRunner(PipelineRunner):
     tryingPrism = False
     # Check whether all transforms used in the pipeline are supported by the
     # FnApiRunner, and the pipeline was not meant to be run as streaming.
-    if _FnApiRunnerSupportVisitor().accept(pipeline):
+    if _PrismRunnerSupportVisitor().accept(pipeline):
+      _LOGGER.info('Running pipeline with PrismRunner.')
+      from apache_beam.runners.portability import prism_runner
+      runner = prism_runner.PrismRunner()
+      tryingPrism = True
+    elif _FnApiRunnerSupportVisitor().accept(pipeline):
       from apache_beam.portability.api import beam_provision_api_pb2
       from apache_beam.runners.portability.fn_api_runner import fn_runner
       from apache_beam.runners.portability.portable_runner import JobServiceHandle
@@ -154,11 +164,6 @@ class SwitchingDirectRunner(PipelineRunner):
           beam_provision_api_pb2.ProvisionInfo(
               pipeline_options=encoded_options))
       runner = fn_runner.FnApiRunner(provision_info=provision_info)
-    elif _PrismRunnerSupportVisitor().accept(pipeline):
-      _LOGGER.info('Running pipeline with PrismRunner.')
-      from apache_beam.runners.portability import prism_runner
-      runner = prism_runner.PrismRunner()
-      tryingPrism = True
     else:
       runner = BundleBasedDirectRunner()
 

Reply via email to