This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 04251e72aa3 Fix a flatten flaky test when two flattens are used sequentially. (#34602)
04251e72aa3 is described below

commit 04251e72aa3b7e1cfd37c1c0e58d949566228eaf
Author: Shunping Huang <[email protected]>
AuthorDate: Thu Apr 10 12:16:15 2025 -0400

    Fix a flatten flaky test when two flattens are used sequentially. (#34602)
---
 .../beam/runners/prism/internal/handlerunner.go    | 34 ++++++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
index e58bb8f180e..ba950de3469 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -21,6 +21,7 @@ import (
        "io"
        "reflect"
        "sort"
+       "strings"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
@@ -108,12 +109,40 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C
                // they're written out to the runner in the same fashion.
                // This may stop being necessary once Flatten Unzipping happens 
in the optimizer.
                outPCol := comps.GetPcollections()[outColID]
+               outCoderID := outPCol.CoderId
+               outCoder := comps.GetCoders()[outCoderID]
+               coderSubs := map[string]*pipepb.Coder{}
                pcollSubs := map[string]*pipepb.PCollection{}
+
+               if !strings.HasPrefix(outCoderID, "cf_") {
+                       // Create a new coder id for the flatten output 
PCollection and use
+                       // this coder id for all input PCollections
+                       outCoderID = "cf_" + outColID
+                       outCoder = proto.Clone(outCoder).(*pipepb.Coder)
+                       coderSubs[outCoderID] = outCoder
+
+                       pcollSubs[outColID] = 
proto.Clone(outPCol).(*pipepb.PCollection)
+                       pcollSubs[outColID].CoderId = outCoderID
+
+                       outPCol = pcollSubs[outColID]
+               }
+
                for _, p := range t.GetInputs() {
                        inPCol := comps.GetPcollections()[p]
                        if inPCol.CoderId != outPCol.CoderId {
-                               pcollSubs[p] = 
proto.Clone(inPCol).(*pipepb.PCollection)
-                               pcollSubs[p].CoderId = outPCol.CoderId
+                               if strings.HasPrefix(inPCol.CoderId, "cf_") {
+                                       // The input pcollection is the output 
of another flatten:
+                                       //     e.g. [[a, b] | Flatten], c] | 
Flatten
+                                       // In this case, we just point the 
input coder id to the new flatten
+                                       // output coder, so any upstream input 
pcollections will use the new
+                                       // output coder.
+                                       coderSubs[inPCol.CoderId] = outCoder
+                               } else {
+                                       // Create a substitute PCollection for 
this input with the flatten
+                                       // output coder id
+                                       pcollSubs[p] = 
proto.Clone(inPCol).(*pipepb.PCollection)
+                                       pcollSubs[p].CoderId = outPCol.CoderId
+                               }
                        }
                }
 
@@ -125,6 +154,7 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C
                                        tid: t,
                                },
                                Pcollections: pcollSubs,
+                               Coders:       coderSubs,
                        },
                        RemovedLeaves: nil,
                        ForcedRoots:   forcedRoots,

Reply via email to