This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 04251e72aa3 Fix a flatten flaky test when two flattens are used sequentially. (#34602)
04251e72aa3 is described below
commit 04251e72aa3b7e1cfd37c1c0e58d949566228eaf
Author: Shunping Huang <[email protected]>
AuthorDate: Thu Apr 10 12:16:15 2025 -0400
Fix a flatten flaky test when two flattens are used sequentially. (#34602)
---
.../beam/runners/prism/internal/handlerunner.go | 34 ++++++++++++++++++++--
1 file changed, 32 insertions(+), 2 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
index e58bb8f180e..ba950de3469 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -21,6 +21,7 @@ import (
"io"
"reflect"
"sort"
+ "strings"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
@@ -108,12 +109,40 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C
// they're written out to the runner in the same fashion.
// This may stop being necessary once Flatten Unzipping happens in the optimizer.
outPCol := comps.GetPcollections()[outColID]
+ outCoderID := outPCol.CoderId
+ outCoder := comps.GetCoders()[outCoderID]
+ coderSubs := map[string]*pipepb.Coder{}
pcollSubs := map[string]*pipepb.PCollection{}
+
+ if !strings.HasPrefix(outCoderID, "cf_") {
+ // Create a new coder id for the flatten output PCollection and use
+ // this coder id for all input PCollections
+ outCoderID = "cf_" + outColID
+ outCoder = proto.Clone(outCoder).(*pipepb.Coder)
+ coderSubs[outCoderID] = outCoder
+
+ pcollSubs[outColID] =
proto.Clone(outPCol).(*pipepb.PCollection)
+ pcollSubs[outColID].CoderId = outCoderID
+
+ outPCol = pcollSubs[outColID]
+ }
+
for _, p := range t.GetInputs() {
inPCol := comps.GetPcollections()[p]
if inPCol.CoderId != outPCol.CoderId {
- pcollSubs[p] =
proto.Clone(inPCol).(*pipepb.PCollection)
- pcollSubs[p].CoderId = outPCol.CoderId
+ if strings.HasPrefix(inPCol.CoderId, "cf_") {
+ // The input pcollection is the output of another flatten:
+ // e.g. [[a, b] | Flatten, c] | Flatten
+ // In this case, we just point the input coder id to the new flatten
+ // output coder, so any upstream input pcollections will use the new
+ // output coder.
+ coderSubs[inPCol.CoderId] = outCoder
+ } else {
+ // Create a substitute PCollection for this input with the flatten
+ // output coder id
+ pcollSubs[p] =
proto.Clone(inPCol).(*pipepb.PCollection)
+ pcollSubs[p].CoderId = outPCol.CoderId
+ }
}
}
@@ -125,6 +154,7 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C
tid: t,
},
Pcollections: pcollSubs,
+ Coders: coderSubs,
},
RemovedLeaves: nil,
ForcedRoots: forcedRoots,