pskevin commented on a change in pull request #12667:
URL: https://github.com/apache/beam/pull/12667#discussion_r475027840
##########
File path: sdks/go/pkg/beam/xlang.go
##########
@@ -21,91 +21,154 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/xlangx"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
)
-// This is an experimetnal API and subject to change
-func CrossLanguage(s Scope, urn string, payload []byte, expansionAddr string,
inputs map[string]PCollection, outputTypes map[string]FullType)
map[string]PCollection {
+// xlang exposes an API to execute cross-language transforms within the Go SDK.
+// It is experimental and likely to change. It exposes convenient wrappers
+// around the core functions to pass in any combination of named/unnamed
+// inputs/outputs.
+
+// CrossLanguage executes a cross-language transform that uses named inputs and
+// returns named outputs.
+func CrossLanguage(
+ s Scope,
+ urn string,
+ payload []byte,
+ expansionAddr string,
+ namedInputs map[string]PCollection,
+ namedOutputTypes map[string]FullType,
+) map[string]PCollection {
if !s.IsValid() {
panic(errors.New("invalid scope"))
}
- namedInputNodes := mapPCollectionToNode(inputs)
-
- inputsMap, inboundLinks := graph.NewNamedInboundLinks(namedInputNodes)
- outputsMap, outboundLinks := graph.NewNamedOutboundLinks(s.real,
outputTypes)
+ inputsMap, inboundLinks :=
graph.NewNamedInboundLinks(mapPCollectionToNode(namedInputs))
+ outputsMap, outboundLinks := graph.NewNamedOutboundLinks(s.real,
namedOutputTypes)
ext := graph.ExternalTransform{
Urn: urn,
Payload: payload,
ExpansionAddr: expansionAddr,
}.WithNamedInputs(inputsMap).WithNamedOutputs(outputsMap)
- outputNodes, err := TryCrossLanguage(s, &ext, inboundLinks,
outboundLinks)
+ namedOutputs, err := TryCrossLanguage(s, &ext, inboundLinks,
outboundLinks)
if err != nil {
panic(errors.WithContextf(err, "tried cross-language and
failed"))
}
- return mapNodeToPCollection(outputNodes)
+ return mapNodeToPCollection(namedOutputs)
}
-/*
-func CrossLanguageWithSink(s Scope, urn string, payload []byte, expansionAddr
string, inputs map[string]PCollection, outputType FullType) PCollection {
- inputNodes := mapPCollectionToNode(inputs)
+// CrossLanguageWithSingleInputOutput executes a cross-language transform that
+// uses a single unnamed input and returns a single unnamed output.
+func CrossLanguageWithSingleInputOutput(
+ s Scope,
+ urn string,
+ payload []byte,
+ expansionAddr string,
+ input PCollection,
+ outputType FullType,
+) PCollection {
+ if !s.IsValid() {
+ panic(errors.New("invalid scope"))
+ }
+
+ // Adding dummy SourceInputTag to process it as a named input
+ namedInput :=
mapPCollectionToNode(map[string]PCollection{graph.SourceInputTag: input})
+ // Adding dummy SinkOutputTag to process it as a named output
+ namedOutputType := map[string]typex.FullType{graph.SinkOutputTag:
outputType}
- inputsMap, inboundLinks := graph.NewNamedInboundLinks(inputNodes)
- outputsMap, outboundLinks := graph.NewNamedOutboundLinks(s.real,
map[string]typex.FullType{graph.SinkOutputTag: outputType})
+ inputsMap, inboundLinks := graph.NewNamedInboundLinks(namedInput)
+ outputsMap, outboundLinks := graph.NewNamedOutboundLinks(s.real,
namedOutputType)
ext := graph.ExternalTransform{
Urn: urn,
Payload: payload,
ExpansionAddr: expansionAddr,
- }.WithNamedInputs(inputNodes).WithNamedOutputs(outputTypes)
+ }.WithNamedInputs(inputsMap).WithNamedOutputs(outputsMap)
- outputNodes, err := TryCrossLanguage(s, &ext, inboundLinks,
outboundLinks)
+ namedOutput, err := TryCrossLanguage(s, &ext, inboundLinks,
outboundLinks)
if err != nil {
panic(errors.WithContextf(err, "tried cross-language and
failed"))
}
- namedOutputNode := mapNodeToPCollection(outputNodes)
+ return nodeToPCollection(namedOutput[graph.SinkOutputTag])
+}
- outputNode, exists := namedOutputNode[graph.SinkOutputTag]
- if !exists {
- panic("a")
+// CrossLanguageWithSink executes a cross-language transform that uses named
+// inputs and returns a single unnamed output.
Review comment:
Interesting point. Unfortunately that'll be a bigger change and I hope
one that will be addressed later.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]