This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 133b25d  [BEAM-5381] Fix Duplicate CoGBK node names.
     new 9acac69  Merge pull request #8238 from lostluck/cogbkfix
133b25d is described below

commit 133b25d460e5e5cc648a8a44157ca2ed62982955
Author: Robert Burke <[email protected]>
AuthorDate: Fri Apr 5 21:00:48 2019 +0000

    [BEAM-5381] Fix Duplicate CoGBK node names.
---
 sdks/go/pkg/beam/core/runtime/graphx/translate.go |  9 ++++-----
 sdks/go/test/integration/primitives/cogbk.go      | 11 ++++++-----
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 8e22228..16328a7 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -21,7 +21,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
+       v1 "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
@@ -336,7 +336,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
        for i, in := range edge.Edge.Input {
                m.addNode(in.From)
 
-               out := fmt.Sprintf("%v_inject%v", nodeID(in.From), i)
+               out := fmt.Sprintf("%v_%v_inject%v", nodeID(in.From), id, i)
                m.makeNode(out, kvCoderID, in.From)
 
                // Inject(i)
@@ -436,7 +436,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
                UniqueName:    edge.Name,
                Subtransforms: subtransforms,
        }
-       return id
+       return cogbkID
 }
 
 func (m *marshaller) addNode(n *graph.Node) string {
@@ -462,9 +462,8 @@ func (m *marshaller) makeNode(id, cid string, n 
*graph.Node) string {
 func boolToBounded(bounded bool) pb.IsBounded_Enum {
        if bounded {
                return pb.IsBounded_BOUNDED
-       } else {
-               return pb.IsBounded_UNBOUNDED
        }
+       return pb.IsBounded_UNBOUNDED
 }
 
 func (m *marshaller) addDefaultEnv() string {
diff --git a/sdks/go/test/integration/primitives/cogbk.go 
b/sdks/go/test/integration/primitives/cogbk.go
index 6936494..468d99e 100644
--- a/sdks/go/test/integration/primitives/cogbk.go
+++ b/sdks/go/test/integration/primitives/cogbk.go
@@ -83,12 +83,13 @@ func splitFn(key string, v int, a, b, c, d func(int)) {
 func CoGBK() *beam.Pipeline {
        p, s := beam.NewPipelineWithRoot()
 
-       as := beam.ParDo(s, genA, beam.Impulse(s))
-       bs := beam.ParDo(s, genB, beam.Impulse(s))
-       cs := beam.ParDo(s, genC, beam.Impulse(s))
+       s2 := s.Scope("SubScope")
+       as := beam.ParDo(s2, genA, beam.Impulse(s))
+       bs := beam.ParDo(s2, genB, beam.Impulse(s))
+       cs := beam.ParDo(s2, genC, beam.Impulse(s))
+       grouped := beam.CoGroupByKey(s2, as, bs, cs)
+       joined := beam.ParDo(s2, joinFn, grouped)
 
-       grouped := beam.CoGroupByKey(s, as, bs, cs)
-       joined := beam.ParDo(s, joinFn, grouped)
        a, b, c, d := beam.ParDo4(s, splitFn, joined)
 
        passert.Sum(s, a, "a", 1, 18)

Reply via email to