lostluck commented on code in PR #29439:
URL: https://github.com/apache/beam/pull/29439#discussion_r1393710231
##########
sdks/go/pkg/beam/runners/prism/internal/preprocess.go:
##########
@@ -299,71 +299,71 @@ func checkForExpandCoderPattern(in, out string, comps
*pipepb.Components) bool {
}
type fusionFacts struct {
- pcolProducers map[string]link // global pcol ID to transform link
that produces it.
- pcolConsumers map[string][]link // global pcol ID to all consumers of
that pcollection
- usedAsSideInput map[string]bool // global pcol ID and if it's used as
a side input
+ PcolProducers map[string]link // global pcol ID to transform link
that produces it.
+ PcolConsumers map[string][]link // global pcol ID to all consumers of
that pcollection
+ UsedAsSideInput map[string]bool // global pcol ID and if it's used as
a side input
- directSideInputs map[string]map[string]bool // global transform ID
and all direct side input pcollections.
- downstreamSideInputs map[string]map[string]bool // global transform ID
and all transitive side input pcollections.
+ DirectSideInputs map[string]map[string]bool // global transform ID
and all direct side input pcollections.
+ DownstreamSideInputs map[string]map[string]bool // global transform ID
and all transitive side input pcollections.
- forcedRoots map[string]bool // transforms forced to be roots (not
computed in computeFacts)
+ ForcedRoots map[string]bool // transforms forced to be roots (not
computed in computeFacts)
}
// computeFacts computes facts about the given set of transforms and
components that
// are useful for fusion.
func computeFacts(topological []string, comps *pipepb.Components) fusionFacts {
ret := fusionFacts{
- pcolProducers: map[string]link{},
- pcolConsumers: map[string][]link{},
- usedAsSideInput: map[string]bool{},
- directSideInputs: map[string]map[string]bool{}, // direct
set
- downstreamSideInputs: map[string]map[string]bool{}, //
transitive set
+ PcolProducers: map[string]link{},
+ PcolConsumers: map[string][]link{},
+ UsedAsSideInput: map[string]bool{},
+ DirectSideInputs: map[string]map[string]bool{}, // direct
set
+ DownstreamSideInputs: map[string]map[string]bool{}, //
transitive set
}
// Use the topological ids so each PCollection only has a single
// producer. We've already pruned out composites at this stage.
for _, tID := range topological {
t := comps.GetTransforms()[tID]
for local, global := range t.GetOutputs() {
- ret.pcolProducers[global] = link{transform: tID, local:
local, global: global}
+ ret.PcolProducers[global] = link{Transform: tID, Local:
local, Global: global}
}
sis, err := getSideInputs(t)
if err != nil {
panic(err)
}
directSIs := map[string]bool{}
- ret.directSideInputs[tID] = directSIs
+ ret.DirectSideInputs[tID] = directSIs
for local, global := range t.GetInputs() {
- ret.pcolConsumers[global] =
append(ret.pcolConsumers[global], link{transform: tID, local: local, global:
global})
+ ret.PcolConsumers[global] =
append(ret.PcolConsumers[global], link{Transform: tID, Local: local, Global:
global})
if _, ok := sis[local]; ok {
- ret.usedAsSideInput[global] = true
+ ret.UsedAsSideInput[global] = true
directSIs[global] = true
}
}
}
for _, tID := range topological {
- computeDownstreamSideInputs(tID, comps, ret)
+ computeDownstreamSideInputs(tID, comps, &ret)
}
return ret
}
-func computeDownstreamSideInputs(tID string, comps *pipepb.Components, facts
fusionFacts) map[string]bool {
- if dssi, ok := facts.downstreamSideInputs[tID]; ok {
+func computeDownstreamSideInputs(tID string, comps *pipepb.Components, facts
*fusionFacts) map[string]bool {
+ if dssi, ok := facts.DownstreamSideInputs[tID]; ok {
return dssi
}
dssi := map[string]bool{}
for _, o := range comps.GetTransforms()[tID].GetOutputs() {
- if facts.usedAsSideInput[o] {
+ if facts.UsedAsSideInput[o] {
dssi[o] = true
}
- for _, consumer := range facts.pcolConsumers[o] {
- cdssi := computeDownstreamSideInputs(consumer.global,
comps, facts)
+ for _, consumer := range facts.PcolConsumers[o] {
+ cdssi :=
computeDownstreamSideInputs(consumer.Transform, comps, facts)
Review Comment:
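FYI, this line is where the bug occurred: the recursive call was passing consumer.global (a PCollection ID) as the transform ID argument instead of the consuming transform's ID (now consumer.Transform).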
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]