This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch users/damccorm/reenablePrism in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/users/damccorm/reenablePrism by this push: new e0421d0466a Reenable prism as default e0421d0466a is described below commit e0421d0466a4f2f2c17f7d2c81e5a7306510e2e8 Author: Danny Mccormick <dannymccorm...@google.com> AuthorDate: Thu Jul 17 14:24:35 2025 -0400 Reenable prism as default --- .../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- .../apache_beam/runners/direct/direct_runner.py | 27 +++++++++++----------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index afdc7f7012a..2504db607e4 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 11 + "modification": 12 } diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index fcc13ae1024..564a6c7df20 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -195,21 +195,9 @@ class SwitchingDirectRunner(PipelineRunner): # Use BundleBasedDirectRunner if other runners are missing needed features. runner = BundleBasedDirectRunner() - # Check whether all transforms used in the pipeline are supported by the - # FnApiRunner, and the pipeline was not meant to be run as streaming. - if _FnApiRunnerSupportVisitor().accept(pipeline): - from apache_beam.portability.api import beam_provision_api_pb2 - from apache_beam.runners.portability.fn_api_runner import fn_runner - from apache_beam.runners.portability.portable_runner import JobServiceHandle - all_options = options.get_all_options() - encoded_options = JobServiceHandle.encode_pipeline_options(all_options) - provision_info = fn_runner.ExtendedProvisionInfo( - beam_provision_api_pb2.ProvisionInfo( - pipeline_options=encoded_options)) - runner = fn_runner.FnApiRunner(provision_info=provision_info) # Check whether all transforms used in the pipeline are supported by the # PrismRunner - elif _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive): + if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive): _LOGGER.info('Running pipeline with PrismRunner.') from apache_beam.runners.portability import prism_runner runner = prism_runner.PrismRunner() @@ -233,6 +221,19 @@ class SwitchingDirectRunner(PipelineRunner): _LOGGER.info('Falling back to DirectRunner') runner = BundleBasedDirectRunner() + # Check whether all transforms used in the pipeline are supported by the + # FnApiRunner, and the pipeline was not meant to be run as streaming. + if _FnApiRunnerSupportVisitor().accept(pipeline): + from apache_beam.portability.api import beam_provision_api_pb2 + from apache_beam.runners.portability.fn_api_runner import fn_runner + from apache_beam.runners.portability.portable_runner import JobServiceHandle + all_options = options.get_all_options() + encoded_options = JobServiceHandle.encode_pipeline_options(all_options) + provision_info = fn_runner.ExtendedProvisionInfo( + beam_provision_api_pb2.ProvisionInfo( + pipeline_options=encoded_options)) + runner = fn_runner.FnApiRunner(provision_info=provision_info) + return runner.run_pipeline(pipeline, options)