lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r465879720



##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,144 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {

Review comment:
       Per the other thread, please move these to parameters on CrossLanguage and TryCrossLanguage instead. Don't try to force in compatibility with the legacy External; it's OK for them to have two separate calls and paths.
   With these as struct fields, it's not clear which values are required and which are optional, and the compiler can't help the user by failing at compile time.
   
   As an aside, the other issue here is that you've mixed user-side parameters with internal implementation details and made them part of the API surface. APIs are easiest to use when the user knows how to fill in every field and which ones are required. For example, the Components, ExpandedTransform, and Requirements fields are not something users would fill in. Types are cheap: make a new type instead of trying to reuse something that almost fits.
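One way to read this suggestion, as a minimal sketch: the stand-in types `Scope`, `PCollection`, and `FullType` below are hypothetical simplifications of the Beam Go SDK types, and the parameter list is illustrative rather than the signature the PR ended up with. Every user-supplied value is an explicit parameter, `TryCrossLanguage` returns an `error`, and `CrossLanguage` is the Must-style wrapper that panics.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the Beam Go SDK types; the real ones live in
// package beam and carry much more state.
type Scope struct{ name string }
type PCollection struct{ valid bool }
type FullType struct{ name string }

func (s Scope) IsValid() bool       { return s.name != "" }
func (c PCollection) IsValid() bool { return c.valid }

// TryCrossLanguage takes every user-facing value as an explicit parameter,
// so a call that omits one fails to compile. Internal details such as the
// expanded components never appear in this signature.
func TryCrossLanguage(s Scope, urn string, payload []byte, expansionAddr string, ins []PCollection, outs []FullType) ([]PCollection, error) {
	if !s.IsValid() {
		return nil, errors.New("invalid scope")
	}
	if urn == "" {
		return nil, errors.New("empty transform URN")
	}
	if expansionAddr == "" {
		return nil, errors.New("empty expansion service address")
	}
	for i, col := range ins {
		if !col.IsValid() {
			return nil, fmt.Errorf("invalid pcollection to external: index %v", i)
		}
	}
	// The real implementation would add the edge to the graph and query the
	// expansion service here; this sketch just returns one output per type.
	out := make([]PCollection, len(outs))
	for i := range out {
		out[i] = PCollection{valid: true}
	}
	return out, nil
}

// CrossLanguage is the Must-style counterpart: same parameters, but it
// panics instead of returning an error.
func CrossLanguage(s Scope, urn string, payload []byte, expansionAddr string, ins []PCollection, outs []FullType) []PCollection {
	out, err := TryCrossLanguage(s, urn, payload, expansionAddr, ins, outs)
	if err != nil {
		panic(err)
	}
	return out
}
```

Because Components, ExpandedTransform, and Requirements never appear in either signature, users can't set them by mistake, and the legacy External call can keep its own path.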

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead of one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+       if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set

Review comment:
       Note: with this comment, the intent was for you to remove this dead code, since it's unnecessary.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,144 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string

Review comment:
       Move these to a graph.CrossLanguage struct, but make the proto-typed fields interface{} instead, with a comment stating what their types should be. Given that those fields are only used by Beam framework internals, there's little risk in using type assertions for them in the right places, such as the graphx package.
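A minimal sketch of what such a struct might look like, assuming hypothetical names throughout: a local `Components` type stands in for `*pipepb.Components` so the example compiles on its own, and `expandedComponents` plays the role of a graphx-side consumer. User-facing fields keep concrete types, the proto-typed fields are `interface{}` documented with their expected types, and the consumer asserts with the comma-ok form.

```go
package main

import "fmt"

// Components stands in for *pipepb.Components in this sketch.
type Components struct {
	Transforms map[string]string
}

// CrossLanguage is a hypothetical graph-package struct. The graph package
// never inspects the proto-typed fields, so it need not import the protos.
type CrossLanguage struct {
	URN           string
	Payload       []byte
	ExpansionAddr string

	// Components is expected to be a *pipepb.Components, set by the
	// framework after expansion.
	Components interface{}
	// ExpandedTransform is expected to be a *pipepb.PTransform.
	ExpandedTransform interface{}
	// Requirements is not a proto type, so it can stay concretely typed.
	Requirements []string
}

// expandedComponents mimics a graphx-side consumer: it asserts the expected
// type with the comma-ok form and reports a clear error otherwise.
func expandedComponents(c *CrossLanguage) (*Components, error) {
	comps, ok := c.Components.(*Components)
	if !ok {
		return nil, fmt.Errorf("cross-language components: got %T, want *Components", c.Components)
	}
	return comps, nil
}
```

Returning an error rather than using a bare assertion keeps a construction bug from surfacing as an opaque panic deep inside translation.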

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead of one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+       if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+               // return Legacy External API
+       }
+
+       /*
+               Add ExternalTransform to the Graph
+       */
+       // Validating scope and inputs
+       if !s.IsValid() {
+               // return nil, errors.New("invalid scope")
+               fmt.Println("invalid scope")
+       }
+       for i, col := range e.In {
+               if !col.IsValid() {
+                       // return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+                       fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+               }
+       }
+
+       // Using existing MultiEdge format to represent ExternalTransform (already backwards compatible)
+       payload := &graph.Payload{
+               URN:  e.Urn,
+               Data: e.Payload,
+       }
+       var ins []*graph.Node
+       for _, col := range e.In {
+               ins = append(ins, col.n)
+       }
+       edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+       // TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+       // Adding ExternalTransform to pipeline referenced by MultiEdge ID
+       if p.ExpandedTransforms == nil {
+               p.ExpandedTransforms = make(map[string]*ExternalTransform)
+       }
+       p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
+
+       /*
+               Build the ExpansionRequest
+       */
+       // Obtaining the components and transform proto representing this transform
+       pipeline, err := graphx.Marshal([]*graph.MultiEdge{edge}, &graphx.Options{})
+       if err != nil {
+               panic(err)
+       }
+
+       // Adding fake impulses to each input as required for correct expansion
+       // TODO(pskevin): Remove these fake impulses from final Pipeline since multiple producers of the same PCollections is logically wrong
+       transforms := pipeline.Components.Transforms
+       rootTransformID := pipeline.RootTransformIds[0]
+       for tag, id := range transforms[rootTransformID].Inputs {
+               key := fmt.Sprintf("%s_%s", "impulse", tag)
+
+               output := map[string]string{"out": id}
+               impulse := &pipepb.PTransform{
+                       UniqueName: key,
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: graphx.URNImpulse,
+                       },
+                       Outputs: output,
+               }
+
+               transforms[key] = impulse
+       }
+
+       // Assembling ExpansionRequest proto
+       req := &jobpb.ExpansionRequest{
+               Components: pipeline.Components,
+               Transform:  transforms[rootTransformID],
+               Namespace:  s.String(),
+       }
+
+       /*
+               Querying Expansion Service
+       */
+       // Setting grpc client
+       conn, err := grpc.Dial(e.ExpansionAddr, grpc.WithInsecure())
+       if err != nil {
+               panic(err)
+       }
+       defer conn.Close()
+       client := jobpb.NewExpansionServiceClient(conn)
+
+       // Handling ExpansionResponse
+       res, err := client.Expand(context.Background(), req)
+       if err != nil {
+               panic(err)
+       }
+       e.Components = res.GetComponents()

Review comment:
       I don't know why it didn't occur to me before, but if the deserialized proto object needs to be "stored" on an object in the graph package, that field can just be an interface{}, with a comment saying it's expected to be the proto type. As the fields are only usable by Beam framework internals, it's fine to assume they'll have the correct types by construction (validated by unit tests at some juncture).
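A hedged sketch of the writer side, with stand-in types in place of the generated jobpb/pipepb protos (the stand-in `ExpansionResponse` mirrors only the `Components`, `Transform`, and `Error` fields of the real message). The idea is that if one function is the only place that assigns the `interface{}` fields from the expansion response, their dynamic types are correct by construction. `External` and `storeExpansion` are hypothetical names.

```go
package main

import "errors"

// Stand-ins for the generated proto types returned by the expansion service.
type Components struct{ Transforms map[string]string }
type PTransform struct{ UniqueName string }
type ExpansionResponse struct {
	Components *Components
	Transform  *PTransform
	Error      string
}

// External is a hypothetical graph-package node; the proto-typed results are
// held as interface{} so the graph package needs no proto imports.
type External struct {
	Components        interface{} // expected: *Components (i.e. *pipepb.Components)
	ExpandedTransform interface{} // expected: *PTransform (i.e. *pipepb.PTransform)
}

// storeExpansion is the single place that writes those fields, so their
// dynamic types are fixed by this function's code.
func storeExpansion(e *External, res *ExpansionResponse) error {
	if res.Error != "" {
		return errors.New(res.Error)
	}
	e.Components = res.Components
	e.ExpandedTransform = res.Transform
	return nil
}
```

A unit test that calls storeExpansion and asserts the dynamic types of both fields would be one way to do the validation the comment mentions.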
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

