This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c26009  [BEAM-5354] Add side input nodes to their consumer's parent-scope.
     new 9647808  Merge pull request #8713 from lostluck/sideforms
1c26009 is described below

commit 1c26009cf9154ff337ab997b84b404e29e877f89
Author: Robert Burke <[email protected]>
AuthorDate: Wed May 29 09:34:48 2019 -0700

    [BEAM-5354] Add side input nodes to their consumer's parent-scope.
---
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |  14 ++-
 .../pkg/beam/core/runtime/graphx/translate_test.go | 130 ++++++++++++++++-----
 sdks/go/test/integration/primitives/pardo.go       |   3 +-
 3 files changed, 109 insertions(+), 38 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 99a3b8f..17dc253 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -128,7 +128,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
 
        var subtransforms []string
        for _, edge := range s.Edges {
-               subtransforms = append(subtransforms, m.addMultiEdge(edge))
+               subtransforms = append(subtransforms, m.addMultiEdge(edge)...)
        }
        for _, tree := range s.Children {
                subtransforms = append(subtransforms, m.addScopeTree(tree))
@@ -173,14 +173,14 @@ func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pb.PTrans
        transform.Spec = &pb.FunctionSpec{Urn: URNCombinePerKey, Payload: 
protox.MustEncode(payload)}
 }
 
-func (m *marshaller) addMultiEdge(edge NamedEdge) string {
+func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
        id := edgeID(edge.Edge)
        if _, exists := m.transforms[id]; exists {
-               return id
+               return []string{id}
        }
 
        if edge.Edge.Op == graph.CoGBK && len(edge.Edge.Input) > 1 {
-               return m.expandCoGBK(edge)
+               return []string{m.expandCoGBK(edge)}
        }
 
        inputs := make(map[string]string)
@@ -194,6 +194,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) string {
                outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
        }
 
+       // allPIds tracks additional PTransformIDs generated for the pipeline
+       var allPIds []string
        var spec *pb.FunctionSpec
        switch edge.Edge.Op {
        case graph.Impulse:
@@ -238,6 +240,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) string {
                                        Outputs: map[string]string{"i0": out},
                                }
                                m.transforms[keyedID] = keyed
+                               allPIds = append(allPIds, keyedID)
 
                                // Fixup input map
                                inputs[fmt.Sprintf("i%v", i)] = out
@@ -320,7 +323,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) string {
                Outputs:    outputs,
        }
        m.transforms[id] = transform
-       return id
+       allPIds = append(allPIds, id)
+       return allPIds
 }
 
 func (m *marshaller) expandCoGBK(edge NamedEdge) string {
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
index ec2a8b7..0026745 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
@@ -41,22 +41,36 @@ func pickFn(a int, small, big func(int)) {
        }
 }
 
-func pick(t *testing.T, g *graph.Graph) *graph.MultiEdge {
-       dofn, err := graph.NewDoFn(pickFn)
+func pickSideFn(a, side int, small, big func(int)) {
+       if a < side {
+               small(a)
+       } else {
+               big(a)
+       }
+}
+
+func addDoFn(t *testing.T, g *graph.Graph, fn interface{}, scope *graph.Scope, 
inputs []*graph.Node, outputCoders []*coder.Coder) {
+       t.Helper()
+       dofn, err := graph.NewDoFn(fn)
        if err != nil {
                t.Fatal(err)
        }
-
-       in := g.NewNode(intT(), window.DefaultWindowingStrategy(), true)
-       in.Coder = intCoder()
-
-       e, err := graph.NewParDo(g, g.Root(), dofn, []*graph.Node{in}, nil)
+       e, err := graph.NewParDo(g, scope, dofn, inputs, nil)
        if err != nil {
                t.Fatal(err)
        }
-       e.Output[0].To.Coder = intCoder()
-       e.Output[1].To.Coder = intCoder()
-       return e
+       if len(outputCoders) != len(e.Output) {
+               t.Fatalf("%v has %d outputs, but only got %d coders", 
dofn.Name(), len(e.Output), len(outputCoders))
+       }
+       for i, c := range outputCoders {
+               e.Output[i].To.Coder = c
+       }
+}
+
+func newIntInput(g *graph.Graph) *graph.Node {
+       in := g.NewNode(intT(), window.DefaultWindowingStrategy(), true)
+       in.Coder = intCoder()
+       return in
 }
 
 func intT() typex.FullType {
@@ -67,30 +81,82 @@ func intCoder() *coder.Coder {
        return custom("int", reflectx.Int)
 }
 
-// TestParDo verifies that ParDo can be serialized.
-func TestParDo(t *testing.T) {
-       g := graph.New()
-       pick(t, g)
-
-       edges, _, err := g.Build()
-       if err != nil {
-               t.Fatal(err)
-       }
-       if len(edges) != 1 {
-               t.Fatal("expected a single edge")
+// TestMarshal verifies that ParDo can be serialized.
+func TestMarshal(t *testing.T) {
+       tests := []struct {
+               name                     string
+               makeGraph                func(t *testing.T, g *graph.Graph)
+               edges, transforms, roots int
+       }{
+               {
+                       name: "ParDo",
+                       makeGraph: func(t *testing.T, g *graph.Graph) {
+                               addDoFn(t, g, pickFn, g.Root(), 
[]*graph.Node{newIntInput(g)}, []*coder.Coder{intCoder(), intCoder()})
+                       },
+                       edges:      1,
+                       transforms: 1,
+                       roots:      1,
+               }, {
+                       name: "ScopedParDo",
+                       makeGraph: func(t *testing.T, g *graph.Graph) {
+                               addDoFn(t, g, pickFn, g.NewScope(g.Root(), 
"sub"), []*graph.Node{newIntInput(g)}, []*coder.Coder{intCoder(), intCoder()})
+                       },
+                       edges:      1,
+                       transforms: 2,
+                       roots:      1,
+               }, {
+                       name: "SideInput",
+                       makeGraph: func(t *testing.T, g *graph.Graph) {
+                               in := newIntInput(g)
+                               side := newIntInput(g)
+                               addDoFn(t, g, pickSideFn, g.Root(), 
[]*graph.Node{in, side}, []*coder.Coder{intCoder(), intCoder()})
+                       },
+                       edges:      1,
+                       transforms: 2,
+                       roots:      2,
+               }, {
+                       name: "ScopedSideInput",
+                       makeGraph: func(t *testing.T, g *graph.Graph) {
+                               in := newIntInput(g)
+                               side := newIntInput(g)
+                               addDoFn(t, g, pickSideFn, g.NewScope(g.Root(), 
"sub"), []*graph.Node{in, side}, []*coder.Coder{intCoder(), intCoder()})
+                       },
+                       edges:      1,
+                       transforms: 3,
+                       roots:      1,
+               },
        }
+       for _, test := range tests {
+               test := test
+               t.Run(test.name, func(t *testing.T) {
 
-       payload, err := proto.Marshal(&pb.DockerPayload{ContainerImage: "foo"})
-       if err != nil {
-               t.Fatal(err)
-       }
-       p, err := graphx.Marshal(edges,
-               &graphx.Options{Environment: pb.Environment{Urn: 
"beam:env:docker:v1", Payload: payload}})
-       if err != nil {
-               t.Fatal(err)
-       }
+                       g := graph.New()
+                       test.makeGraph(t, g)
+
+                       edges, _, err := g.Build()
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+                       if len(edges) != test.edges {
+                               t.Fatal("expected a single edge")
+                       }
+
+                       payload, err := 
proto.Marshal(&pb.DockerPayload{ContainerImage: "foo"})
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+                       p, err := graphx.Marshal(edges,
+                               &graphx.Options{Environment: 
pb.Environment{Urn: "beam:env:docker:v1", Payload: payload}})
+                       if err != nil {
+                               t.Fatal(err)
+                       }
 
-       if len(p.GetComponents().GetTransforms()) != 1 {
-               t.Errorf("bad ParDo translation: %v", 
proto.MarshalTextString(p))
+                       if got, want := len(p.GetComponents().GetTransforms()), 
test.transforms; got != want {
+                               t.Errorf("got %d transforms, want %d : %v", 
got, want, proto.MarshalTextString(p))
+                       }
+                       if got, want := len(p.GetRootTransformIds()), 
test.roots; got != want {
+                               t.Errorf("got %d roots, want %d : %v", got, 
want, proto.MarshalTextString(p))
+                       }
+               })
        }
 }
diff --git a/sdks/go/test/integration/primitives/pardo.go b/sdks/go/test/integration/primitives/pardo.go
index ae7d9b2..62654e8 100644
--- a/sdks/go/test/integration/primitives/pardo.go
+++ b/sdks/go/test/integration/primitives/pardo.go
@@ -53,7 +53,8 @@ func ParDoSideInput() *beam.Pipeline {
        p, s := beam.NewPipelineWithRoot()
 
        in := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9)
-       out := beam.ParDo(s, sumValuesFn, beam.Impulse(s), 
beam.SideInput{Input: in})
+       sub := s.Scope("subscope") // Ensure scoping works with side inputs. 
See: BEAM-5354
+       out := beam.ParDo(sub, sumValuesFn, beam.Impulse(s), 
beam.SideInput{Input: in})
        passert.Sum(s, out, "out", 1, 45)
 
        return p

Reply via email to