This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c66e5f2c982 Improved test coverage and fix the implementation of Inject and CoGBK (#23307)
c66e5f2c982 is described below
commit c66e5f2c982d857dc2c627c08f0ce8bd5e7e56fc
Author: coldWater <[email protected]>
AuthorDate: Tue Oct 18 06:14:44 2022 +0800
Improved test coverage and fix the implementation of Inject and CoGBK (#23307)
---
sdks/go/pkg/beam/runners/direct/direct_test.go | 59 ++++++++++++++++++++++++++
sdks/go/pkg/beam/runners/direct/gbk.go | 45 +++++++++++---------
sdks/go/test/regression/lperror.go | 2 +-
3 files changed, 84 insertions(+), 22 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/direct/direct_test.go b/sdks/go/pkg/beam/runners/direct/direct_test.go
index b65d33f9091..ac1eeecb64b 100644
--- a/sdks/go/pkg/beam/runners/direct/direct_test.go
+++ b/sdks/go/pkg/beam/runners/direct/direct_test.go
@@ -23,8 +23,11 @@ import (
"reflect"
"sort"
"testing"
+ "time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/google/go-cmp/cmp"
)
@@ -55,6 +58,10 @@ func init() {
beam.RegisterFunction(dofn1Counter)
beam.RegisterFunction(dofnSink)
+
+ beam.RegisterFunction(dofnEtKV1)
+ beam.RegisterFunction(dofnEtKV2)
+ beam.RegisterFunction(formatCoGBK2)
}
func dofn1(imp []byte, emit func(int64)) {
@@ -221,6 +228,36 @@ func dofn1Counter(ctx context.Context, _ []byte, emit func(int64)) {
beam.NewCounter(ns, "count").Inc(ctx, 1)
}
+const baseTs = mtime.Time(1663663026000)
+
+func dofnEtKV1(imp []byte, emit func(beam.EventTime, string, int64)) {
+ emit(baseTs, "a", 1)
+ emit(baseTs.Add(time.Millisecond*1200), "a", 3)
+
+ emit(baseTs.Add(time.Millisecond*2500), "b", 3)
+}
+
+func dofnEtKV2(imp []byte, emit func(beam.EventTime, string, int64)) {
+ emit(baseTs.Add(time.Millisecond*500), "a", 2)
+
+ emit(baseTs.Add(time.Millisecond), "b", 1)
+ emit(baseTs.Add(time.Millisecond*500), "b", 2)
+}
+
+func formatCoGBK2(s string, u func(*int64) bool, v func(*int64) bool) string {
+ var ls0, ls1 []int64
+ val := int64(0)
+ for u(&val) {
+ ls0 = append(ls0, val)
+ }
+ sort.Slice(ls0, func(i, j int) bool { return ls0[i] < ls0[j] })
+ for v(&val) {
+ ls1 = append(ls1, val)
+ }
+ sort.Slice(ls1, func(i, j int) bool { return ls1[i] < ls1[j] })
+ return fmt.Sprintf("%s,%v,%v", s, ls0, ls1)
+}
+
func TestRunner_Pipelines(t *testing.T) {
t.Run("simple", func(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
@@ -425,6 +462,28 @@ func TestRunner_Pipelines(t *testing.T) {
t.Fatal(err)
}
})
+ t.Run("window", func(t *testing.T) {
+ p, s := beam.NewPipelineWithRoot()
+ imp := beam.Impulse(s)
+ col1 := beam.ParDo(s, dofnEtKV1, imp)
+ wcol1 := beam.WindowInto(s, window.NewSessions(time.Second), col1)
+ col2 := beam.ParDo(s, dofnEtKV2, imp)
+ wcol2 := beam.WindowInto(s, window.NewSessions(time.Second), col2)
+ coGBK := beam.CoGroupByKey(s, wcol1, wcol2)
+ format := beam.ParDo(s, formatCoGBK2, coGBK)
+ beam.ParDo(s, &stringCheck{
+ Name: "window",
+ Want: []string{
+ "a,[1 3],[2]",
+ "b,[3],[]",
+ "b,[],[1 2]",
+ },
+ }, format)
+
+ if _, err := executeWithT(context.Background(), t, p); err != nil {
+ t.Fatal(err)
+ }
+ })
}
func TestRunner_Metrics(t *testing.T) {
diff --git a/sdks/go/pkg/beam/runners/direct/gbk.go b/sdks/go/pkg/beam/runners/direct/gbk.go
index c1dc97bb876..71ffe80ecbf 100644
--- a/sdks/go/pkg/beam/runners/direct/gbk.go
+++ b/sdks/go/pkg/beam/runners/direct/gbk.go
@@ -61,44 +61,40 @@ func (n *CoGBK) StartBundle(ctx context.Context, id string, data exec.DataContex
}
func (n *CoGBK) ProcessElement(ctx context.Context, elm *exec.FullValue, _ ...exec.ReStream) error {
- index := elm.Elm.(int)
- value := elm.Elm2.(*exec.FullValue)
+ index := elm.Elm2.(*exec.FullValue).Elm.(int)
+ value := elm.Elm2.(*exec.FullValue).Elm2
for _, w := range elm.Windows {
ws := []typex.Window{w}
n.wins = append(n.wins, ws...)
- key, err := n.encodeKey(value.Elm, ws)
+ g, err := n.getGroup(n.m, elm, ws)
if err != nil {
- return errors.Errorf("failed encoding key for %v: %v", elm, err)
+ return errors.Errorf("failed getGroup for %v: %v", elm, err)
}
- g := n.getGroup(n.m, key, value, ws)
- g.values[index] = append(g.values[index], exec.FullValue{Elm: value.Elm2, Timestamp: value.Timestamp})
+ g.values[index] = append(g.values[index], exec.FullValue{Elm: value, Timestamp: elm.Timestamp})
}
return nil
}
-func (n *CoGBK) encodeKey(elm interface{}, ws []typex.Window) (string, error) {
+func (n *CoGBK) getGroup(m map[string]*group, elm *exec.FullValue, ws []typex.Window) (*group, error) {
var buf bytes.Buffer
- if err := n.enc.Encode(&exec.FullValue{Elm: elm}, &buf); err != nil {
- return "", errors.WithContextf(err, "encoding key %v for CoGBK", elm)
+ if err := n.enc.Encode(&exec.FullValue{Elm: elm.Elm}, &buf); err != nil {
+ return nil, errors.WithContextf(err, "encoding key %v for CoGBK", elm)
}
if err := n.wEnc.Encode(ws, &buf); err != nil {
- return "", errors.WithContextf(err, "encoding window %v for CoGBK", ws)
+ return nil, errors.WithContextf(err, "encoding window %v for CoGBK", ws)
}
- return buf.String(), nil
-}
-
-func (n *CoGBK) getGroup(m map[string]*group, key string, value *exec.FullValue, ws []typex.Window) *group {
+ key := buf.String()
g, ok := m[key]
if !ok {
g = &group{
- key: exec.FullValue{Elm: value.Elm, Timestamp: value.Timestamp, Windows: ws},
+ key: exec.FullValue{Elm: elm.Elm, Timestamp: elm.Timestamp, Windows: ws},
values: make([][]exec.FullValue, len(n.Edge.Input)),
}
m[key] = g
}
- return g
+ return g, nil
}
func (n *CoGBK) FinishBundle(ctx context.Context) error {
@@ -132,7 +128,7 @@ func (n *CoGBK) mergeWindows() (map[typex.Window]int, error) {
// mergeMap is a map from the oringal windows to the index of the new window
// in the mergedWins slice
mergeMap := make(map[typex.Window]int)
- mergedWins := []typex.Window{}
+ var mergedWins []typex.Window
for i := 0; i < len(n.wins); {
intWin, ok := n.wins[i].(window.IntervalWindow)
if !ok {
@@ -164,11 +160,10 @@ func (n *CoGBK) reprocessByWindow(mergeMap map[typex.Window]int) error {
newGroups := make(map[string]*group)
for _, g := range n.m {
ws := []typex.Window{n.wins[mergeMap[g.key.Windows[0]]]}
- key, err := n.encodeKey(g.key.Elm, ws)
+ gr, err := n.getGroup(newGroups, &g.key, ws)
if err != nil {
return errors.Errorf("failed encoding key for %v: %v", g.key.Elm, err)
}
- gr := n.getGroup(newGroups, key, &g.key, ws)
for i, list := range g.values {
gr.values[i] = append(gr.values[i], list...)
}
@@ -186,7 +181,7 @@ func (n *CoGBK) String() string {
}
// Inject injects the predecessor index into each FullValue, effectively
-// converting KV<X,Y> into KV<int,KV<X,Y>>. Used to prime CoGBK.
+// converting KV<X,Y> into KV<X,KV<int,Y>>. Used to prime CoGBK.
type Inject struct {
UID exec.UnitID
N int
@@ -206,7 +201,15 @@ func (n *Inject) StartBundle(ctx context.Context, id string, data exec.DataConte
}
func (n *Inject) ProcessElement(ctx context.Context, elm *exec.FullValue, values ...exec.ReStream) error {
- return n.Out.ProcessElement(ctx, &exec.FullValue{Elm: n.N, Elm2: elm, Timestamp: elm.Timestamp, Windows: elm.Windows}, values...)
+ v := *elm
+ v.Elm = n.N
+ return n.Out.ProcessElement(ctx, &exec.FullValue{
+ Elm: elm.Elm,
+ Elm2: &v,
+ Timestamp: elm.Timestamp,
+ Windows: elm.Windows,
+ Pane: elm.Pane,
+ }, values...)
}
func (n *Inject) FinishBundle(ctx context.Context) error {
diff --git a/sdks/go/test/regression/lperror.go b/sdks/go/test/regression/lperror.go
index 4e6e22f3eb4..088f81d7a7c 100644
--- a/sdks/go/test/regression/lperror.go
+++ b/sdks/go/test/regression/lperror.go
@@ -60,7 +60,7 @@ func LPErrorPipeline(s beam.Scope) beam.PCollection {
// [0 "Foo"]
fooKV := beam.ParDo(s, toFoo, fruitsGBK)
- // [0 ["Foo"] ["Apple", "Banana", "Cherry"]]
+ // [0 ["Apple", "Banana", "Cherry"] ["Foo"]]
fruitsFooCoGBK := beam.CoGroupByKey(s, fruitsKV, fooKV)
// [0]