lostluck commented on a change in pull request #12445:
URL: https://github.com/apache/beam/pull/12445#discussion_r464721180
##########
File path: sdks/go/pkg/beam/external.go
##########
@@ -16,10 +16,151 @@
package beam
import (
+ "context"
+ "fmt"
+
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+ jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+ pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+ "google.golang.org/grpc"
)
+// ExternalTransform represents the cross-language transform in and out of the Pipeline as a MultiEdge and Expanded proto respectively
+type ExternalTransform struct {
+ id int
+ Urn string
+ Payload []byte
+ In []PCollection
+ Out []FullType
+ Bounded bool
+ ExpansionAddr string
+ Components *pipepb.Components
+ ExpandedTransform *pipepb.PTransform
+ Requirements []string
+}
+
+// CrossLanguage is the temporary API to execute external transforms
+// TODO(pskevin): Handle errors using the TryN and Must strategies instead one function handling multiple points of failure
+func CrossLanguage(s Scope, p *Pipeline, e *ExternalTransform) []PCollection {
+ if e.ExpansionAddr == "" { // TODO(pskevin): Better way to check if the value was ever set
+ // return Legacy External API
+ }
+
+ /*
+ Add ExternalTranform to the Graph
+ */
+ // Validating scope and inputs
+ if !s.IsValid() {
+ // return nil, errors.New("invalid scope")
+ fmt.Println("invalid scope")
+ }
+ for i, col := range e.In {
+ if !col.IsValid() {
+ // return nil, errors.Errorf("invalid pcollection to external: index %v", i)
+ fmt.Printf("\ninvalid pcollection to external: index %v", i)
+
+ }
+ }
+
+ // Using exisiting MultiEdge format to represent ExternalTransform (already backwards compatible)
+ payload := &graph.Payload{
+ URN: e.Urn,
+ Data: e.Payload,
+ }
+ var ins []*graph.Node
+ for _, col := range e.In {
+ ins = append(ins, col.n)
+ }
+ edge := graph.NewCrossLanguage(s.real, s.scope, ins, payload)
+
+ // TODO(pskevin): There needs to be a better way of associating this ExternalTransform to the pipeline
+ // Adding ExternalTransform to pipeline referenced by MultiEdge ID
+ if p.ExpandedTransforms == nil {
+ p.ExpandedTransforms = make(map[string]*ExternalTransform)
+ }
+ p.ExpandedTransforms[fmt.Sprintf("e%v", edge.ID())] = e
Review comment:
As discussed, this data can live on the graph.External node (or on a new
graph.CrossLanguage struct, if desired). That keeps it in the graph, where
graphx/translate.go can handle it appropriately. There's absolutely no need
to add a new way of passing information through the pipeline, or to adopt the
approach you suggested for scope.
Use the existing abstraction. If it's not sufficient, please articulate why.
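To illustrate the shape of that suggestion, here is a minimal sketch. It does
not use the real Beam graph package: Graph, MultiEdge, Payload,
ExternalTransform, the External field, and ExpansionAddrs are all simplified,
hypothetical stand-ins. The only point is that the cross-language data rides
on the edge itself, so a translation pass can find it by walking the graph
and no pipeline-side map keyed by edge ID is needed.

```go
package main

import "fmt"

// Payload mirrors the URN/data pair built in the diff above (stand-in type).
type Payload struct {
	URN  string
	Data []byte
}

// ExternalTransform is a hypothetical, trimmed-down holder for the
// cross-language data that the PR currently stores on the Pipeline.
type ExternalTransform struct {
	ExpansionAddr string
	Requirements  []string
}

// MultiEdge is a simplified stand-in for graph.MultiEdge.
type MultiEdge struct {
	id      int
	Payload *Payload
	// External is a hypothetical field: non-nil only for cross-language edges.
	External *ExternalTransform
}

// Graph is a simplified stand-in that only tracks its edges.
type Graph struct {
	edges []*MultiEdge
}

// NewCrossLanguage adds an edge that carries its own external data,
// instead of registering that data in a separate map on the pipeline.
func (g *Graph) NewCrossLanguage(p *Payload, ext *ExternalTransform) *MultiEdge {
	e := &MultiEdge{id: len(g.edges) + 1, Payload: p, External: ext}
	g.edges = append(g.edges, e)
	return e
}

// ExpansionAddrs plays the role of a translation pass: it walks the graph
// and reads the external data directly off each cross-language edge.
func ExpansionAddrs(g *Graph) map[string]string {
	out := make(map[string]string)
	for _, e := range g.edges {
		if e.External == nil {
			continue // ordinary edge, nothing to expand
		}
		out[fmt.Sprintf("e%v", e.id)] = e.External.ExpansionAddr
	}
	return out
}
```

With this layout, the caller would pass the ExternalTransform to
NewCrossLanguage once, and anything that later needs the expansion address
or the expanded proto gets it from the edge it is already visiting.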