This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a1618eca88 fix(python): Register all output pcollections of a transform rather than only ones that happened to be accessed in DoOutputsTuple (#37556)
6a1618eca88 is described below

commit 6a1618eca88ec88b9ebcb17336b39ae37d8ee500
Author: Joey Tran <[email protected]>
AuthorDate: Tue Feb 24 10:57:49 2026 -0500

    fix(python): Register all output pcollections of a transform rather than only ones that happened to be accessed in DoOutputsTuple (#37556)
    
    * Fix DoOutputsTuple composite output registration for lazily-populated _pcolls
    
    The previous code iterated over _pcolls.items() to register composite
    outputs, but _pcolls is lazily populated — entries are only added when
    accessed via __getitem__. This caused unaccessed outputs (e.g. the main
    output) to be missing from the composite's registered outputs in the
    pipeline proto. Fix by iterating over all declared tags (_main_tag + _tags)
    and accessing each via __getitem__ to ensure lazy creation.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * Remove separate regression test; existing test covers the fix
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 sdks/python/apache_beam/pipeline.py      | 5 +++--
 sdks/python/apache_beam/pipeline_test.py | 7 ++++---
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 3cce2c5bb77..7446f9df38c 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -829,9 +829,10 @@ class Pipeline(HasDisplayData):
 
         assert isinstance(result.producer.inputs, tuple)
         if isinstance(result, pvalue.DoOutputsTuple):
-          for tag, pc in list(result._pcolls.items()):
+          all_tags = [result._main_tag] + list(result._tags)
+          for tag in all_tags:
             if tag not in current.outputs:
-              current.add_output(pc, tag)
+              current.add_output(result[tag], tag)
           continue
 
         # If there is already a tag with the same name, increase a counter for
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index b28fe3c3d14..ac3e6ac4afc 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -1648,9 +1648,10 @@ class RunnerApiTest(unittest.TestCase):
       all_applied_transforms[xform.full_label] = xform
       current_transforms.extend(xform.parts)
     xform = all_applied_transforms['Split Sales']
-    # Confirm that Split Sales correctly has two outputs as specified by
-    #  ParDo.with_outputs in ParentSalesSplitter.
-    assert len(xform.outputs) == 2
+    # Confirm that Split Sales correctly has three outputs: the main
+    # (untagged) output plus the two tagged outputs specified by
+    # ParDo.with_outputs in ParentSalesSplitter.
+    assert len(xform.outputs) == 3
 
 
 if __name__ == '__main__':

Reply via email to