riteshghorse commented on code in PR #29439:
URL: https://github.com/apache/beam/pull/29439#discussion_r1402395163


##########
sdks/go/pkg/beam/runners/prism/internal/preprocess.go:
##########
@@ -299,71 +299,71 @@ func checkForExpandCoderPattern(in, out string, comps *pipepb.Components) bool {
 }
 
 type fusionFacts struct {
-       pcolProducers   map[string]link   // global pcol ID to transform link 
that produces it.
-       pcolConsumers   map[string][]link // global pcol ID to all consumers of 
that pcollection
-       usedAsSideInput map[string]bool   // global pcol ID and if it's used as 
a side input
+       PcolProducers   map[string]link   // global pcol ID to transform link 
that produces it.
+       PcolConsumers   map[string][]link // global pcol ID to all consumers of 
that pcollection
+       UsedAsSideInput map[string]bool   // global pcol ID and if it's used as 
a side input
 
-       directSideInputs     map[string]map[string]bool // global transform ID 
and all direct side input pcollections.
-       downstreamSideInputs map[string]map[string]bool // global transform ID 
and all transitive side input pcollections.
+       DirectSideInputs     map[string]map[string]bool // global transform ID 
and all direct side input pcollections.
+       DownstreamSideInputs map[string]map[string]bool // global transform ID 
and all transitive side input pcollections.
 
-       forcedRoots map[string]bool // transforms forced to be roots (not 
computed in computeFacts)
+       ForcedRoots map[string]bool // transforms forced to be roots (not 
computed in computeFacts)
 }
 
 // computeFacts computes facts about the given set of transforms and 
components that
 // are useful for fusion.
 func computeFacts(topological []string, comps *pipepb.Components) fusionFacts {
        ret := fusionFacts{
-               pcolProducers:        map[string]link{},
-               pcolConsumers:        map[string][]link{},
-               usedAsSideInput:      map[string]bool{},
-               directSideInputs:     map[string]map[string]bool{}, // direct 
set
-               downstreamSideInputs: map[string]map[string]bool{}, // 
transitive set
+               PcolProducers:        map[string]link{},
+               PcolConsumers:        map[string][]link{},
+               UsedAsSideInput:      map[string]bool{},
+               DirectSideInputs:     map[string]map[string]bool{}, // direct 
set
+               DownstreamSideInputs: map[string]map[string]bool{}, // 
transitive set
        }
 
        // Use the topological ids so each PCollection only has a single
        // producer. We've already pruned out composites at this stage.
        for _, tID := range topological {
                t := comps.GetTransforms()[tID]
                for local, global := range t.GetOutputs() {
-                       ret.pcolProducers[global] = link{transform: tID, local: 
local, global: global}
+                       ret.PcolProducers[global] = link{Transform: tID, Local: 
local, Global: global}
                }
                sis, err := getSideInputs(t)
                if err != nil {
                        panic(err)
                }
                directSIs := map[string]bool{}
-               ret.directSideInputs[tID] = directSIs
+               ret.DirectSideInputs[tID] = directSIs
                for local, global := range t.GetInputs() {
-                       ret.pcolConsumers[global] = 
append(ret.pcolConsumers[global], link{transform: tID, local: local, global: 
global})
+                       ret.PcolConsumers[global] = 
append(ret.PcolConsumers[global], link{Transform: tID, Local: local, Global: 
global})
                        if _, ok := sis[local]; ok {
-                               ret.usedAsSideInput[global] = true
+                               ret.UsedAsSideInput[global] = true
                                directSIs[global] = true
                        }
                }
        }
 
        for _, tID := range topological {
-               computeDownstreamSideInputs(tID, comps, ret)
+               computeDownstreamSideInputs(tID, comps, &ret)
        }
 
        return ret
 }
 
-func computeDownstreamSideInputs(tID string, comps *pipepb.Components, facts 
fusionFacts) map[string]bool {
-       if dssi, ok := facts.downstreamSideInputs[tID]; ok {
+func computeDownstreamSideInputs(tID string, comps *pipepb.Components, facts 
*fusionFacts) map[string]bool {
+       if dssi, ok := facts.DownstreamSideInputs[tID]; ok {
                return dssi
        }
        dssi := map[string]bool{}
        for _, o := range comps.GetTransforms()[tID].GetOutputs() {
-               if facts.usedAsSideInput[o] {
+               if facts.UsedAsSideInput[o] {
                        dssi[o] = true
                }
-               for _, consumer := range facts.pcolConsumers[o] {
-                       cdssi := computeDownstreamSideInputs(consumer.global, 
comps, facts)
+               for _, consumer := range facts.PcolConsumers[o] {
+                       cdssi := computeDownstreamSideInputs(consumer.Transform, comps, facts)

Review Comment:
   Thanks for making the review short!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to