youngoli commented on a change in pull request #13325:
URL: https://github.com/apache/beam/pull/13325#discussion_r523383346
##########
File path: sdks/go/pkg/beam/xlang.go
##########
@@ -87,61 +84,16 @@ func TryCrossLanguage(s Scope, ext *graph.ExternalTransform, ins []*graph.Inboun
// unique namespace can be requested.
ext.Namespace = graph.NewNamespace()
- // Build the ExpansionRequest
-
- // Obtaining the components and transform proto representing this transform
- // TODO(BEAM-11188): Move proto handling code into xlangx or graphx package.
- p, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+ expanded, err := xlangx.Expand(edge, ext)
if err != nil {
- return nil, errors.Wrapf(err, "unable to generate proto representation of %v", ext)
- }
-
- transforms := p.GetComponents().GetTransforms()
-
- // Transforms consist of only External transform and composites. Composites
- // should be removed from proto before submitting expansion request.
- extTransformID := p.GetRootTransformIds()[0]
- extTransform := transforms[extTransformID]
- for extTransform.UniqueName != "External" {
- delete(transforms, extTransformID)
- p, err := pipelinex.Normalize(p)
- if err != nil {
- return nil, err
- }
- extTransformID = p.GetRootTransformIds()[0]
- extTransform = transforms[extTransformID]
- }
-
- // Scoping the ExternalTransform with respect to it's unique namespace, thus
- // avoiding future collisions
- xlangx.AddNamespace(extTransform, p.GetComponents(), ext.Namespace)
-
- xlangx.AddFakeImpulses(p) // Inputs need to have sources
- delete(transforms, extTransformID)
-
- // Querying the expansion service
- res, err := xlangx.Expand(context.Background(), p.GetComponents(), extTransform, ext.Namespace, ext.ExpansionAddr)
- if err != nil {
- return nil, err
- }
-
- // Handling ExpansionResponse
-
- // Previously added fake impulses need to be removed to avoid having
- // multiple sources to the same pcollection in the graph
- xlangx.RemoveFakeImpulses(res.GetComponents(), res.GetTransform())
-
- exp := &graph.ExpandedTransform{
- Components: res.GetComponents(),
- Transform: res.GetTransform(),
- Requirements: res.GetRequirements(),
+ return nil, errors.WithContext(err, "expanding external transform")
}
- ext.Expanded = exp
+ ext.Expanded = expanded
Review comment:
Sounds fine to me. My instinct is usually to prefer returning values
over modifying parameters, but Expand has such a specific usage that I don't
think it matters much here. Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]