This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e7c4393287 add friendly error message for when transform is applied to no output (#35160)
9e7c4393287 is described below

commit 9e7c43932876fe6566020fdd686e31a9110ef2c1
Author: Hai Joey Tran <[email protected]>
AuthorDate: Thu Jun 5 12:45:12 2025 -0400

    add friendly error message for when transform is applied to no output (#35160)
    
    * add friendly error message for when transform is applied to no output
    
    * update test name
    
    * Fix pubsub unit tests that depend on old behavior
---
 sdks/python/apache_beam/io/gcp/pubsub_test.py |  3 +++
 sdks/python/apache_beam/pipeline.py           | 19 +++++++++++++++++++
 sdks/python/apache_beam/pipeline_test.py      | 21 +++++++++++++++++++++
 3 files changed, 43 insertions(+)

diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index feee9dc0082..fadc49461a3 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -27,6 +27,7 @@ import hamcrest as hc
 import mock
 
 import apache_beam as beam
+from apache_beam import Pipeline
 from apache_beam.io import Read
 from apache_beam.io import Write
 from apache_beam.io.gcp.pubsub import MultipleReadFromPubSub
@@ -364,6 +365,7 @@ class TestMultiReadFromPubSubOverride(unittest.TestCase):
 
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 class TestWriteStringsToPubSubOverride(unittest.TestCase):
+  @mock.patch.object(Pipeline, '_assert_not_applying_PDone', mock.Mock())
   def test_expand_deprecated(self):
     options = PipelineOptions([])
     options.view_as(StandardOptions).streaming = True
@@ -385,6 +387,7 @@ class TestWriteStringsToPubSubOverride(unittest.TestCase):
     # Ensure that the properties passed through correctly
     self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
 
+  @mock.patch.object(Pipeline, '_assert_not_applying_PDone', mock.Mock())
   def test_expand(self):
     options = PipelineOptions([])
     options.view_as(StandardOptions).streaming = True
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 7aaebc0f196..c4ac12a84cb 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -802,7 +802,12 @@ class Pipeline(HasDisplayData):
             f"Transform '{full_label}' expects a PCollection as input. "
             "Got a PBegin/Pipeline instead.")
 
+      self._assert_not_applying_PDone(pvalueish, transform)
+
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
+      if pvalueish_result is None:
+        pvalueish_result = pvalue.PDone(self)
+        pvalueish_result.producer = current
 
       if type_options is not None and type_options.pipeline_type_check:
         transform.type_check_outputs(pvalueish_result)
@@ -849,6 +854,20 @@ class Pipeline(HasDisplayData):
       self.transforms_stack.pop()
     return pvalueish_result
 
+  def _assert_not_applying_PDone(
+      self,
+      pvalueish,  # type: Optional[pvalue.PValue]
+      transform  # type: ptransform.PTransform
+  ):
+    if isinstance(pvalueish, pvalue.PDone) and isinstance(transform, ParDo):
+      # If the input is a PDone, we cannot apply a ParDo transform.
+      full_label = self._current_transform().full_label
+      producer_label = pvalueish.producer.full_label
+      raise TypeCheckError(
+          f'Transform "{full_label}" was applied to the output of '
+          f'"{producer_label}" but "{producer_label.split("/")[-1]}" '
+          'produces no PCollections.')
+
   def _generate_unique_label(
       self,
       transform  # type: str
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 20aebdd7790..0bbd14b6afc 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -61,6 +61,7 @@ from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import IntervalWindow
 from apache_beam.transforms.window import SlidingWindows
 from apache_beam.transforms.window import TimestampedValue
+from apache_beam.typehints import TypeCheckError
 from apache_beam.utils import windowed_value
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 
@@ -157,6 +158,26 @@ class PipelineTest(unittest.TestCase):
       pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10])
       assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3')
 
+  def test_unexpected_PDone_errmsg(self):
+    """
+    Test that a nice error message is raised if a transform that
+    returns None (i.e. produces no PCollection) is used as input
+    to a PTransform.
+    """
+    class DoNothingTransform(PTransform):
+      def expand(self, pcoll):
+        return None
+
+    class ParentTransform(PTransform):
+      def expand(self, pcoll):
+        return pcoll | DoNothingTransform()
+
+    with pytest.raises(
+        TypeCheckError,
+        match=r".*applied to the output.*ParentTransform/DoNothingTransform"):
+      with TestPipeline() as pipeline:
+        _ = pipeline | ParentTransform() | beam.Map(lambda x: x + 1)
+
   @mock.patch('logging.info')
   def test_runner_overrides_default_pickler(self, mock_info):
     with mock.patch.object(PipelineRunner,

Reply via email to