This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch users/damccorm/prismByDefault in repository https://gitbox.apache.org/repos/asf/beam.git
commit 4d9c497900e47830b5cc8b651c7b5d14feec8519 Author: Danny Mccormick <[email protected]> AuthorDate: Fri Apr 11 15:50:55 2025 -0400 Enable prism by default --- .../apache_beam/runners/direct/direct_runner.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 8b893765368..ce82eb10336 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -52,6 +52,7 @@ from apache_beam.transforms.core import ParDo from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.timeutil import TimeDomain from apache_beam.typehints import trivial_inference +from apache_beam.utils.interactive_utils import is_in_ipython __all__ = ['BundleBasedDirectRunner', 'DirectRunner', 'SwitchingDirectRunner'] @@ -114,7 +115,11 @@ class SwitchingDirectRunner(PipelineRunner): """Visitor determining if a Pipeline can be run on the PrismRunner.""" def accept(self, pipeline): self.supported_by_prism_runner = True - pipeline.visit(self) + # TODO(https://github.com/apache/beam/issues/33623): Prism currently does not support interactive mode + if is_in_ipython(): + self.supported_by_prism_runner = False + else: + pipeline.visit(self) return self.supported_by_prism_runner def visit_transform(self, applied_ptransform): @@ -144,7 +149,12 @@ class SwitchingDirectRunner(PipelineRunner): tryingPrism = False # Check whether all transforms used in the pipeline are supported by the # FnApiRunner, and the pipeline was not meant to be run as streaming. - if _FnApiRunnerSupportVisitor().accept(pipeline): + if _PrismRunnerSupportVisitor().accept(pipeline): + _LOGGER.info('Running pipeline with PrismRunner.') + from apache_beam.runners.portability import prism_runner + runner = prism_runner.PrismRunner() + tryingPrism = True + elif _FnApiRunnerSupportVisitor().accept(pipeline): from apache_beam.portability.api import beam_provision_api_pb2 from apache_beam.runners.portability.fn_api_runner import fn_runner from apache_beam.runners.portability.portable_runner import JobServiceHandle @@ -154,11 +164,6 @@ class SwitchingDirectRunner(PipelineRunner): beam_provision_api_pb2.ProvisionInfo( pipeline_options=encoded_options)) runner = fn_runner.FnApiRunner(provision_info=provision_info) - elif _PrismRunnerSupportVisitor().accept(pipeline): - _LOGGER.info('Running pipeline with PrismRunner.') - from apache_beam.runners.portability import prism_runner - runner = prism_runner.PrismRunner() - tryingPrism = True else: runner = BundleBasedDirectRunner()
