riteshghorse commented on code in PR #24346:
URL: https://github.com/apache/beam/pull/24346#discussion_r1034747217
##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -324,7 +324,7 @@ func (n *DataSource) Down(ctx context.Context) error {
}
func (n *DataSource) String() string {
- return fmt.Sprintf("DataSource[%v, %v] Coder:%v Out:%v", n.SID, n.Name, n.Coder, n.Out.ID())
+ return fmt.Sprintf("DataSource[%v, %v] Out:%v Coder:%v ", n.SID, n.Name, n.Coder, n.Out.ID())
Review Comment:
```suggestion
return fmt.Sprintf("DataSource[%v, %v] Out:%v Coder:%v", n.SID, n.Name, n.Out.ID(), n.Coder)
```
##########
sdks/go/pkg/beam/core/runtime/exec/translate.go:
##########
@@ -95,11 +96,67 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) {
b.units = b.units[:len(b.units)-1]
}
+ mayFixDataSourceCoder(u)
b.units = append(b.units, u)
}
return b.build()
}
+// mayFixDataSourceCoder checks the node downstream of the DataSource and if applicable, changes
+// a KV<k, Iter<V>> coder to a CoGBK<k, v>. This requires knowledge of the downstream node because
+// coder interpretation is ambiguous to received types in DoFns, and we can only interpret it right
+// at execution time with knowledge of both.
+func mayFixDataSourceCoder(u *DataSource) {
+ if !coder.IsKV(coder.SkipW(u.Coder)) {
+ return // If it's not a KV, there's nothing to do here.
+ }
+ if coder.SkipW(u.Coder).Components[1].Kind != coder.Iterable {
+ return // If the V is not an iterable, we don't care.
+ }
+ out := u.Out
+ if mp, ok := out.(*Multiplex); ok {
+ // Here we trust that the Multiplex Outs are all the same signature, since we've validated
+ // that at construction time.
+ out = mp.Out[0]
+ }
+
+ // Expand, *Combine, MergeAccumulators, ReshuffleOutput nodes always have CoGBK behavior.
Review Comment:
```suggestion
// Expand, Combine, MergeAccumulators, ReshuffleOutput nodes always have CoGBK behavior.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]