This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/fnApiRunner
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 1581944f9e7d4b33a0b76e3b17b8beea8d0cb0b9
Author: Danny Mccormick <[email protected]>
AuthorDate: Mon Jun 9 16:45:43 2025 -0400

    Force FnApiRunner in cases where prism can't handle use case
---
 .../apache_beam/dataframe/transforms_test.py       | 12 ++++++++--
 .../transforms/elementwise/pardo_dofn_methods.py   |  9 +++++++-
 .../apache_beam/io/gcp/bigquery_file_loads_test.py | 15 +++++++++---
 sdks/python/apache_beam/io/gcp/bigquery_test.py    | 27 ++++++++++++++++------
 sdks/python/apache_beam/io/gcp/bigtableio_test.py  |  6 ++++-
 .../io/gcp/experimental/spannerio_test.py          | 18 ++++++++++++---
 sdks/python/apache_beam/io/gcp/pubsub_test.py      | 12 ++++++++++
 sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py   | 12 ++++++++--
 sdks/python/apache_beam/ml/inference/base_test.py  | 26 +++++++++++++++++----
 .../ml/inference/tensorflow_inference_test.py      |  5 +++-
 .../python/apache_beam/options/pipeline_options.py |  1 +
 sdks/python/apache_beam/pipeline_test.py           |  4 +++-
 .../runners/direct/direct_runner_test.py           | 13 +++++++----
 .../interactive/non_interactive_runner_test.py     |  4 +++-
 sdks/python/apache_beam/runners/runner_test.py     |  5 ++--
 sdks/python/apache_beam/transforms/trigger_test.py |  4 +++-
 .../python/apache_beam/typehints/typecheck_test.py |  3 ++-
 sdks/python/apache_beam/yaml/readme_test.py        |  4 +++-
 18 files changed, 144 insertions(+), 36 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py
index 6b070090c62..a2ca2f9d387 100644
--- a/sdks/python/apache_beam/dataframe/transforms_test.py
+++ b/sdks/python/apache_beam/dataframe/transforms_test.py
@@ -372,7 +372,11 @@ class FusionTest(unittest.TestCase):
                            reshuffle=False)
 
   def test_loc_filter(self):
-    with beam.Pipeline() as p:
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # monitoring_metrics property of the FnApiRunner which does not exist on
+    # other runners like Prism.
+    # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
+    with beam.Pipeline('FnApiRunner') as p:
       _ = (
           self.create_animal_speed_input(p)
           | transforms.DataframeTransform(lambda df: df[df.Speed > 10]))
@@ -383,7 +387,11 @@ class FusionTest(unittest.TestCase):
       df[name] = s
       return df
 
-    with beam.Pipeline() as p:
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # monitoring_metrics property of the FnApiRunner which does not exist on
+    # other runners like Prism.
+    # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
+    with beam.Pipeline('FnApiRunner') as p:
       _ = (
           self.create_animal_speed_input(p)
           | transforms.DataframeTransform(
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_dofn_methods.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_dofn_methods.py
index 86851960256..46d4f5955b0 100644
--- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_dofn_methods.py
+++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_dofn_methods.py
@@ -34,6 +34,9 @@
 
 
 def pardo_dofn_methods(test=None):
+  # Portable runners do not guarantee that teardown will be executed, so we
+  # use FnApiRunner instead of prism.
+  runner = 'FnApiRunner'
   # [START pardo_dofn_methods]
   import apache_beam as beam
 
@@ -60,9 +63,13 @@ def pardo_dofn_methods(test=None):
       )
 
     def teardown(self):
+      # Teardown is best effort and not guaranteed to be executed by all
+      # runners in all cases (for example, it may be skipped if the pipeline
+      # can otherwise complete). It should be used for best effort resource
+      # cleanup.
       print('teardown')
 
-  with beam.Pipeline() as pipeline:
+  with beam.Pipeline(runner) as pipeline:
     results = (
         pipeline
         | 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔'])
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 84e8ecfc486..6400365918d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -541,7 +541,10 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
         validate=False,
         load_job_project_id='loadJobProject')
 
-    with TestPipeline('DirectRunner') as p:
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # lineage metrics which Prism doesn't seem to handle correctly. Defaulting
+    # to FnApiRunner instead.
+    with TestPipeline('FnApiRunner') as p:
       outputs = p | beam.Create(_ELEMENTS) | transform
       jobs = outputs[bqfl.BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS] \
              | "GetJobs" >> beam.Map(lambda x: x[1])
@@ -571,7 +574,10 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
     bq_client.jobs.Insert.return_value = result_job
     bq_client.tables.Delete.return_value = None
 
-    with TestPipeline('DirectRunner') as p:
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # lineage metrics which Prism doesn't seem to handle correctly. Defaulting
+    # to FnApiRunner instead.
+    with TestPipeline('FnApiRunner') as p:
       outputs = (
           p
           | beam.Create(_ELEMENTS, reshuffle=False)
@@ -709,7 +715,10 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
     bq_client.jobs.Insert.return_value = result_job
     bq_client.tables.Delete.return_value = None
 
-    with TestPipeline('DirectRunner') as p:
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # lineage metrics which Prism doesn't seem to handle correctly. Defaulting
+    # to FnApiRunner instead.
+    with TestPipeline('FnApiRunner') as p:
       outputs = (
           p
           | beam.Create(_ELEMENTS, reshuffle=False)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 3931b082259..182e5359ef8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -446,13 +446,14 @@ class TestReadFromBigQuery(unittest.TestCase):
   ])
   def test_create_temp_dataset_exception(self, exception_type, error_message):
 
+    # Uses the FnApiRunner to ensure errors are mocked/passed through correctly
     with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
                            'Insert'),\
       mock.patch.object(BigQueryWrapper,
                         'get_or_create_dataset') as mock_insert, \
       mock.patch('time.sleep'), \
       self.assertRaises(Exception) as exc,\
-      beam.Pipeline() as p:
+      beam.Pipeline('FnApiRunner') as p:
 
       mock_insert.side_effect = exception_type(error_message)
 
@@ -462,7 +463,7 @@ class TestReadFromBigQuery(unittest.TestCase):
           gcs_location='gs://temp_location')
 
     mock_insert.assert_called()
-    self.assertIn(error_message, exc.exception.args[0])
+    self.assertIn(error_message, str(exc.exception))
 
   @parameterized.expand([
       # read without exception
@@ -515,6 +516,9 @@ class TestReadFromBigQuery(unittest.TestCase):
       numBytes = 5
       schema = DummySchema()
 
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # lineage metrics which Prism doesn't seem to handle correctly. Defaulting
+    # to FnApiRunner instead.
     with mock.patch('time.sleep'), \
             mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService,
                               'Get') as mock_get_table, \
@@ -526,7 +530,7 @@ class TestReadFromBigQuery(unittest.TestCase):
                               'match'), \
             mock.patch.object(FileSystems,
                               'delete'), \
-            beam.Pipeline() as p:
+            beam.Pipeline('FnApiRunner') as p:
       call_counter = 0
 
       def store_callback(unused_request):
@@ -676,6 +680,9 @@ class TestReadFromBigQuery(unittest.TestCase):
   ])
   def test_query_job_exception(self, exception_type, error_message):
 
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # mocking which prism doesn't seem to fully handle correctly (mocks get
+    # mixed between test runs). Pinning to FnApiRunner for now.
     with mock.patch.object(beam.io.gcp.bigquery._CustomBigQuerySource,
                            'estimate_size') as mock_estimate,\
       mock.patch.object(BigQueryWrapper,
@@ -685,7 +692,7 @@ class TestReadFromBigQuery(unittest.TestCase):
       mock.patch.object(bigquery_v2_client.BigqueryV2.DatasetsService, 'Get'), \
       mock.patch('time.sleep'), \
       self.assertRaises(Exception) as exc, \
-      beam.Pipeline() as p:
+      beam.Pipeline('FnApiRunner') as p:
 
       mock_estimate.return_value = None
       mock_query_location.return_value = None
@@ -727,14 +734,17 @@ class TestReadFromBigQuery(unittest.TestCase):
           gcs_location="gs://temp_location")
 
     mock_query_job.assert_called()
-    self.assertIn(error_message, exc.exception.args[0])
+    self.assertIn(error_message, str(exc.exception))
 
   def test_read_direct_lineage(self):
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # lineage metrics which Prism doesn't seem to handle correctly. Defaulting
+    # to FnApiRunner instead.
     with mock.patch.object(bigquery_tools.BigQueryWrapper,
                         '_bigquery_client'),\
          mock.patch.object(bq_storage.BigQueryReadClient,
                         'create_read_session'),\
-        beam.Pipeline() as p:
+        beam.Pipeline('FnApiRunner') as p:
 
       _ = p | ReadFromBigQuery(
           method=ReadFromBigQuery.Method.DIRECT_READ,
@@ -744,8 +754,11 @@ class TestReadFromBigQuery(unittest.TestCase):
         set(["bigquery:project.dataset.table"]))
 
   def test_read_all_lineage(self):
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # lineage metrics which Prism doesn't seem to handle correctly. Defaulting
+    # to FnApiRunner instead.
     with mock.patch.object(_BigQueryReadSplit, '_export_files') as export, \
-                            beam.Pipeline() as p:
+                            beam.Pipeline('FnApiRunner') as p:
 
       export.return_value = (None, [])
 
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_test.py
index 130f9a71412..3a9d2ac6f7c 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio_test.py
@@ -271,8 +271,12 @@ class TestWriteBigTable(unittest.TestCase):
 
   def test_write(self):
     direct_rows = [self.generate_row(i) for i in range(5)]
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # lineage metrics which Prism doesn't seem to handle correctly. Defaulting
+    # to FnApiRunner instead.
+    runner = 'FnApiRunner'
     with patch.object(MutationsBatcher, 'mutate'), \
-      patch.object(MutationsBatcher, 'close'), TestPipeline() as p:
+      patch.object(MutationsBatcher, 'close'), TestPipeline(runner) as p:
       _ = p | beam.Create(direct_rows) | bigtableio.WriteToBigTable(
           self._PROJECT_ID, self._INSTANCE_ID, self._TABLE_ID)
     self.assertSetEqual(
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
index 0e22041dbea..4e391900eaa 100644
--- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
@@ -448,7 +448,11 @@ class SpannerWriteTest(unittest.TestCase):
             [('1234', "mutations-inset-1233-updated")]),
     ]
 
-    p = TestPipeline()
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # metrics filtering which doesn't work on Prism yet because Prism renames
+    # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
+    # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
+    p = TestPipeline('FnApiRunner')
     _ = (
         p
         | beam.Create(mutations)
@@ -475,7 +479,11 @@ class SpannerWriteTest(unittest.TestCase):
         WriteMutation.insert(
             "roles", ("key", "rolename"), [('1234', "mutations-inset-1234")])
     ] * 50
-    p = TestPipeline()
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # metrics filtering which doesn't work on Prism yet because Prism renames
+    # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
+    # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
+    p = TestPipeline('FnApiRunner')
     _ = (
         p
         | beam.Create(mutations)
@@ -514,7 +522,11 @@ class SpannerWriteTest(unittest.TestCase):
         MutationGroup([WriteMutation.delete("roles", ks)])
     ]
 
-    p = TestPipeline()
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # metrics filtering which doesn't work on Prism yet because Prism renames
+    # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
+    # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
+    p = TestPipeline('FnApiRunner')
     _ = (
         p
         | beam.Create(mutation_groups)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index fadc49461a3..e3fb07a1762 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -834,6 +834,10 @@ class TestReadFromPubSub(unittest.TestCase):
     ]
     options = PipelineOptions([])
     options.view_as(StandardOptions).streaming = True
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # lineage metrics which Prism doesn't seem to handle correctly. Defaulting
+    # to FnApiRunner instead.
+    options.view_as(StandardOptions).runner = 'FnApiRunner'
     for test_case in ('topic', 'subscription'):
       with TestPipeline(options=options) as p:
         # Direct runner currently overwrites the whole ReadFromPubSub transform.
@@ -1009,6 +1013,10 @@ class TestWriteToPubSub(unittest.TestCase):
 
     options = PipelineOptions([])
     options.view_as(StandardOptions).streaming = True
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # lineage metrics which Prism doesn't seem to handle correctly. Defaulting
+    # to FnApiRunner instead.
+    options.view_as(StandardOptions).runner = 'FnApiRunner'
     with TestPipeline(options=options) as p:
       pcoll = p | Create(payloads)
       WriteToPubSub(
@@ -1025,6 +1033,10 @@ class TestWriteToPubSub(unittest.TestCase):
 
     options = PipelineOptions([])
     options.view_as(StandardOptions).streaming = True
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # lineage metrics which Prism doesn't seem to handle correctly. Defaulting
+    # to FnApiRunner instead.
+    options.view_as(StandardOptions).runner = 'FnApiRunner'
     with TestPipeline(options=options) as p:
       pcoll = p | Create(payloads)
       # Avoid direct runner overwrites WriteToPubSub
diff --git a/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py b/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
index d4153e5b3fe..51916eaaf6c 100644
--- a/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
+++ b/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
@@ -72,7 +72,11 @@ class TestDeidentifyFn(unittest.TestCase):
         return 'test'
 
     with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock):
-      p = TestPipeline()
+      # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+      # metrics filtering which doesn't work on Prism yet because Prism renames
+      # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
+      # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
+      p = TestPipeline('FnApiRunner')
       config = {
           "deidentify_config": {
               "info_type_transformations": {
@@ -125,7 +129,11 @@ class TestInspectFn(unittest.TestCase):
         return 'test'
 
     with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock):
-      p = TestPipeline()
+      # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+      # metrics filtering which doesn't work on Prism yet because Prism renames
+      # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
+      # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
+      p = TestPipeline('FnApiRunner')
       config = {"inspect_config": {"info_types": [{"name": "EMAIL_ADDRESS"}]}}
       # pylint: disable=expression-not-assigned
       (
diff --git a/sdks/python/apache_beam/ml/inference/base_test.py b/sdks/python/apache_beam/ml/inference/base_test.py
index 6497de3fe9d..29ac0ad4247 100644
--- a/sdks/python/apache_beam/ml/inference/base_test.py
+++ b/sdks/python/apache_beam/ml/inference/base_test.py
@@ -950,7 +950,11 @@ class RunInferenceBaseTest(unittest.TestCase):
 
   def test_increment_failed_batches_counter(self):
     with self.assertRaises(ValueError):
-      with TestPipeline() as pipeline:
+      # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+      # metrics filtering which doesn't work on Prism yet because Prism renames
+      # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
+      # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
+      with TestPipeline('FnApiRunner') as pipeline:
         examples = [7]
         pcoll = pipeline | 'start' >> beam.Create(examples)
         _ = pcoll | base.RunInference(FakeModelHandlerExpectedInferenceArgs())
@@ -1226,7 +1230,10 @@ class RunInferenceBaseTest(unittest.TestCase):
         for e in element:
           yield e
 
-    with TestPipeline() as pipeline:
+    # This test relies on poorly defined side input semantics which vary
+    # across runners (including prism). Pinning to FnApiRunner which
+    # consistently guarantees output.
+    with TestPipeline('FnApiRunner') as pipeline:
       side_input = (
           pipeline
           |
@@ -1324,7 +1331,10 @@ class RunInferenceBaseTest(unittest.TestCase):
         for e in element:
           yield e
 
-    with TestPipeline() as pipeline:
+    # This test relies on poorly defined side input semantics which vary
+    # across runners (including prism). Pinning to FnApiRunner which
+    # consistently guarantees output.
+    with TestPipeline('FnApiRunner') as pipeline:
       side_input = (
           pipeline
           |
@@ -1425,7 +1435,10 @@ class RunInferenceBaseTest(unittest.TestCase):
         for e in element:
           yield e
 
-    with TestPipeline() as pipeline:
+    # This test relies on poorly defined side input semantics which vary
+    # across runners (including prism). Pinning to FnApiRunner which
+    # consistently guarantees output.
+    with TestPipeline('FnApiRunner') as pipeline:
       side_input = (
           pipeline
           |
@@ -1500,7 +1513,10 @@ class RunInferenceBaseTest(unittest.TestCase):
         for e in element:
           yield e
 
-    with TestPipeline() as pipeline:
+    # This test relies on poorly defined side input semantics which vary
+    # across runners (including prism). Pinning to FnApiRunner which
+    # consistently guarantees output.
+    with TestPipeline('FnApiRunner') as pipeline:
       side_input = (
           pipeline
           |
diff --git a/sdks/python/apache_beam/ml/inference/tensorflow_inference_test.py b/sdks/python/apache_beam/ml/inference/tensorflow_inference_test.py
index 9b23963723d..75f15c87f5c 100644
--- a/sdks/python/apache_beam/ml/inference/tensorflow_inference_test.py
+++ b/sdks/python/apache_beam/ml/inference/tensorflow_inference_test.py
@@ -221,7 +221,10 @@ class TFRunInferenceTest(unittest.TestCase):
     model = _create_mult2_model()
     model_path = os.path.join(self.tmpdir, f'mult2_{uuid.uuid4()}.keras')
     tf.keras.models.save_model(model, model_path)
-    with TestPipeline() as pipeline:
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on a
+    # runner producing a single bundle or bundles of even size, neither of
+    # which prism seems to do here
+    with TestPipeline('FnApiRunner') as pipeline:
 
       def fake_batching_inference_fn(
           model: tf.Module,
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 867a5bc24f2..3554e4cfe95 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -607,6 +607,7 @@ class StandardOptions(PipelineOptions):
       'apache_beam.runners.direct.direct_runner.SwitchingDirectRunner',
       'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
       'apache_beam.runners.portability.flink_runner.FlinkRunner',
+      'apache_beam.runners.portability.fn_api_runner.FnApiRunner',
       'apache_beam.runners.portability.portable_runner.PortableRunner',
       'apache_beam.runners.portability.prism_runner.PrismRunner',
       'apache_beam.runners.portability.spark_runner.SparkRunner',
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 0bbd14b6afc..4389f174cb9 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -752,7 +752,9 @@ class PipelineTest(unittest.TestCase):
           RuntimeError,
           'Pipeline construction environment and pipeline runtime '
           'environment are not compatible.'):
-        with TestPipeline() as p:
+        # TODO(https://github.com/apache/beam/issues/34549): Prism doesn't
+        # pass through capabilities as part of the ProcessBundleDescriptor.
+        with TestPipeline('FnApiRunner') as p:
           _ = p | Create([None])
 
 
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner_test.py b/sdks/python/apache_beam/runners/direct/direct_runner_test.py
index 92116c665b6..008a1bd4721 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner_test.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner_test.py
@@ -56,9 +56,10 @@ class DirectPipelineResultTest(unittest.TestCase):
   def test_waiting_on_result_stops_executor_threads(self):
     pre_test_threads = set(t.ident for t in threading.enumerate())
 
-    for runner in ['DirectRunner',
-                   'BundleBasedDirectRunner',
-                   'SwitchingDirectRunner']:
+    for runner in [
+        'BundleBasedDirectRunner',
+        'apache_beam.runners.portability.fn_api_runner.fn_runner.FnApiRunner'
+    ]:
       pipeline = test_pipeline.TestPipeline(runner=runner)
       _ = (pipeline | beam.Create([{'foo': 'bar'}]))
       result = pipeline.run()
@@ -91,7 +92,11 @@ class DirectPipelineResultTest(unittest.TestCase):
             ("a", "b", str(element % 4)))
         return [element]
 
-    p = Pipeline(DirectRunner())
+    # TODO(https://github.com/apache/beam/issues/34549): This test relies on
+    # metrics filtering which doesn't work on Prism yet because Prism renames
+    # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
+    # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
+    p = Pipeline('FnApiRunner')
     pcoll = (
         p | beam.Create([1, 2, 3, 4, 5], reshuffle=False)
         | 'Do' >> beam.ParDo(MyDoFn()))
diff --git a/sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py
index f7fd052fecc..82298f5def0 100644
--- a/sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py
@@ -76,7 +76,9 @@ class NonInteractiveRunnerTest(unittest.TestCase):
   @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]")
   def test_basic(self):
     clear_side_effect()
-    p = beam.Pipeline(direct_runner.DirectRunner())
+    # This test relies on the pipeline cache being populated. Prism doesn't
+    # consistently populate this cache, forcing FnApiRunner
+    p = beam.Pipeline('FnApiRunner')
 
     # Initial collection runs the pipeline.
     pcoll1 = p | beam.Create(['a', 'b', 'c']) | beam.Map(cause_side_effect)
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 61fe400997d..0593be40465 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -30,6 +30,7 @@ import apache_beam as beam
 from apache_beam.metrics.metric import Metrics
 from apache_beam.runners import DirectRunner
 from apache_beam.runners import create_runner
+from apache_beam.runners.portability.fn_api_runner import FnApiRunner
 
 
 class RunnerTest(unittest.TestCase):
@@ -55,7 +56,7 @@ class RunnerTest(unittest.TestCase):
 
   def test_run_api(self):
     my_metric = Metrics.counter('namespace', 'my_metric')
-    runner = DirectRunner()
+    runner = FnApiRunner()
     result = runner.run(
         beam.Create([1, 10, 100]) | beam.Map(lambda x: my_metric.inc(x)))
     result.wait_until_finish()
@@ -72,7 +73,7 @@ class RunnerTest(unittest.TestCase):
           | beam.Create([1, 10, 100])
           | beam.Map(lambda x: my_metric.inc(x)))
 
-    runner = DirectRunner()
+    runner = FnApiRunner()
     result = runner.run(fn)
     result.wait_until_finish()
     # Use counters to assert the pipeline actually ran.
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 2cad624272b..79fd3151c08 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -700,7 +700,9 @@ class TriggerPipelineTest(unittest.TestCase):
               }.items())))
 
   def test_always(self):
-    with TestPipeline() as p:
+    # Pin to FnApiRunner since portable runner could trigger differently if
+    # using bundle sizes of greater than 1.
+    with TestPipeline('FnApiRunner') as p:
 
       def construct_timestamped(k, t):
         return TimestampedValue((k, t), t)
diff --git a/sdks/python/apache_beam/typehints/typecheck_test.py b/sdks/python/apache_beam/typehints/typecheck_test.py
index 32307c5202e..ce16e8e2f5d 100644
--- a/sdks/python/apache_beam/typehints/typecheck_test.py
+++ b/sdks/python/apache_beam/typehints/typecheck_test.py
@@ -84,7 +84,8 @@ class MyDoFnBadAnnotation(MyDoFn):
 
 class RuntimeTypeCheckTest(unittest.TestCase):
   def setUp(self):
-    self.p = TestPipeline(
+    # Use FnApiRunner since it guarantees all lifecycle methods will be called.
+    self.p = TestPipeline('FnApiRunner',
         options=PipelineOptions(
             runtime_type_check=True, performance_runtime_type_check=False))
 
diff --git a/sdks/python/apache_beam/yaml/readme_test.py b/sdks/python/apache_beam/yaml/readme_test.py
index ce9d6269e54..c05039cb703 100644
--- a/sdks/python/apache_beam/yaml/readme_test.py
+++ b/sdks/python/apache_beam/yaml/readme_test.py
@@ -260,7 +260,9 @@ def create_test_method(test_type, test_name, test_yaml):
         with mock.patch(
             'apache_beam.yaml.yaml_provider.ExternalProvider.create_transform',
             lambda *args, **kwargs: _Fakes.SomeTransform(*args, **kwargs)):
-          p = beam.Pipeline(options=PipelineOptions(**options))
+          # Uses the FnApiRunner to ensure errors are mocked/passed through
+          # correctly
+          p = beam.Pipeline('FnApiRunner', options=PipelineOptions(**options))
           yaml_transform.expand_pipeline(
               p, modified_yaml, yaml_provider.merge_providers([test_provider]))
       if test_type == 'BUILD':

Reply via email to