This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6a1618eca88 fix(python): Register all output pcollections of a
transform rather than only ones that happened to be accessed in DoOutputsTuple
(#37556)
6a1618eca88 is described below
commit 6a1618eca88ec88b9ebcb17336b39ae37d8ee500
Author: Joey Tran <[email protected]>
AuthorDate: Tue Feb 24 10:57:49 2026 -0500
fix(python): Register all output pcollections of a transform rather than
only ones that happened to be accessed in DoOutputsTuple (#37556)
* Fix DoOutputsTuple composite output registration for lazily-populated
_pcolls
The previous code iterated over _pcolls.items() to register composite
outputs, but _pcolls is lazily populated: an entry is added only when its
tag is accessed via __getitem__. As a result, any output that had not been
accessed (e.g. the main output) was missing from the composite's registered
outputs in the pipeline proto. Fix this by iterating over all declared tags
(_main_tag followed by each entry in _tags) and accessing each one via
__getitem__, which forces the lazy creation.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Remove separate regression test; existing test covers the fix
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
sdks/python/apache_beam/pipeline.py | 5 +++--
sdks/python/apache_beam/pipeline_test.py | 7 ++++---
2 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/pipeline.py
b/sdks/python/apache_beam/pipeline.py
index 3cce2c5bb77..7446f9df38c 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -829,9 +829,10 @@ class Pipeline(HasDisplayData):
assert isinstance(result.producer.inputs, tuple)
if isinstance(result, pvalue.DoOutputsTuple):
- for tag, pc in list(result._pcolls.items()):
+ all_tags = [result._main_tag] + list(result._tags)
+ for tag in all_tags:
if tag not in current.outputs:
- current.add_output(pc, tag)
+ current.add_output(result[tag], tag)
continue
# If there is already a tag with the same name, increase a counter for
diff --git a/sdks/python/apache_beam/pipeline_test.py
b/sdks/python/apache_beam/pipeline_test.py
index b28fe3c3d14..ac3e6ac4afc 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -1648,9 +1648,10 @@ class RunnerApiTest(unittest.TestCase):
all_applied_transforms[xform.full_label] = xform
current_transforms.extend(xform.parts)
xform = all_applied_transforms['Split Sales']
- # Confirm that Split Sales correctly has two outputs as specified by
- # ParDo.with_outputs in ParentSalesSplitter.
- assert len(xform.outputs) == 2
+ # Confirm that Split Sales correctly has three outputs: the main
+ # (untagged) output plus the two tagged outputs specified by
+ # ParDo.with_outputs in ParentSalesSplitter.
+ assert len(xform.outputs) == 3
if __name__ == '__main__':