pskevin commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465373680



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+       if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+               // return Legacy External API
+       }
+
+       /*
+               Add ExternalTranform to the Graph
+       */
+       // Validating scope and inputs
+       if !s.IsValid() {
+               // return nil, errors.New("invalid scope")
+               fmt.Println("invalid scope")
+       }
+       for i, col := range e.In {
+               if !col.IsValid() {
+                       // return nil, errors.Errorf("invalid pcollection to 
external: index %v", i)
+                       fmt.Printf("\ninvalid pcollection to external: index 
%v", i)
+
+               }
+       }
+
+       // Using exisiting MultiEdge format to represent ExternalTransform 
(already backwards compatible)
+       payload := &graph.Payload{
+               URN:  e.Urn,
+               Data: e.Payload,
+       }
+       var ins []*graph.Node
+       for _, col := range e.In {
+               ins = append(ins, col.n)
+       }
+       edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+       // TODO(pskevin): There needs to be a better way of associating this 
ExternalTransform to the pipeline
+       // Adding ExternalTransform to pipeline referenced by MultiEdge ID
+       if p.ExpandedTransforms == nil {
+               p.ExpandedTransforms = make(map[string]*ExternalTransform)
+       }
+       p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
+
+       /*
+               Build the ExpansionRequest
+       */
+       // Obtaining the components and transform proto representing this 
transform
+       pipeline, err := graphx.Marshal([]*graph.MultiEdge{edge}, 
&graphx.Options{})
+       if err != nil {
+               panic(err)
+       }
+
+       // Adding fake impulses to each input as required for correct expansion
+       // TODO(pskevin): Remove these fake impulses from final Pipeline since 
multiple producers of the same PCollections is logically wrong
+       transforms := pipeline.Components.Transforms
+       rootTransformID := pipeline.RootTransformIds[0]
+       for tag, id := range transforms[rootTransformID].Inputs {
+               key := fmt.Sprintf("%s_%s", "impulse", tag)
+
+               output := map[string]string{"out": id}
+               impulse := &pipepb.PTransform{
+                       UniqueName: key,
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: graphx.URNImpulse,
+                       },
+                       Outputs: output,
+               }
+
+               transforms[key] = impulse
+       }
+
+       // Assembling ExpansionRequest proto
+       req := &jobpb.ExpansionRequest{
+               Components: pipeline.Components,
+               Transform:  transforms[rootTransformID],
+               Namespace:  s.String(),
+       }
+
+       /*
+               Querying Expansion Service
+       */
+       // Setting grpc client
+       conn, err := grpc.Dial(e.ExpansionAddr, grpc.WithInsecure())
+       if err != nil {
+               panic(err)
+       }
+       defer conn.Close()
+       client := jobpb.NewExpansionServiceClient(conn)
+
+       // Handling ExpansionResponse
+       res, err := client.Expand(context.Background(), req)
+       if err != nil {
+               panic(err)
+       }
+       e.Components = res.GetComponents()
+       e.ExpandedTransform = res.GetTransform()
+       e.Requirements = res.GetRequirements()
+
+       /*
+               Associating output PCollections of the expanded transform with 
correct internal outbound links and nodes
+       */
+       // No information about the output types and bounded nature has been 
explicitly passed by the user
+       if len(e.Out) == 0 || cap(e.Out) == 0 {
+               // Infer output types from ExpansionResponse and update e.Out
+               if e.Out == nil {
+                       // Use reverse schema encoding
+               } else {
+                       // Use the coders list and map from coder id to 
internal FullType?
+               }
+       }

Review comment:
       Makes sense. Will do!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to