pskevin commented on a change in pull request #12632:
URL: https://github.com/apache/beam/pull/12632#discussion_r475028223
##########
File path: sdks/go/pkg/beam/core/graph/edge.go
##########
@@ -283,31 +283,51 @@ func NewFlatten(g *Graph, s *Scope, in []*Node)
(*MultiEdge, error) {
}
// NewCrossLanguage inserts a Cross-langugae External transform.
-func NewCrossLanguage(g *Graph, s *Scope, ext *ExternalTransform) *MultiEdge {
+func NewCrossLanguage(g *Graph, s *Scope, ext *ExternalTransform, ins
[]*Inbound, outs []*Outbound) (*MultiEdge, func(*Node, bool)) {
edge := g.NewEdge(s)
edge.Op = External
edge.External = ext
- for _, n := range ext.Inputs() {
- edge.Input = append(edge.Input, &Inbound{Kind: Main, From: n,
Type: n.Type()})
+ windowingStrategy := inputWindow([]*Node{ins[0].From})
+ for _, o := range outs {
+ o.To.w = windowingStrategy
}
- return edge
+
+ isBoundedUpdater := func(n *Node, bounded bool) {
+ n.bounded = bounded
+ }
+
+ edge.Input = ins
+ edge.Output = outs
+
+ return edge, isBoundedUpdater
+}
+
+func NewNamedInboundLinks(ins map[string]*Node) (map[string]int, []*Inbound) {
+ inputsMap := make(map[string]int)
+ var inboundLinks []*Inbound
+
+ for tag, node := range ins {
+ id := len(inboundLinks)
+ inputsMap[tag] = id
+ inboundLinks = append(inboundLinks, &Inbound{Kind: Main, From:
node, Type: node.Type()})
+ }
+
+ return inputsMap, inboundLinks
}
-// AddOutboundLinks adds Outbound links to existing MultiEdge
-func AddOutboundLinks(g *Graph, e *MultiEdge) {
Review comment:
Hahaha yes I get that. I had to though. That initial version was so
crude. But thanks for bringing it up.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]