youngoli commented on a change in pull request #13325:
URL: https://github.com/apache/beam/pull/13325#discussion_r523383319



##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expand.go
##########
@@ -18,19 +18,79 @@ package xlangx
 import (
        "context"
 
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "google.golang.org/grpc"
 )
 
-// Expand submits an external transform to be expanded by the expansion 
service.
-// The given transform should be the external transform, and the components are
-// any additional components necessary for the pipeline snippet.
+// Expand expands an unexpanded graph.ExternalTransform and returns the 
expanded
+// transform as a new graph.ExpandedTransform. This requires querying an
+// expansion service based on the configuration details within the
+// ExternalTransform.
+func Expand(edge *graph.MultiEdge, ext *graph.ExternalTransform) 
(*graph.ExpandedTransform, error) {
+       // Build the ExpansionRequest
+
+       // Obtaining the components and transform proto representing this 
transform
+       p, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+       if err != nil {
+               return nil, errors.Wrapf(err, "unable to generate proto 
representation of %v", ext)
+       }
+
+       transforms := p.GetComponents().GetTransforms()
+
+       // Transforms consist of only External transform and composites. 
Composites
+       // should be removed from proto before submitting expansion request.
+       extTransformID := p.GetRootTransformIds()[0]
+       extTransform := transforms[extTransformID]
+       for extTransform.UniqueName != "External" {
+               delete(transforms, extTransformID)
+               p, err := pipelinex.Normalize(p)
+               if err != nil {
+                       return nil, err
+               }
+               extTransformID = p.GetRootTransformIds()[0]
+               extTransform = transforms[extTransformID]
+       }
+
+       // Scoping the ExternalTransform with respect to it's unique namespace, 
thus
+       // avoiding future collisions
+       AddNamespace(extTransform, p.GetComponents(), ext.Namespace)

Review comment:
       Doesn't seem like it, so I've un-exported it.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -16,11 +16,222 @@
 package graphx
 
 import (
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
+// MergeExpandedWithPipeline adds expanded components of all 
ExternalTransforms to the existing pipeline
+func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+       // Adding Expanded transforms to their counterparts in the Pipeline
+
+       for _, e := range edges {
+               if e.Op == graph.External {
+                       exp := e.External.Expanded
+                       if exp == nil {
+                               continue
+                       }
+                       id := fmt.Sprintf("e%v", e.ID())
+
+                       p.Requirements = append(p.Requirements, 
exp.Requirements...)
+
+                       // Adding components of the Expanded Transforms to the 
current Pipeline
+                       components, err := ExpandedComponents(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       for k, v := range components.GetTransforms() {
+                               p.Components.Transforms[k] = v
+                       }
+                       for k, v := range components.GetPcollections() {
+                               p.Components.Pcollections[k] = v
+                       }
+                       for k, v := range components.GetWindowingStrategies() {
+                               p.Components.WindowingStrategies[k] = v
+                       }
+                       for k, v := range components.GetCoders() {
+                               p.Components.Coders[k] = v
+                       }
+                       for k, v := range components.GetEnvironments() {
+                               if k == "go" {
+                                       // This case is not an anomaly. It is 
expected to be always
+                                       // present. Any initial 
ExpansionRequest will have a
+                                       // component which requires the "go" 
environment. Scoping
+                                       // using unique namespace prevents 
collision.
+                                       continue
+                               }
+                               p.Components.Environments[k] = v
+                       }
+
+                       transform, err := ExpandedTransform(exp)
+                       if err != nil {
+                               panic(err)
+                       }
+                       p.Components.Transforms[id] = transform
+               }
+       }
+}
+
+// PurgeOutputInput remaps outputs from edge corresponding to an
+// ExternalTransform with the correct expanded outputs. All consumers of the
+// previous outputs are updated with new inputs.
+func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {

Review comment:
       According to IntelliJ, you're right. The others are used in either the xlangx or beam packages.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to