milantracy commented on a change in pull request #13028:
URL: https://github.com/apache/beam/pull/13028#discussion_r501393713



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -420,33 +490,58 @@ func (m *marshaller) expandCrossLanguage(namedEdge 
NamedEdge) string {
                // map consumers of these outputs to the expanded transform's 
outputs.
                outputs := make(map[string]string)
                for i, out := range edge.Output {
-                       m.addNode(out.To)
+                       if _, err := m.addNode(out.To); err != nil {
+                               return "", errors.Wrapf(err, "Failed to expand 
cross language transform for edge: %v", namedEdge)
+                       }
                        outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
                }
                transform.Outputs = outputs
-               transform.EnvironmentId = 
ExpandedTransform(edge.External.Expanded).EnvironmentId
+               environment, err := ExpandedTransform(edge.External.Expanded)
+               if err != nil {
+                       return "", errors.Wrapf(err, "Failed to expand cross 
language transform for edge: %v", namedEdge)
+               }
+               transform.EnvironmentId = environment.EnvironmentId
        }
 
        m.transforms[id] = transform
-       return id
+       return id, nil
 }
 
-func (m *marshaller) expandCoGBK(edge NamedEdge) string {
+func (m *marshaller) expandCoGBK(edge NamedEdge) (string, error) {
        // TODO(BEAM-490): replace once CoGBK is a primitive. For now, we have 
to translate
        // CoGBK with multiple PCollections as described in cogbk.go.
 
        id := edgeID(edge.Edge)
-       kvCoderID := m.coders.Add(MakeKVUnionCoder(edge.Edge))
-       gbkCoderID := m.coders.Add(MakeGBKUnionCoder(edge.Edge))
+       kvCoder, err := MakeKVUnionCoder(edge.Edge)
+       if err != nil {
+               return "", errors.Wrapf(err, "Fail to expand CoGBK transform 
for edge: %v", edge)

Review comment:
       thanks, this is a good catch, I also create a anonymous function to 
return errors from _addMultiEdge_ method rather than create a named helper 
function for the same purpose.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to