pskevin commented on a change in pull request #12445: URL: https://github.com/apache/beam/pull/12445#discussion_r465373887
########## File path: sdks/go/pkg/beam/runners/universal/universal.go ########## @@ -82,6 +82,54 @@ func Execute(ctx context.Context, p *beam.Pipeline) error { return errors.WithContextf(err, "generating model pipeline") } + // Adding Expanded transforms to their counterparts in the Pipeline + for id, external := range p.ExpandedTransforms { + pipeline.Requirements = append(pipeline.Requirements, external.Requirements...) + + // Correct update of transform corresponding to the ExpandedTransform + // TODO(pskevin): Figure if there is a better way of supporting multiple outputs + transform := pipeline.Components.Transforms[id] + existingInput := "" + newInput := "" + for _, v := range transform.Outputs { + existingInput = v + } + for _, v := range external.ExpandedTransform.Outputs { + newInput = v + } + + for _, t := range pipeline.Components.Transforms { + for idx, i := range t.Inputs { + if i == existingInput { + t.Inputs[idx] = newInput + } + } + } + + // Adding components of the Expanded Transforms to the current Pipeline + for k, v := range external.Components.Transforms { Review comment: That's an awesome helper. Thanks for pointing it out. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org