riteshghorse commented on code in PR #24346:
URL: https://github.com/apache/beam/pull/24346#discussion_r1034747217


##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -324,7 +324,7 @@ func (n *DataSource) Down(ctx context.Context) error {
 }
 
 func (n *DataSource) String() string {
-       return fmt.Sprintf("DataSource[%v, %v] Coder:%v Out:%v", n.SID, n.Name, n.Coder, n.Out.ID())
+       return fmt.Sprintf("DataSource[%v, %v] Out:%v Coder:%v ", n.SID, n.Name, n.Coder, n.Out.ID())

Review Comment:
   ```suggestion
        return fmt.Sprintf("DataSource[%v, %v] Out:%v Coder:%v ", n.SID, n.Name, n.Out.ID(), n.Coder)
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/translate.go:
##########
@@ -95,11 +96,67 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) {
                        b.units = b.units[:len(b.units)-1]
                }
 
+               mayFixDataSourceCoder(u)
                b.units = append(b.units, u)
        }
        return b.build()
 }
 
+// mayFixDataSourceCoder checks the node downstream of the DataSource and if applicable, changes
+// a KV<k, Iter<V>> coder to a CoGBK<k, v>. This requires knowledge of the downstream node because
+// coder interpretation is ambiguous to received types in DoFns, and we can only interpret it right
+// at execution time with knowledge of both.
+func mayFixDataSourceCoder(u *DataSource) {
+       if !coder.IsKV(coder.SkipW(u.Coder)) {
+               return // If it's not a KV, there's nothing to do here.
+       }
+       if coder.SkipW(u.Coder).Components[1].Kind != coder.Iterable {
+               return // If the V is not an iterable, we don't care.
+       }
+       out := u.Out
+       if mp, ok := out.(*Multiplex); ok {
+               // Here we trust that the Multiplex Outs are all the same signature, since we've validated
+               // that at construction time.
+               out = mp.Out[0]
+       }
+
+       // Expand, *Combine, MergeAccumulators, ReshuffleOutput nodes always have CoGBK behavior.

Review Comment:
   ```suggestion
        // Expand, Combine, MergeAccumulators, ReshuffleOutput nodes always have CoGBK behavior.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to