This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 133b25d [BEAM-5381] Fix Duplicate CoGBK node names.
new 9acac69 Merge pull request #8238 from lostluck/cogbkfix
133b25d is described below
commit 133b25d460e5e5cc648a8a44157ca2ed62982955
Author: Robert Burke <[email protected]>
AuthorDate: Fri Apr 5 21:00:48 2019 +0000
[BEAM-5381] Fix Duplicate CoGBK node names.
---
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 9 ++++-----
sdks/go/test/integration/primitives/cogbk.go | 11 ++++++-----
2 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 8e22228..16328a7 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -21,7 +21,7 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
- "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
+ v1 "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
@@ -336,7 +336,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
for i, in := range edge.Edge.Input {
m.addNode(in.From)
- out := fmt.Sprintf("%v_inject%v", nodeID(in.From), i)
+ out := fmt.Sprintf("%v_%v_inject%v", nodeID(in.From), id, i)
m.makeNode(out, kvCoderID, in.From)
// Inject(i)
@@ -436,7 +436,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
UniqueName: edge.Name,
Subtransforms: subtransforms,
}
- return id
+ return cogbkID
}
func (m *marshaller) addNode(n *graph.Node) string {
@@ -462,9 +462,8 @@ func (m *marshaller) makeNode(id, cid string, n
*graph.Node) string {
func boolToBounded(bounded bool) pb.IsBounded_Enum {
if bounded {
return pb.IsBounded_BOUNDED
- } else {
- return pb.IsBounded_UNBOUNDED
}
+ return pb.IsBounded_UNBOUNDED
}
func (m *marshaller) addDefaultEnv() string {
diff --git a/sdks/go/test/integration/primitives/cogbk.go
b/sdks/go/test/integration/primitives/cogbk.go
index 6936494..468d99e 100644
--- a/sdks/go/test/integration/primitives/cogbk.go
+++ b/sdks/go/test/integration/primitives/cogbk.go
@@ -83,12 +83,13 @@ func splitFn(key string, v int, a, b, c, d func(int)) {
func CoGBK() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()
- as := beam.ParDo(s, genA, beam.Impulse(s))
- bs := beam.ParDo(s, genB, beam.Impulse(s))
- cs := beam.ParDo(s, genC, beam.Impulse(s))
+ s2 := s.Scope("SubScope")
+ as := beam.ParDo(s2, genA, beam.Impulse(s))
+ bs := beam.ParDo(s2, genB, beam.Impulse(s))
+ cs := beam.ParDo(s2, genC, beam.Impulse(s))
+ grouped := beam.CoGroupByKey(s2, as, bs, cs)
+ joined := beam.ParDo(s2, joinFn, grouped)
- grouped := beam.CoGroupByKey(s, as, bs, cs)
- joined := beam.ParDo(s, joinFn, grouped)
a, b, c, d := beam.ParDo4(s, splitFn, joined)
passert.Sum(s, a, "a", 1, 18)