This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9e7c4393287 add friendly error message for when transform is applied to no output (#35160)
9e7c4393287 is described below
commit 9e7c43932876fe6566020fdd686e31a9110ef2c1
Author: Hai Joey Tran <[email protected]>
AuthorDate: Thu Jun 5 12:45:12 2025 -0400
add friendly error message for when transform is applied to no output (#35160)
* add friendly error message for when transform is applied to no output
* update test name
* Fix pubsub unit tests that depend on old behavior
---
sdks/python/apache_beam/io/gcp/pubsub_test.py | 3 +++
sdks/python/apache_beam/pipeline.py | 19 +++++++++++++++++++
sdks/python/apache_beam/pipeline_test.py | 21 +++++++++++++++++++++
3 files changed, 43 insertions(+)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index feee9dc0082..fadc49461a3 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -27,6 +27,7 @@ import hamcrest as hc
import mock
import apache_beam as beam
+from apache_beam import Pipeline
from apache_beam.io import Read
from apache_beam.io import Write
from apache_beam.io.gcp.pubsub import MultipleReadFromPubSub
@@ -364,6 +365,7 @@ class TestMultiReadFromPubSubOverride(unittest.TestCase):
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestWriteStringsToPubSubOverride(unittest.TestCase):
+ @mock.patch.object(Pipeline, '_assert_not_applying_PDone', mock.Mock())
def test_expand_deprecated(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
@@ -385,6 +387,7 @@ class TestWriteStringsToPubSubOverride(unittest.TestCase):
# Ensure that the properties passed through correctly
self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
+ @mock.patch.object(Pipeline, '_assert_not_applying_PDone', mock.Mock())
def test_expand(self):
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 7aaebc0f196..c4ac12a84cb 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -802,7 +802,12 @@ class Pipeline(HasDisplayData):
f"Transform '{full_label}' expects a PCollection as input. "
"Got a PBegin/Pipeline instead.")
+ self._assert_not_applying_PDone(pvalueish, transform)
+
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
+ if pvalueish_result is None:
+ pvalueish_result = pvalue.PDone(self)
+ pvalueish_result.producer = current
if type_options is not None and type_options.pipeline_type_check:
transform.type_check_outputs(pvalueish_result)
@@ -849,6 +854,20 @@ class Pipeline(HasDisplayData):
self.transforms_stack.pop()
return pvalueish_result
+ def _assert_not_applying_PDone(
+ self,
+ pvalueish, # type: Optional[pvalue.PValue]
+ transform # type: ptransform.PTransform
+ ):
+ if isinstance(pvalueish, pvalue.PDone) and isinstance(transform, ParDo):
+ # If the input is a PDone, we cannot apply a ParDo transform.
+ full_label = self._current_transform().full_label
+ producer_label = pvalueish.producer.full_label
+ raise TypeCheckError(
+ f'Transform "{full_label}" was applied to the output of '
+ f'"{producer_label}" but "{producer_label.split("/")[-1]}" '
+ 'produces no PCollections.')
+
def _generate_unique_label(
self,
transform # type: str
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 20aebdd7790..0bbd14b6afc 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -61,6 +61,7 @@ from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import IntervalWindow
from apache_beam.transforms.window import SlidingWindows
from apache_beam.transforms.window import TimestampedValue
+from apache_beam.typehints import TypeCheckError
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import MIN_TIMESTAMP
@@ -157,6 +158,26 @@ class PipelineTest(unittest.TestCase):
pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10])
assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3')
+ def test_unexpected_PDone_errmsg(self):
+ """
+ Test that a nice error message is raised if a transform that
+ returns None (i.e. produces no PCollection) is used as input
+ to a PTransform.
+ """
+ class DoNothingTransform(PTransform):
+ def expand(self, pcoll):
+ return None
+
+ class ParentTransform(PTransform):
+ def expand(self, pcoll):
+ return pcoll | DoNothingTransform()
+
+ with pytest.raises(
+ TypeCheckError,
+ match=r".*applied to the output.*ParentTransform/DoNothingTransform"):
+ with TestPipeline() as pipeline:
+ _ = pipeline | ParentTransform() | beam.Map(lambda x: x + 1)
+
@mock.patch('logging.info')
def test_runner_overrides_default_pickler(self, mock_info):
with mock.patch.object(PipelineRunner,