milantracy commented on a change in pull request #13028:
URL: https://github.com/apache/beam/pull/13028#discussion_r501393713
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -420,33 +490,58 @@ func (m *marshaller) expandCrossLanguage(namedEdge
NamedEdge) string {
// map consumers of these outputs to the expanded transform's
outputs.
outputs := make(map[string]string)
for i, out := range edge.Output {
- m.addNode(out.To)
+ if _, err := m.addNode(out.To); err != nil {
+ return "", errors.Wrapf(err, "Failed to expand
cross language transform for edge: %v", namedEdge)
+ }
outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
}
transform.Outputs = outputs
- transform.EnvironmentId =
ExpandedTransform(edge.External.Expanded).EnvironmentId
+ environment, err := ExpandedTransform(edge.External.Expanded)
+ if err != nil {
+ return "", errors.Wrapf(err, "Failed to expand cross
language transform for edge: %v", namedEdge)
+ }
+ transform.EnvironmentId = environment.EnvironmentId
}
m.transforms[id] = transform
- return id
+ return id, nil
}
-func (m *marshaller) expandCoGBK(edge NamedEdge) string {
+func (m *marshaller) expandCoGBK(edge NamedEdge) (string, error) {
// TODO(BEAM-490): replace once CoGBK is a primitive. For now, we have
to translate
// CoGBK with multiple PCollections as described in cogbk.go.
id := edgeID(edge.Edge)
- kvCoderID := m.coders.Add(MakeKVUnionCoder(edge.Edge))
- gbkCoderID := m.coders.Add(MakeGBKUnionCoder(edge.Edge))
+ kvCoder, err := MakeKVUnionCoder(edge.Edge)
+ if err != nil {
+ return "", errors.Wrapf(err, "Fail to expand CoGBK transform
for edge: %v", edge)
Review comment:
Thanks, this is a good catch. I also created an anonymous function to
return errors from the _addMultiEdge_ method rather than creating a named
helper function for the same purpose.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]