youngoli commented on a change in pull request #13325:
URL: https://github.com/apache/beam/pull/13325#discussion_r523383319
##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expand.go
##########
@@ -18,19 +18,79 @@ package xlangx
import (
"context"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"google.golang.org/grpc"
)
-// Expand submits an external transform to be expanded by the expansion
service.
-// The given transform should be the external transform, and the components are
-// any additional components necessary for the pipeline snippet.
+// Expand expands an unexpanded graph.ExternalTransform and returns the
expanded
+// transform as a new graph.ExpandedTransform. This requires querying an
+// expansion service based on the configuration details within the
+// ExternalTransform.
+func Expand(edge *graph.MultiEdge, ext *graph.ExternalTransform)
(*graph.ExpandedTransform, error) {
+ // Build the ExpansionRequest
+
+ // Obtaining the components and transform proto representing this
transform
+ p, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+ if err != nil {
+ return nil, errors.Wrapf(err, "unable to generate proto
representation of %v", ext)
+ }
+
+ transforms := p.GetComponents().GetTransforms()
+
+ // Transforms consist of only External transform and composites.
Composites
+ // should be removed from proto before submitting expansion request.
+ extTransformID := p.GetRootTransformIds()[0]
+ extTransform := transforms[extTransformID]
+ for extTransform.UniqueName != "External" {
+ delete(transforms, extTransformID)
+ p, err := pipelinex.Normalize(p)
+ if err != nil {
+ return nil, err
+ }
+ extTransformID = p.GetRootTransformIds()[0]
+ extTransform = transforms[extTransformID]
+ }
+
+ // Scoping the ExternalTransform with respect to its unique namespace,
thus
+ // avoiding future collisions
+ AddNamespace(extTransform, p.GetComponents(), ext.Namespace)
Review comment:
Doesn't seem like it. Un-exported.
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/xlang.go
##########
@@ -16,11 +16,222 @@
package graphx
import (
+ "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
)
+// MergeExpandedWithPipeline adds expanded components of all
ExternalTransforms to the existing pipeline.
+func MergeExpandedWithPipeline(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
+ // Adding Expanded transforms to their counterparts in the Pipeline
+
+ for _, e := range edges {
+ if e.Op == graph.External {
+ exp := e.External.Expanded
+ if exp == nil {
+ continue
+ }
+ id := fmt.Sprintf("e%v", e.ID())
+
+ p.Requirements = append(p.Requirements,
exp.Requirements...)
+
+ // Adding components of the Expanded Transforms to the
current Pipeline
+ components, err := ExpandedComponents(exp)
+ if err != nil {
+ panic(err)
+ }
+ for k, v := range components.GetTransforms() {
+ p.Components.Transforms[k] = v
+ }
+ for k, v := range components.GetPcollections() {
+ p.Components.Pcollections[k] = v
+ }
+ for k, v := range components.GetWindowingStrategies() {
+ p.Components.WindowingStrategies[k] = v
+ }
+ for k, v := range components.GetCoders() {
+ p.Components.Coders[k] = v
+ }
+ for k, v := range components.GetEnvironments() {
+ if k == "go" {
+ // This case is not an anomaly. It is
expected to be always
+ // present. Any initial
ExpansionRequest will have a
+ // component which requires the "go"
environment. Scoping
+ // using unique namespace prevents
collision.
+ continue
+ }
+ p.Components.Environments[k] = v
+ }
+
+ transform, err := ExpandedTransform(exp)
+ if err != nil {
+ panic(err)
+ }
+ p.Components.Transforms[id] = transform
+ }
+ }
+}
+
+// PurgeOutputInput remaps the outputs of each edge corresponding to an
+// ExternalTransform to the correct expanded outputs. All consumers of the
+// previous outputs are updated with new inputs.
+func PurgeOutputInput(edges []*graph.MultiEdge, p *pipepb.Pipeline) {
Review comment:
According to IntelliJ, you're right. The others have usages either in
the xlangx or beam packages.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]