lostluck commented on a change in pull request #13325:
URL: https://github.com/apache/beam/pull/13325#discussion_r522490717



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -16,11 +16,222 @@
 package graphx
 
 import (
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
+// MergeExpandedWithPipeline adds expanded components of all 
ExternalTransforms to the existing pipeline
+func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {

Review comment:
       Please un-export all functions if they aren't required outside of the 
graphx package. Naturally, keep any existing doc comments. No reason to remove 
them just because the functions aren't exported.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -16,11 +16,222 @@
 package graphx
 
 import (
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
+// MergeExpandedWithPipeline adds expanded components of all 
ExternalTransforms to the existing pipeline
+func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+       // Adding Expanded transforms to their counterparts in the Pipeline
+
+       for _, e := range edges {
+               if e.Op == graph.External {
+                       exp := e.External.Expanded
+                       if exp == nil {
+                               continue
+                       }
+                       id := fmt.Sprintf("e%v", e.ID())
+
+                       p.Requirements = append(p.Requirements, 
exp.Requirements...)
+
+                       // Adding components of the Expanded Transforms to the 
current Pipeline
+                       components, err := ExpandedComponents(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       for k, v := range components.GetTransforms() {
+                               p.Components.Transforms[k] = v
+                       }
+                       for k, v := range components.GetPcollections() {
+                               p.Components.Pcollections[k] = v
+                       }
+                       for k, v := range components.GetWindowingStrategies() {
+                               p.Components.WindowingStrategies[k] = v
+                       }
+                       for k, v := range components.GetCoders() {
+                               p.Components.Coders[k] = v
+                       }
+                       for k, v := range components.GetEnvironments() {
+                               if k == "go" {
+                                       // This case is not an anomaly. It is 
expected to be always
+                                       // present. Any initial 
ExpansionRequest will have a
+                                       // component which requires the "go" 
environment. Scoping
+                                       // using unique namespace prevents 
collision.
+                                       continue
+                               }
+                               p.Components.Environments[k] = v
+                       }
+
+                       transform, err := ExpandedTransform(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       p.Components.Transforms[id] = transform
+               }
+       }
+}
+
+// PurgeOutputInput remaps outputs from edge corresponding to an
+// ExternalTransform with the correct expanded outputs. All consumers of the
+// previous outputs are updated with new inputs.
+func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+       idxMap := make(map[string]string)
+       components := p.GetComponents()
+
+       // Generating map (oldID -> newID) of outputs to be purged
+       for _, e := range edges {
+               if e.Op == graph.External {
+                       if e.External.Expanded == nil {
+                               continue
+                       }
+                       for tag, n := range ExternalOutputs(e) {
+                               nodeID := fmt.Sprintf("n%v", n.ID())
+
+                               transform, err := 
ExpandedTransform(e.External.Expanded)
+                               if err != nil {
+                                       panic(err)
+                               }
+                               expandedOutputs := transform.GetOutputs()
+                               var pcolID string
+                               if tag == graph.UnnamedOutputTag {
+                                       for _, pcolID = range expandedOutputs {
+                                               // easiest way to access map 
with one entry (key,value)
+                                       }
+                               } else {
+                                       pcolID = expandedOutputs[tag]
+                               }
+
+                               idxMap[nodeID] = pcolID
+                               delete(components.Pcollections, nodeID)
+                       }
+               }
+       }
+
+       // Updating all input ids to reflect the correct sources
+       for _, t := range components.GetTransforms() {
+               inputs := t.GetInputs()
+               for tag, nodeID := range inputs {
+                       if pcolID, exists := idxMap[nodeID]; exists {
+                               inputs[tag] = pcolID
+                       }
+               }
+       }
+
+}
+
+// VerifyNamedOutputs ensures the expanded outputs correspond to the correct 
and expected named outputs
+func VerifyNamedOutputs(ext *graph.ExternalTransform) {
+       transform, err := ExpandedTransform(ext.Expanded)
+       if err != nil {
+               panic(err)
+       }
+       expandedOutputs := transform.GetOutputs()
+
+       if len(expandedOutputs) != len(ext.OutputsMap) {
+               panic(errors.Errorf("mismatched number of named 
outputs:\nreceived - %v\nexpected - %v", len(expandedOutputs), 
len(ext.OutputsMap)))
+       }
+
+       for tag := range ext.OutputsMap {
+               _, exists := expandedOutputs[tag]
+               if tag != graph.UnnamedOutputTag && !exists {
+                       panic(errors.Errorf("missing named output in expanded 
transform: %v is expected in %v", tag, expandedOutputs))
+               }
+               if tag == graph.UnnamedOutputTag && len(expandedOutputs) > 1 {
+                       panic(errors.Errorf("mismatched number of unnamed 
outputs:\nreceived - %v\nexpected - 1", len(expandedOutputs)))
+               }
+       }
+}
+
+// ResolveOutputIsBounded updates each Output node with respect to the received
+// expanded components to reflect if it is bounded or not
+func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater 
func(*graph.Node, bool)) {
+       ext := e.External
+       exp := ext.Expanded
+       components, err := ExpandedComponents(exp)
+       if err != nil {
+               panic(err)
+       }
+       expandedPCollections := components.GetPcollections()
+       transform, err := ExpandedTransform(exp)
+       if err != nil {
+               panic(err)
+       }
+       expandedOutputs := transform.GetOutputs()
+
+       for tag, node := range ExternalOutputs(e) {
+               var id string
+               isBounded := true
+
+               switch tag {
+               case graph.UnnamedOutputTag:
+                       for _, id = range expandedOutputs {
+                               // easiest way to access map with one entry 
(key,value)
+                       }
+               default:
+                       id = expandedOutputs[tag]
+               }
+
+               if pcol, exists := expandedPCollections[id]; exists {
+                       if pcol.GetIsBounded() == pipepb.IsBounded_UNBOUNDED {
+                               isBounded = false
+                       }
+                       isBoundedUpdater(node, isBounded)
+               } else {
+                       panic(errors.Errorf("missing corresponsing pcollection 
of named output: %v is expected in %v", id, expandedPCollections))
+               }
+
+       }
+}
+
+// AddFakeImpulses adds an impulse transform as the producer for each input to
+// the root transform. Inputs need producers to form a correct pipeline.
+func AddFakeImpulses(p *pipepb.Pipeline) {
+       // For a pipeline consisting of only the external node edge, there will 
be
+       // single root transform which will be the external transform.
+       // Adding fake impulses per input to this external transform
+       transforms := p.GetComponents().GetTransforms()
+       ext := transforms[p.GetRootTransformIds()[0]]
+
+       for tag, id := range ext.GetInputs() {
+               key := fmt.Sprintf("%s_%s", "impulse", tag)
+               output := map[string]string{"out": id}
+
+               impulse := &pipepb.PTransform{
+                       UniqueName: key,
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: URNImpulse,
+                       },
+                       Outputs: output,
+               }
+
+               transforms[key] = impulse
+       }
+
+}
+
+// RemoveFakeImpulses removes each fake impulse per input to the the transform.
+// Multiple producers for one Input cannot be present.
+func RemoveFakeImpulses(c *pipepb.Components, ext *pipepb.PTransform) {
+       transforms := c.GetTransforms()
+       var impulseIDs []string
+
+       for tag := range ext.GetInputs() {
+               id := fmt.Sprintf("%s_%s", "impulse", tag)
+               impulseIDs = append(impulseIDs, id)
+       }
+
+       for _, id := range impulseIDs {
+               t := transforms[id]
+               if t.GetSpec().GetUrn() == URNImpulse {
+                       delete(transforms, id)
+

Review comment:
       Remove the spare blank line.

##########
File path: sdks/go/pkg/beam/xlang.go
##########
@@ -87,61 +84,16 @@ func TryCrossLanguage(s Scope, ext 
*graph.ExternalTransform, ins []*graph.Inboun
        // unique namespace can be requested.
        ext.Namespace = graph.NewNamespace()
 
-       // Build the ExpansionRequest
-
-       // Obtaining the components and transform proto representing this 
transform
-       // TODO(BEAM-11188): Move proto handling code into xlangx or graphx 
package.
-       p, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+       expanded, err := xlangx.Expand(edge, ext)
        if err != nil {
-               return nil, errors.Wrapf(err, "unable to generate proto 
representation of %v", ext)
-       }
-
-       transforms := p.GetComponents().GetTransforms()
-
-       // Transforms consist of only External transform and composites. 
Composites
-       // should be removed from proto before submitting expansion request.
-       extTransformID := p.GetRootTransformIds()[0]
-       extTransform := transforms[extTransformID]
-       for extTransform.UniqueName != "External" {
-               delete(transforms, extTransformID)
-               p, err := pipelinex.Normalize(p)
-               if err != nil {
-                       return nil, err
-               }
-               extTransformID = p.GetRootTransformIds()[0]
-               extTransform = transforms[extTransformID]
-       }
-
-       // Scoping the ExternalTransform with respect to it's unique namespace, 
thus
-       // avoiding future collisions
-       xlangx.AddNamespace(extTransform, p.GetComponents(), ext.Namespace)
-
-       xlangx.AddFakeImpulses(p) // Inputs need to have sources
-       delete(transforms, extTransformID)
-
-       // Querying the expansion service
-       res, err := xlangx.Expand(context.Background(), p.GetComponents(), 
extTransform, ext.Namespace, ext.ExpansionAddr)
-       if err != nil {
-               return nil, err
-       }
-
-       // Handling ExpansionResponse
-
-       // Previously added fake impulses need to be removed to avoid having
-       // multiple sources to the same pcollection in the graph
-       xlangx.RemoveFakeImpulses(res.GetComponents(), res.GetTransform())
-
-       exp := &graph.ExpandedTransform{
-               Components:   res.GetComponents(),
-               Transform:    res.GetTransform(),
-               Requirements: res.GetRequirements(),
+               return nil, errors.WithContext(err, "expanding external 
transform")
        }
-       ext.Expanded = exp
+       ext.Expanded = expanded

Review comment:
       What do you think about simply moving this assignment into xlangx.Expand?

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -16,11 +16,222 @@
 package graphx
 
 import (
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
+// MergeExpandedWithPipeline adds expanded components of all 
ExternalTransforms to the existing pipeline
+func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+       // Adding Expanded transforms to their counterparts in the Pipeline
+
+       for _, e := range edges {
+               if e.Op == graph.External {
+                       exp := e.External.Expanded
+                       if exp == nil {
+                               continue
+                       }
+                       id := fmt.Sprintf("e%v", e.ID())
+
+                       p.Requirements = append(p.Requirements, 
exp.Requirements...)
+
+                       // Adding components of the Expanded Transforms to the 
current Pipeline
+                       components, err := ExpandedComponents(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       for k, v := range components.GetTransforms() {
+                               p.Components.Transforms[k] = v
+                       }
+                       for k, v := range components.GetPcollections() {
+                               p.Components.Pcollections[k] = v
+                       }
+                       for k, v := range components.GetWindowingStrategies() {
+                               p.Components.WindowingStrategies[k] = v
+                       }
+                       for k, v := range components.GetCoders() {
+                               p.Components.Coders[k] = v
+                       }
+                       for k, v := range components.GetEnvironments() {
+                               if k == "go" {
+                                       // This case is not an anomaly. It is 
expected to be always
+                                       // present. Any initial 
ExpansionRequest will have a
+                                       // component which requires the "go" 
environment. Scoping
+                                       // using unique namespace prevents 
collision.
+                                       continue
+                               }
+                               p.Components.Environments[k] = v
+                       }
+
+                       transform, err := ExpandedTransform(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       p.Components.Transforms[id] = transform
+               }
+       }
+}
+
+// PurgeOutputInput remaps outputs from edge corresponding to an
+// ExternalTransform with the correct expanded outputs. All consumers of the
+// previous outputs are updated with new inputs.
+func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+       idxMap := make(map[string]string)
+       components := p.GetComponents()
+
+       // Generating map (oldID -> newID) of outputs to be purged
+       for _, e := range edges {
+               if e.Op == graph.External {
+                       if e.External.Expanded == nil {
+                               continue
+                       }
+                       for tag, n := range ExternalOutputs(e) {
+                               nodeID := fmt.Sprintf("n%v", n.ID())
+
+                               transform, err := 
ExpandedTransform(e.External.Expanded)
+                               if err != nil {
+                                       panic(err)
+                               }
+                               expandedOutputs := transform.GetOutputs()
+                               var pcolID string
+                               if tag == graph.UnnamedOutputTag {
+                                       for _, pcolID = range expandedOutputs {
+                                               // easiest way to access map 
with one entry (key,value)
+                                       }
+                               } else {
+                                       pcolID = expandedOutputs[tag]
+                               }
+
+                               idxMap[nodeID] = pcolID
+                               delete(components.Pcollections, nodeID)
+                       }
+               }
+       }
+
+       // Updating all input ids to reflect the correct sources
+       for _, t := range components.GetTransforms() {
+               inputs := t.GetInputs()
+               for tag, nodeID := range inputs {
+                       if pcolID, exists := idxMap[nodeID]; exists {
+                               inputs[tag] = pcolID
+                       }
+               }
+       }
+
+}
+
+// VerifyNamedOutputs ensures the expanded outputs correspond to the correct 
and expected named outputs
+func VerifyNamedOutputs(ext *graph.ExternalTransform) {
+       transform, err := ExpandedTransform(ext.Expanded)
+       if err != nil {
+               panic(err)
+       }
+       expandedOutputs := transform.GetOutputs()
+
+       if len(expandedOutputs) != len(ext.OutputsMap) {
+               panic(errors.Errorf("mismatched number of named 
outputs:\nreceived - %v\nexpected - %v", len(expandedOutputs), 
len(ext.OutputsMap)))
+       }
+
+       for tag := range ext.OutputsMap {
+               _, exists := expandedOutputs[tag]
+               if tag != graph.UnnamedOutputTag && !exists {
+                       panic(errors.Errorf("missing named output in expanded 
transform: %v is expected in %v", tag, expandedOutputs))
+               }
+               if tag == graph.UnnamedOutputTag && len(expandedOutputs) > 1 {
+                       panic(errors.Errorf("mismatched number of unnamed 
outputs:\nreceived - %v\nexpected - 1", len(expandedOutputs)))
+               }
+       }
+}
+
+// ResolveOutputIsBounded updates each Output node with respect to the received
+// expanded components to reflect if it is bounded or not
+func ResolveOutputIsBounded(e *graph.MultiEdge, isBoundedUpdater 
func(*graph.Node, bool)) {
+       ext := e.External
+       exp := ext.Expanded
+       components, err := ExpandedComponents(exp)
+       if err != nil {
+               panic(err)
+       }
+       expandedPCollections := components.GetPcollections()
+       transform, err := ExpandedTransform(exp)
+       if err != nil {
+               panic(err)
+       }
+       expandedOutputs := transform.GetOutputs()
+
+       for tag, node := range ExternalOutputs(e) {
+               var id string
+               isBounded := true
+
+               switch tag {
+               case graph.UnnamedOutputTag:
+                       for _, id = range expandedOutputs {
+                               // easiest way to access map with one entry 
(key,value)
+                       }
+               default:
+                       id = expandedOutputs[tag]
+               }
+
+               if pcol, exists := expandedPCollections[id]; exists {
+                       if pcol.GetIsBounded() == pipepb.IsBounded_UNBOUNDED {
+                               isBounded = false
+                       }
+                       isBoundedUpdater(node, isBounded)
+               } else {
+                       panic(errors.Errorf("missing corresponsing pcollection 
of named output: %v is expected in %v", id, expandedPCollections))
+               }
+
+       }
+}
+
+// AddFakeImpulses adds an impulse transform as the producer for each input to
+// the root transform. Inputs need producers to form a correct pipeline.
+func AddFakeImpulses(p *pipepb.Pipeline) {
+       // For a pipeline consisting of only the external node edge, there will 
be
+       // single root transform which will be the external transform.
+       // Adding fake impulses per input to this external transform
+       transforms := p.GetComponents().GetTransforms()
+       ext := transforms[p.GetRootTransformIds()[0]]
+
+       for tag, id := range ext.GetInputs() {
+               key := fmt.Sprintf("%s_%s", "impulse", tag)
+               output := map[string]string{"out": id}
+
+               impulse := &pipepb.PTransform{
+                       UniqueName: key,
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: URNImpulse,
+                       },
+                       Outputs: output,
+               }
+
+               transforms[key] = impulse
+       }
+
+}
+
+// RemoveFakeImpulses removes each fake impulse per input to the the transform.
+// Multiple producers for one Input cannot be present.
+func RemoveFakeImpulses(c *pipepb.Components, ext *pipepb.PTransform) {
+       transforms := c.GetTransforms()
+       var impulseIDs []string
+
+       for tag := range ext.GetInputs() {
+               id := fmt.Sprintf("%s_%s", "impulse", tag)
+               impulseIDs = append(impulseIDs, id)
+       }
+
+       for _, id := range impulseIDs {
+               t := transforms[id]
+               if t.GetSpec().GetUrn() == URNImpulse {
+                       delete(transforms, id)
+
+               }
+       }
+}
+
 // ExpandedComponents type asserts the Components field with interface{} type
 // and returns its pipeline component proto representation
 func ExpandedComponents(exp *graph.ExpandedTransform) (*pipepb.Components, 
error) {

Review comment:
       Can this function be unexported now that callers were moved into graphx?

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expand.go
##########
@@ -18,19 +18,79 @@ package xlangx
 import (
        "context"
 
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "google.golang.org/grpc"
 )
 
-// Expand submits an external transform to be expanded by the expansion 
service.
-// The given transform should be the external transform, and the components are
-// any additional components necessary for the pipeline snippet.
+// Expand expands an unexpanded graph.ExternalTransform and returns the 
expanded
+// transform as a new graph.ExpandedTransform. This requires querying an
+// expansion service based on the configuration details within the
+// ExternalTransform.
+func Expand(edge *graph.MultiEdge, ext *graph.ExternalTransform) 
(*graph.ExpandedTransform, error) {
+       // Build the ExpansionRequest
+
+       // Obtaining the components and transform proto representing this 
transform
+       p, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+       if err != nil {
+               return nil, errors.Wrapf(err, "unable to generate proto 
representation of %v", ext)
+       }
+
+       transforms := p.GetComponents().GetTransforms()
+
+       // Transforms consist of only External transform and composites. 
Composites
+       // should be removed from proto before submitting expansion request.
+       extTransformID := p.GetRootTransformIds()[0]
+       extTransform := transforms[extTransformID]
+       for extTransform.UniqueName != "External" {
+               delete(transforms, extTransformID)
+               p, err := pipelinex.Normalize(p)
+               if err != nil {
+                       return nil, err
+               }
+               extTransformID = p.GetRootTransformIds()[0]
+               extTransform = transforms[extTransformID]
+       }
+
+       // Scoping the ExternalTransform with respect to it's unique namespace, 
thus
+       // avoiding future collisions
+       AddNamespace(extTransform, p.GetComponents(), ext.Namespace)

Review comment:
       Does AddNamespace still need to be exported?

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -16,11 +16,222 @@
 package graphx
 
 import (
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
+// MergeExpandedWithPipeline adds expanded components of all 
ExternalTransforms to the existing pipeline
+func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+       // Adding Expanded transforms to their counterparts in the Pipeline
+
+       for _, e := range edges {
+               if e.Op == graph.External {
+                       exp := e.External.Expanded
+                       if exp == nil {
+                               continue
+                       }
+                       id := fmt.Sprintf("e%v", e.ID())
+
+                       p.Requirements = append(p.Requirements, 
exp.Requirements...)
+
+                       // Adding components of the Expanded Transforms to the 
current Pipeline
+                       components, err := ExpandedComponents(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       for k, v := range components.GetTransforms() {
+                               p.Components.Transforms[k] = v
+                       }
+                       for k, v := range components.GetPcollections() {
+                               p.Components.Pcollections[k] = v
+                       }
+                       for k, v := range components.GetWindowingStrategies() {
+                               p.Components.WindowingStrategies[k] = v
+                       }
+                       for k, v := range components.GetCoders() {
+                               p.Components.Coders[k] = v
+                       }
+                       for k, v := range components.GetEnvironments() {
+                               if k == "go" {

Review comment:
       Since this is now in the graphx package, we can probably promote the 
constant in addDefaultEnv to a proper unexported package constant, and refer to 
it here.

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expand.go
##########
@@ -18,19 +18,79 @@ package xlangx
 import (
        "context"
 
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "google.golang.org/grpc"
 )
 
-// Expand submits an external transform to be expanded by the expansion 
service.
-// The given transform should be the external transform, and the components are
-// any additional components necessary for the pipeline snippet.
+// Expand expands an unexpanded graph.ExternalTransform and returns the 
expanded
+// transform as a new graph.ExpandedTransform. This requires querying an
+// expansion service based on the configuration details within the
+// ExternalTransform.
+func Expand(edge *graph.MultiEdge, ext *graph.ExternalTransform) 
(*graph.ExpandedTransform, error) {
+       // Build the ExpansionRequest
+
+       // Obtaining the components and transform proto representing this 
transform
+       p, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+       if err != nil {
+               return nil, errors.Wrapf(err, "unable to generate proto 
representation of %v", ext)
+       }
+
+       transforms := p.GetComponents().GetTransforms()
+
+       // Transforms consist of only External transform and composites. 
Composites
+       // should be removed from proto before submitting expansion request.
+       extTransformID := p.GetRootTransformIds()[0]
+       extTransform := transforms[extTransformID]
+       for extTransform.UniqueName != "External" {
+               delete(transforms, extTransformID)
+               p, err := pipelinex.Normalize(p)
+               if err != nil {
+                       return nil, err
+               }
+               extTransformID = p.GetRootTransformIds()[0]
+               extTransform = transforms[extTransformID]
+       }
+
+       // Scoping the ExternalTransform with respect to it's unique namespace, 
thus
+       // avoiding future collisions
+       AddNamespace(extTransform, p.GetComponents(), ext.Namespace)
+
+       graphx.AddFakeImpulses(p) // Inputs need to have sources
+       delete(transforms, extTransformID)
+
+       // Querying the expansion service
+       res, err := QueryExpansionService(context.Background(), 
p.GetComponents(), extTransform, ext.Namespace, ext.ExpansionAddr)
+       if err != nil {
+               return nil, err
+       }
+
+       // Handling ExpansionResponse
+
+       // Previously added fake impulses need to be removed to avoid having
+       // multiple sources to the same pcollection in the graph
+       graphx.RemoveFakeImpulses(res.GetComponents(), res.GetTransform())
+
+       exp := &graph.ExpandedTransform{
+               Components:   res.GetComponents(),
+               Transform:    res.GetTransform(),
+               Requirements: res.GetRequirements(),
+       }
+       return exp, nil
+}
+
+// QueryExpansionService submits an external transform to be expanded by the
+// expansion service. The given transform should be the external transform, and
+// the components are any additional components necessary for the pipeline
+// snippet.
 //
 // Users should generally call beam.CrossLanguage to access foreign transforms
 // rather than calling this function directly.
-func Expand(
+func QueryExpansionService(

Review comment:
       Consider un-exporting this function if it isn't needed outside the 
package.
   
   It's always easier to expand an API than contract it, and invariably exposed 
API surfaces will eventually be depended on by some user or another.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -16,11 +16,222 @@
 package graphx
 
 import (
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
+// MergeExpandedWithPipeline adds expanded components of all 
ExternalTransforms to the existing pipeline
+func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+       // Adding Expanded transforms to their counterparts in the Pipeline
+
+       for _, e := range edges {
+               if e.Op == graph.External {
+                       exp := e.External.Expanded
+                       if exp == nil {
+                               continue
+                       }
+                       id := fmt.Sprintf("e%v", e.ID())
+
+                       p.Requirements = append(p.Requirements, 
exp.Requirements...)
+
+                       // Adding components of the Expanded Transforms to the 
current Pipeline
+                       components, err := ExpandedComponents(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       for k, v := range components.GetTransforms() {
+                               p.Components.Transforms[k] = v
+                       }
+                       for k, v := range components.GetPcollections() {
+                               p.Components.Pcollections[k] = v
+                       }
+                       for k, v := range components.GetWindowingStrategies() {
+                               p.Components.WindowingStrategies[k] = v
+                       }
+                       for k, v := range components.GetCoders() {
+                               p.Components.Coders[k] = v
+                       }
+                       for k, v := range components.GetEnvironments() {
+                               if k == "go" {
+                                       // This case is not an anomaly. It is 
expected to be always
+                                       // present. Any initial 
ExpansionRequest will have a
+                                       // component which requires the "go" 
environment. Scoping
+                                       // using unique namespace prevents 
collision.
+                                       continue
+                               }
+                               p.Components.Environments[k] = v
+                       }
+
+                       transform, err := ExpandedTransform(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       p.Components.Transforms[id] = transform
+               }
+       }
+}
+
+// PurgeOutputInput remaps outputs from edge corresponding to an
+// ExternalTransform with the correct expanded outputs. All consumers of the
+// previous outputs are updated with new inputs.
+func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {

Review comment:
       Now that I've looked at everything, it seems like this and 
MergeExpandedWithPipeline may be the only touched functions we can un-export, 
but I only have a small view of the code right now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to