lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r464717996



##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+       "context"
+       "flag"
+       "fmt"
+       "log"
+       "regexp"
+       "strings"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+       "github.com/apache/beam/sdks/go/pkg/beam"
+       "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+       "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+       // Imports to enable correct filesystem access and runner setup in 
LOOPBACK mode
+       _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+       _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+       _ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+       // Set this option to choose a different input file or glob.
+       input = flag.String("input", "./input", "File(s) to read.")
+
+       // Set this required option to specify where to write the output.
+       output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+       wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+       empty   = beam.NewCounter("extract", "emptyLines")
+       lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+       lineLen.Update(ctx, int64(len(line)))
+       if len(strings.TrimSpace(line)) == 0 {
+               empty.Inc(ctx, 1)
+       }
+       for _, word := range wordRE.FindAllString(line, -1) {
+               emit(word)
+       }
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+       fmt.Println(w, c)

Review comment:
       Arguably we can remove this line; it looks like leftover debugging output.

##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+       "context"
+       "flag"
+       "fmt"
+       "log"
+       "regexp"
+       "strings"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+       "github.com/apache/beam/sdks/go/pkg/beam"
+       "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+       "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+       // Imports to enable correct filesystem access and runner setup in 
LOOPBACK mode
+       _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+       _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+       _ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+       // Set this option to choose a different input file or glob.
+       input = flag.String("input", "./input", "File(s) to read.")
+
+       // Set this required option to specify where to write the output.
+       output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+       wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+       empty   = beam.NewCounter("extract", "emptyLines")
+       lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+       lineLen.Update(ctx, int64(len(line)))
+       if len(strings.TrimSpace(line)) == 0 {
+               empty.Inc(ctx, 1)
+       }
+       for _, word := range wordRE.FindAllString(line, -1) {
+               emit(word)
+       }
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+       fmt.Println(w, c)
+       return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+       beam.RegisterFunction(extractFn)
+       beam.RegisterFunction(formatFn)
+}
+
+func main() {
+       // If beamx or Go flags are used, flags must be parsed first.

Review comment:
       Feel free to delete the copy-pasted documentation from the original 
wordcount here; it doesn't need to be repeated, as it draws focus away from the 
important part: Cross Language.

##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main

Review comment:
       Go packages and binaries should have a doc string with a blank line 
between it and the apache license.
   
   // xlang_wordcount uses a cross-language transform from Python to count 
words from a file.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+       if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+               // return Legacy External API
+       }
+
+       /*
+               Add ExternalTranform to the Graph

Review comment:
       Either remove this block comment entirely, or convert it to a line 
comment and fix the typo.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+       if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+               // return Legacy External API
+       }
+
+       /*
+               Add ExternalTranform to the Graph
+       */
+       // Validating scope and inputs
+       if !s.IsValid() {
+               // return nil, errors.New("invalid scope")
+               fmt.Println("invalid scope")
+       }
+       for i, col := range e.In {
+               if !col.IsValid() {
+                       // return nil, errors.Errorf("invalid pcollection to 
external: index %v", i)

Review comment:
       Same here, uncomment the error return and remove the fmt.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+       if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+               // return Legacy External API
+       }
+
+       /*
+               Add ExternalTranform to the Graph
+       */
+       // Validating scope and inputs
+       if !s.IsValid() {
+               // return nil, errors.New("invalid scope")

Review comment:
       Uncomment this validation please, and remove the print out.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {

Review comment:
       Have both a TryCrossLanguage, which has an error return (to handle all 
the newly uncommented validations), and a CrossLanguage without the error 
return. You can see how External passes the TryExternal results to a Must, so 
errors can be panicked on if necessary but otherwise propagated properly if Try 
is used.
   
   
https://github.com/apache/beam/blob/43a4a119bf0d95a1fc33c65842b99ef0ebbcf041/sdks/go/pkg/beam/external.go#L170

##########
File path: sdks/go/examples/xlang/wordcount/input
##########
@@ -0,0 +1,5 @@
+Lorem ipsum dolor sit amet, consectetur adipiscing elit. Mauris id tellus 
vehicula, rutrum turpis quis, suscipit est. Quisque vehicula nec ex a interdum. 
Phasellus vulputate nunc sit amet nisl dapibus tincidunt ut ullamcorper nisi. 
Mauris gravida porta leo vel congue. Duis sit amet arcu eu nisl pharetra 
interdum a eget enim. Nulla facilisis massa ut egestas interdum. Nunc elit dui, 
hendrerit at pharetra a, pellentesque non turpis. Integer auctor vulputate 
congue. Pellentesque habitant morbi tristique senectus et netus et malesuada 
fames ac turpis egestas. Ut sagittis convallis lorem non semper. Ut ultrices 
elit a enim pulvinar fermentum.

Review comment:
       +1. For example/demo reasons, we can just do an "in memory" file 
instead. 
   
   That is, copy-paste this into a variable (note the backticks for Go's raw 
string literals, e.g. for the newlines)
   const lorem = \`
   Lorem ipsum...
   \`
   
   , and load that into beam.Create. One of the wordcount examples does this.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+       if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set

Review comment:
       As mentioned, don't worry about the legacy API at this point, focus on a 
clean implementation of the new API.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+       if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+               // return Legacy External API
+       }
+
+       /*
+               Add ExternalTranform to the Graph
+       */
+       // Validating scope and inputs
+       if !s.IsValid() {
+               // return nil, errors.New("invalid scope")
+               fmt.Println("invalid scope")
+       }
+       for i, col := range e.In {
+               if !col.IsValid() {
+                       // return nil, errors.Errorf("invalid pcollection to 
external: index %v", i)
+                       fmt.Printf("\ninvalid pcollection to external: index 
%v", i)
+
+               }
+       }
+
+       // Using exisiting MultiEdge format to represent ExternalTransform 
(already backwards compatible)
+       payload := &graph.Payload{
+               URN:  e.Urn,
+               Data: e.Payload,
+       }
+       var ins []*graph.Node
+       for _, col := range e.In {
+               ins = append(ins, col.n)
+       }
+       edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+       // TODO(pskevin): There needs to be a better way of associating this 
ExternalTransform to the pipeline
+       // Adding ExternalTransform to pipeline referenced by MultiEdge ID
+       if p.ExpandedTransforms == nil {
+               p.ExpandedTransforms = make(map[string]*ExternalTransform)
+       }
+       p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
+
+       /*
+               Build the ExpansionRequest
+       */
+       // Obtaining the components and transform proto representing this 
transform
+       pipeline, err := graphx.Marshal([]*graph.MultiEdge{edge}, 
&graphx.Options{})
+       if err != nil {
+               panic(err)
+       }
+
+       // Adding fake impulses to each input as required for correct expansion
+       // TODO(pskevin): Remove these fake impulses from final Pipeline since 
multiple producers of the same PCollections is logically wrong
+       transforms := pipeline.Components.Transforms
+       rootTransformID := pipeline.RootTransformIds[0]
+       for tag, id := range transforms[rootTransformID].Inputs {
+               key := fmt.Sprintf("%s_%s", "impulse", tag)
+
+               output := map[string]string{"out": id}
+               impulse := &pipepb.PTransform{
+                       UniqueName: key,
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: graphx.URNImpulse,
+                       },
+                       Outputs: output,
+               }
+
+               transforms[key] = impulse
+       }
+
+       // Assembling ExpansionRequest proto
+       req := &jobpb.ExpansionRequest{
+               Components: pipeline.Components,
+               Transform:  transforms[rootTransformID],
+               Namespace:  s.String(),
+       }
+
+       /*
+               Querying Expansion Service
+       */
+       // Setting grpc client
+       conn, err := grpc.Dial(e.ExpansionAddr, grpc.WithInsecure())
+       if err != nil {
+               panic(err)

Review comment:
       Change to a return once this can return errors. 
   return errors.Wrapf(err, "unable to connect to expansion service at %v", 
e.ExpansionAddr)

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+       if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+               // return Legacy External API
+       }
+
+       /*
+               Add ExternalTranform to the Graph
+       */
+       // Validating scope and inputs
+       if !s.IsValid() {
+               // return nil, errors.New("invalid scope")
+               fmt.Println("invalid scope")
+       }
+       for i, col := range e.In {
+               if !col.IsValid() {
+                       // return nil, errors.Errorf("invalid pcollection to 
external: index %v", i)
+                       fmt.Printf("\ninvalid pcollection to external: index 
%v", i)
+
+               }
+       }
+
+       // Using exisiting MultiEdge format to represent ExternalTransform 
(already backwards compatible)
+       payload := &graph.Payload{
+               URN:  e.Urn,
+               Data: e.Payload,
+       }
+       var ins []*graph.Node
+       for _, col := range e.In {
+               ins = append(ins, col.n)
+       }
+       edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+       // TODO(pskevin): There needs to be a better way of associating this 
ExternalTransform to the pipeline
+       // Adding ExternalTransform to pipeline referenced by MultiEdge ID
+       if p.ExpandedTransforms == nil {
+               p.ExpandedTransforms = make(map[string]*ExternalTransform)
+       }
+       p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
+
+       /*
+               Build the ExpansionRequest
+       */
+       // Obtaining the components and transform proto representing this 
transform
+       pipeline, err := graphx.Marshal([]*graph.MultiEdge{edge}, 
&graphx.Options{})
+       if err != nil {
+               panic(err)
+       }
+
+       // Adding fake impulses to each input as required for correct expansion
+       // TODO(pskevin): Remove these fake impulses from final Pipeline since 
multiple producers of the same PCollections is logically wrong
+       transforms := pipeline.Components.Transforms
+       rootTransformID := pipeline.RootTransformIds[0]
+       for tag, id := range transforms[rootTransformID].Inputs {
+               key := fmt.Sprintf("%s_%s", "impulse", tag)
+
+               output := map[string]string{"out": id}
+               impulse := &pipepb.PTransform{
+                       UniqueName: key,
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: graphx.URNImpulse,
+                       },
+                       Outputs: output,
+               }
+
+               transforms[key] = impulse
+       }
+
+       // Assembling ExpansionRequest proto
+       req := &jobpb.ExpansionRequest{
+               Components: pipeline.Components,
+               Transform:  transforms[rootTransformID],
+               Namespace:  s.String(),
+       }
+
+       /*
+               Querying Expansion Service
+       */
+       // Setting grpc client
+       conn, err := grpc.Dial(e.ExpansionAddr, grpc.WithInsecure())
+       if err != nil {
+               panic(err)
+       }
+       defer conn.Close()
+       client := jobpb.NewExpansionServiceClient(conn)
+
+       // Handling ExpansionResponse
+       res, err := client.Expand(context.Background(), req)
+       if err != nil {
+               panic(err)
+       }
+       e.Components = res.GetComponents()
+       e.ExpandedTransform = res.GetTransform()
+       e.Requirements = res.GetRequirements()
+
+       /*
+               Associating output PCollections of the expanded transform with 
correct internal outbound links and nodes
+       */
+       // No information about the output types and bounded nature has been 
explicitly passed by the user
+       if len(e.Out) == 0 || cap(e.Out) == 0 {
+               // Infer output types from ExpansionResponse and update e.Out
+               if e.Out == nil {
+                       // Use reverse schema encoding
+               } else {
+                       // Use the coders list and map from coder id to 
internal FullType?
+               }
+       }

Review comment:
       Since the next change will handle PCollection mappings by user provided 
string keys, we can probably get rid of this for now. Do add a TODO(lostluck) 
to remind me to plug in the schema coders here instead of what we're doing for 
the output types.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+       if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+               // return Legacy External API
+       }
+
+       /*
+               Add ExternalTranform to the Graph
+       */
+       // Validating scope and inputs
+       if !s.IsValid() {
+               // return nil, errors.New("invalid scope")
+               fmt.Println("invalid scope")
+       }
+       for i, col := range e.In {
+               if !col.IsValid() {
+                       // return nil, errors.Errorf("invalid pcollection to 
external: index %v", i)
+                       fmt.Printf("\ninvalid pcollection to external: index 
%v", i)
+
+               }
+       }
+
+       // Using exisiting MultiEdge format to represent ExternalTransform 
(already backwards compatible)
+       payload := &graph.Payload{
+               URN:  e.Urn,
+               Data: e.Payload,
+       }
+       var ins []*graph.Node
+       for _, col := range e.In {
+               ins = append(ins, col.n)
+       }
+       edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+       // TODO(pskevin): There needs to be a better way of associating this 
ExternalTransform to the pipeline
+       // Adding ExternalTransform to pipeline referenced by MultiEdge ID
+       if p.ExpandedTransforms == nil {
+               p.ExpandedTransforms = make(map[string]*ExternalTransform)
+       }
+       p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
+
+       /*
+               Build the ExpansionRequest
+       */
+       // Obtaining the components and transform proto representing this 
transform
+       pipeline, err := graphx.Marshal([]*graph.MultiEdge{edge}, 
&graphx.Options{})
+       if err != nil {
+               panic(err)
+       }
+
+       // Adding fake impulses to each input as required for correct expansion
+       // TODO(pskevin): Remove these fake impulses from final Pipeline since 
multiple producers of the same PCollections is logically wrong
+       transforms := pipeline.Components.Transforms
+       rootTransformID := pipeline.RootTransformIds[0]
+       for tag, id := range transforms[rootTransformID].Inputs {
+               key := fmt.Sprintf("%s_%s", "impulse", tag)
+
+               output := map[string]string{"out": id}
+               impulse := &pipepb.PTransform{
+                       UniqueName: key,
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: graphx.URNImpulse,
+                       },
+                       Outputs: output,
+               }
+
+               transforms[key] = impulse
+       }
+
+       // Assembling ExpansionRequest proto
+       req := &jobpb.ExpansionRequest{
+               Components: pipeline.Components,
+               Transform:  transforms[rootTransformID],
+               Namespace:  s.String(),
+       }
+
+       /*
+               Querying Expansion Service
+       */
+       // Setting grpc client
+       conn, err := grpc.Dial(e.ExpansionAddr, grpc.WithInsecure())
+       if err != nil {
+               panic(err)
+       }
+       defer conn.Close()
+       client := jobpb.NewExpansionServiceClient(conn)
+
+       // Handling ExpansionResponse
+       res, err := client.Expand(context.Background(), req)
+       if err != nil {
+               panic(err)
+       }
+       e.Components = res.GetComponents()

Review comment:
       Note these can be added (in one form or another) as exported fields on 
graph.MultiEdge
   
   
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/edge.go#L143
   
   For now disregard the restriction on dealing with protos in graph.
   
   Technically, one other approach to avoid the dependencies is to simply 
re-Marshal the received protos back to []byte and Unmarshal them in 
graphx/translate.go. As always, get it working first before trying to fix this 
dependency knot. 

##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+       "context"
+       "flag"
+       "fmt"
+       "log"
+       "regexp"
+       "strings"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+       "github.com/apache/beam/sdks/go/pkg/beam"
+       "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+       "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+       // Imports to enable correct filesystem access and runner setup in 
LOOPBACK mode
+       _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+       _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+       _ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+       // Set this option to choose a different input file or glob.
+       input = flag.String("input", "./input", "File(s) to read.")
+
+       // Set this required option to specify where to write the output.
+       output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+       wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+       empty   = beam.NewCounter("extract", "emptyLines")
+       lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+       lineLen.Update(ctx, int64(len(line)))
+       if len(strings.TrimSpace(line)) == 0 {
+               empty.Inc(ctx, 1)
+       }
+       for _, word := range wordRE.FindAllString(line, -1) {
+               emit(word)
+       }
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+       fmt.Println(w, c)
+       return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+       beam.RegisterFunction(extractFn)
+       beam.RegisterFunction(formatFn)
+}
+
+func main() {
+       // If beamx or Go flags are used, flags must be parsed first.
+       flag.Parse()
+       // beam.Init() is an initialization hook that must be called on 
startup. On
+       // distributed runners, it is used to intercept control.
+       beam.Init()
+
+       // Input validation is done as usual. Note that it must be after Init().
+       if *output == "" {
+               log.Fatal("No output provided")
+       }
+
+       // Concepts #3 and #4: The pipeline uses the named transform and DoFn.
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       lines := textio.Read(s, *input)
+       // Convert lines of text into individual words.
+       col := beam.ParDo(s, extractFn, lines)
+
+       // Using Cross-language Count from Python's test expansion service
+       // TODO(pskevin): Cleaner using-face API
+       outputType := typex.NewKV(typex.New(reflectx.String), 
typex.New(reflectx.Int64))
+       external := &beam.ExternalTransform{
+               In:            []beam.PCollection{col},
+               Urn:           "beam:transforms:xlang:count",
+               ExpansionAddr: "localhost:8118",
+               Out:           []typex.FullType{outputType},
+               Bounded:       true, // TODO(pskevin): Infer this value from 
output PCollection(s) part of the expanded tranform
+       }
+       counted := beam.CrossLanguage(s, p, external) // TODO(pskevin): Add 
external transform to Pipeline without passing it to the transform
+
+       formatted := beam.ParDo(s, formatFn, counted[0])
+       textio.Write(s, *output, formatted)
+
+       // Concept #1: The beamx.Run convenience wrapper allows a number of

Review comment:
       Same comment here: Delete the unnecessary extra documentation.

##########
File path: sdks/go/examples/xlang/wordcount/xlang_wordcount.go
##########
@@ -0,0 +1,100 @@
+package main
+
+import (
+       "context"
+       "flag"
+       "fmt"
+       "log"
+       "regexp"
+       "strings"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+
+       "github.com/apache/beam/sdks/go/pkg/beam"
+       "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+       "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+
+       // Imports to enable correct filesystem access and runner setup in 
LOOPBACK mode
+       _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/gcs"
+       _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
+       _ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+var (
+       // Set this option to choose a different input file or glob.
+       input = flag.String("input", "./input", "File(s) to read.")
+
+       // Set this required option to specify where to write the output.
+       output = flag.String("output", "./output", "Output file (required).")
+)
+
+var (
+       wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+       empty   = beam.NewCounter("extract", "emptyLines")
+       lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
+
+// extractFn is a DoFn that emits the words in a given line.
+func extractFn(ctx context.Context, line string, emit func(string)) {
+       lineLen.Update(ctx, int64(len(line)))
+       if len(strings.TrimSpace(line)) == 0 {
+               empty.Inc(ctx, 1)
+       }
+       for _, word := range wordRE.FindAllString(line, -1) {
+               emit(word)
+       }
+}
+
+// formatFn is a DoFn that formats a word and its count as a string.
+func formatFn(w string, c int64) string {
+       fmt.Println(w, c)
+       return fmt.Sprintf("%s: %v", w, c)
+}
+
+func init() {
+       beam.RegisterFunction(extractFn)
+       beam.RegisterFunction(formatFn)
+}
+
+func main() {
+       // If beamx or Go flags are used, flags must be parsed first.
+       flag.Parse()
+       // beam.Init() is an initialization hook that must be called on 
startup. On
+       // distributed runners, it is used to intercept control.
+       beam.Init()
+
+       // Input validation is done as usual. Note that it must be after Init().
+       if *output == "" {
+               log.Fatal("No output provided")
+       }
+
+       // Concepts #3 and #4: The pipeline uses the named transform and DoFn.
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       lines := textio.Read(s, *input)
+       // Convert lines of text into individual words.
+       col := beam.ParDo(s, extractFn, lines)
+
+       // Using Cross-language Count from Python's test expansion service
+       // TODO(pskevin): Cleaner using-face API
+       outputType := typex.NewKV(typex.New(reflectx.String), 
typex.New(reflectx.Int64))
+       external := &beam.ExternalTransform{
+               In:            []beam.PCollection{col},
+               Urn:           "beam:transforms:xlang:count",
+               ExpansionAddr: "localhost:8118",
+               Out:           []typex.FullType{outputType},
+               Bounded:       true, // TODO(pskevin): Infer this value from 
output PCollection(s) part of the expanded tranform
+       }
+       counted := beam.CrossLanguage(s, p, external) // TODO(pskevin): Add 
external transform to Pipeline without passing it to the transform

Review comment:
       +1 I would strongly prefer not having a separate ExternalTransform 
struct. Pass the arguments in. I wouldn't worry about trying to accommodate the 
previous External API.

##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
 package beam
 
 import (
+       "context"
+       "fmt"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "google.golang.org/grpc"
 )
 
+// ExternalTransform represents the cross-language transform in and out of the 
Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+       id                int
+       Urn               string
+       Payload           []byte
+       In                []PCollection
+       Out               []FullType
+       Bounded           bool
+       ExpansionAddr     string
+       Components        *pipepb.Components
+       ExpandedTransform *pipepb.PTransform
+       Requirements      []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one 
function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+       if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the 
value was ever set
+               // return Legacy External API
+       }
+
+       /*
+               Add ExternalTranform to the Graph
+       */
+       // Validating scope and inputs
+       if !s.IsValid() {
+               // return nil, errors.New("invalid scope")
+               fmt.Println("invalid scope")
+       }
+       for i, col := range e.In {
+               if !col.IsValid() {
+                       // return nil, errors.Errorf("invalid pcollection to 
external: index %v", i)
+                       fmt.Printf("\ninvalid pcollection to external: index 
%v", i)
+
+               }
+       }
+
+       // Using exisiting MultiEdge format to represent ExternalTransform 
(already backwards compatible)
+       payload := &graph.Payload{
+               URN:  e.Urn,
+               Data: e.Payload,
+       }
+       var ins []*graph.Node
+       for _, col := range e.In {
+               ins = append(ins, col.n)
+       }
+       edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+       // TODO(pskevin): There needs to be a better way of associating this 
ExternalTransform to the pipeline
+       // Adding ExternalTransform to pipeline referenced by MultiEdge ID
+       if p.ExpandedTransforms == nil {
+               p.ExpandedTransforms = make(map[string]*ExternalTransform)
+       }
+       p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e

Review comment:
       As discussed, this data can be part of the graph.External node (or a new 
node if desired) which keeps it as part of the graph and can be handled 
appropriately in graphx/translate.go

##########
File path: sdks/go/pkg/beam/runners/universal/universal.go
##########
@@ -82,6 +82,54 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                return errors.WithContextf(err, "generating model pipeline")
        }
 
+       // Adding Expanded transforms to their counterparts in the Pipeline
+       for id, external := range p.ExpandedTransforms {
+               pipeline.Requirements = append(pipeline.Requirements, 
external.Requirements...)
+
+               // Correct update of transform corresponding to the 
ExpandedTransform
+               // TODO(pskevin): Figure if there is a better way of supporting 
multiple outputs
+               transform := pipeline.Components.Transforms[id]
+               existingInput := ""
+               newInput := ""
+               for _, v := range transform.Outputs {
+                       existingInput = v
+               }
+               for _, v := range external.ExpandedTransform.Outputs {
+                       newInput = v
+               }
+
+               for _, t := range pipeline.Components.Transforms {
+                       for idx, i := range t.Inputs {
+                               if i == existingInput {
+                                       t.Inputs[idx] = newInput
+                               }
+                       }
+               }
+
+               // Adding components of the Expanded Transforms to the current 
Pipeline
+               for k, v := range external.Components.Transforms {

Review comment:
       You should be able to clean this up with pipelinex.Update
   
   
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go#L32
   
   Note that you'll need to delete the "go" entry from the environments first 
to do that safely for the loopback mode fix.

##########
File path: sdks/go/pkg/beam/pipeline.go
##########
@@ -60,7 +60,8 @@ func (s Scope) String() string {
 // Pipelines can safely be executed concurrently.
 type Pipeline struct {
        // real is the deferred execution Graph as it is being constructed.
-       real *graph.Graph
+       real               *graph.Graph
+       ExpandedTransforms map[string]*ExternalTransform

Review comment:
       As discussed, we don't want to have to hang the expanded transforms on 
the pipeline here.

##########
File path: sdks/go/pkg/beam/runners/universal/universal.go
##########
@@ -82,6 +82,54 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                return errors.WithContextf(err, "generating model pipeline")
        }
 
+       // Adding Expanded transforms to their counterparts in the Pipeline
+       for id, external := range p.ExpandedTransforms {

Review comment:
       Similarly, this code should be moved into graphx/translate.go (though 
TBH this is substantial enough that isolating it in functions in a 
graphx/xlang.go, which are then called from translate.go, would be a good move).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to