This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c66e5f2c982 Improved test coverage and fix the implementation of Inject and CoGBK (#23307)
c66e5f2c982 is described below

commit c66e5f2c982d857dc2c627c08f0ce8bd5e7e56fc
Author: coldWater <[email protected]>
AuthorDate: Tue Oct 18 06:14:44 2022 +0800

    Improved test coverage and fix the implementation of Inject and CoGBK (#23307)
---
 sdks/go/pkg/beam/runners/direct/direct_test.go | 59 ++++++++++++++++++++++++++
 sdks/go/pkg/beam/runners/direct/gbk.go         | 45 +++++++++++---------
 sdks/go/test/regression/lperror.go             |  2 +-
 3 files changed, 84 insertions(+), 22 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/direct/direct_test.go b/sdks/go/pkg/beam/runners/direct/direct_test.go
index b65d33f9091..ac1eeecb64b 100644
--- a/sdks/go/pkg/beam/runners/direct/direct_test.go
+++ b/sdks/go/pkg/beam/runners/direct/direct_test.go
@@ -23,8 +23,11 @@ import (
        "reflect"
        "sort"
        "testing"
+       "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
        "github.com/google/go-cmp/cmp"
 )
@@ -55,6 +58,10 @@ func init() {
 
        beam.RegisterFunction(dofn1Counter)
        beam.RegisterFunction(dofnSink)
+
+       beam.RegisterFunction(dofnEtKV1)
+       beam.RegisterFunction(dofnEtKV2)
+       beam.RegisterFunction(formatCoGBK2)
 }
 
 func dofn1(imp []byte, emit func(int64)) {
@@ -221,6 +228,36 @@ func dofn1Counter(ctx context.Context, _ []byte, emit func(int64)) {
        beam.NewCounter(ns, "count").Inc(ctx, 1)
 }
 
+const baseTs = mtime.Time(1663663026000)
+
+func dofnEtKV1(imp []byte, emit func(beam.EventTime, string, int64)) {
+       emit(baseTs, "a", 1)
+       emit(baseTs.Add(time.Millisecond*1200), "a", 3)
+
+       emit(baseTs.Add(time.Millisecond*2500), "b", 3)
+}
+
+func dofnEtKV2(imp []byte, emit func(beam.EventTime, string, int64)) {
+       emit(baseTs.Add(time.Millisecond*500), "a", 2)
+
+       emit(baseTs.Add(time.Millisecond), "b", 1)
+       emit(baseTs.Add(time.Millisecond*500), "b", 2)
+}
+
+func formatCoGBK2(s string, u func(*int64) bool, v func(*int64) bool) string {
+       var ls0, ls1 []int64
+       val := int64(0)
+       for u(&val) {
+               ls0 = append(ls0, val)
+       }
+       sort.Slice(ls0, func(i, j int) bool { return ls0[i] < ls0[j] })
+       for v(&val) {
+               ls1 = append(ls1, val)
+       }
+       sort.Slice(ls1, func(i, j int) bool { return ls1[i] < ls1[j] })
+       return fmt.Sprintf("%s,%v,%v", s, ls0, ls1)
+}
+
 func TestRunner_Pipelines(t *testing.T) {
        t.Run("simple", func(t *testing.T) {
                p, s := beam.NewPipelineWithRoot()
@@ -425,6 +462,28 @@ func TestRunner_Pipelines(t *testing.T) {
                        t.Fatal(err)
                }
        })
+       t.Run("window", func(t *testing.T) {
+               p, s := beam.NewPipelineWithRoot()
+               imp := beam.Impulse(s)
+               col1 := beam.ParDo(s, dofnEtKV1, imp)
+               wcol1 := beam.WindowInto(s, window.NewSessions(time.Second), 
col1)
+               col2 := beam.ParDo(s, dofnEtKV2, imp)
+               wcol2 := beam.WindowInto(s, window.NewSessions(time.Second), 
col2)
+               coGBK := beam.CoGroupByKey(s, wcol1, wcol2)
+               format := beam.ParDo(s, formatCoGBK2, coGBK)
+               beam.ParDo(s, &stringCheck{
+                       Name: "window",
+                       Want: []string{
+                               "a,[1 3],[2]",
+                               "b,[3],[]",
+                               "b,[],[1 2]",
+                       },
+               }, format)
+
+               if _, err := executeWithT(context.Background(), t, p); err != 
nil {
+                       t.Fatal(err)
+               }
+       })
 }
 
 func TestRunner_Metrics(t *testing.T) {
diff --git a/sdks/go/pkg/beam/runners/direct/gbk.go b/sdks/go/pkg/beam/runners/direct/gbk.go
index c1dc97bb876..71ffe80ecbf 100644
--- a/sdks/go/pkg/beam/runners/direct/gbk.go
+++ b/sdks/go/pkg/beam/runners/direct/gbk.go
@@ -61,44 +61,40 @@ func (n *CoGBK) StartBundle(ctx context.Context, id string, data exec.DataContex
 }
 
 func (n *CoGBK) ProcessElement(ctx context.Context, elm *exec.FullValue, _ 
...exec.ReStream) error {
-       index := elm.Elm.(int)
-       value := elm.Elm2.(*exec.FullValue)
+       index := elm.Elm2.(*exec.FullValue).Elm.(int)
+       value := elm.Elm2.(*exec.FullValue).Elm2
 
        for _, w := range elm.Windows {
                ws := []typex.Window{w}
                n.wins = append(n.wins, ws...)
 
-               key, err := n.encodeKey(value.Elm, ws)
+               g, err := n.getGroup(n.m, elm, ws)
                if err != nil {
-                       return errors.Errorf("failed encoding key for %v: %v", 
elm, err)
+                       return errors.Errorf("failed getGroup for %v: %v", elm, 
err)
                }
-               g := n.getGroup(n.m, key, value, ws)
-               g.values[index] = append(g.values[index], exec.FullValue{Elm: 
value.Elm2, Timestamp: value.Timestamp})
+               g.values[index] = append(g.values[index], exec.FullValue{Elm: 
value, Timestamp: elm.Timestamp})
        }
        return nil
 }
 
-func (n *CoGBK) encodeKey(elm interface{}, ws []typex.Window) (string, error) {
+func (n *CoGBK) getGroup(m map[string]*group, elm *exec.FullValue, ws 
[]typex.Window) (*group, error) {
        var buf bytes.Buffer
-       if err := n.enc.Encode(&exec.FullValue{Elm: elm}, &buf); err != nil {
-               return "", errors.WithContextf(err, "encoding key %v for 
CoGBK", elm)
+       if err := n.enc.Encode(&exec.FullValue{Elm: elm.Elm}, &buf); err != nil 
{
+               return nil, errors.WithContextf(err, "encoding key %v for 
CoGBK", elm)
        }
        if err := n.wEnc.Encode(ws, &buf); err != nil {
-               return "", errors.WithContextf(err, "encoding window %v for 
CoGBK", ws)
+               return nil, errors.WithContextf(err, "encoding window %v for 
CoGBK", ws)
        }
-       return buf.String(), nil
-}
-
-func (n *CoGBK) getGroup(m map[string]*group, key string, value 
*exec.FullValue, ws []typex.Window) *group {
+       key := buf.String()
        g, ok := m[key]
        if !ok {
                g = &group{
-                       key:    exec.FullValue{Elm: value.Elm, Timestamp: 
value.Timestamp, Windows: ws},
+                       key:    exec.FullValue{Elm: elm.Elm, Timestamp: 
elm.Timestamp, Windows: ws},
                        values: make([][]exec.FullValue, len(n.Edge.Input)),
                }
                m[key] = g
        }
-       return g
+       return g, nil
 }
 
 func (n *CoGBK) FinishBundle(ctx context.Context) error {
@@ -132,7 +128,7 @@ func (n *CoGBK) mergeWindows() (map[typex.Window]int, error) {
        // mergeMap is a map from the oringal windows to the index of the new 
window
        // in the mergedWins slice
        mergeMap := make(map[typex.Window]int)
-       mergedWins := []typex.Window{}
+       var mergedWins []typex.Window
        for i := 0; i < len(n.wins); {
                intWin, ok := n.wins[i].(window.IntervalWindow)
                if !ok {
@@ -164,11 +160,10 @@ func (n *CoGBK) reprocessByWindow(mergeMap map[typex.Window]int) error {
        newGroups := make(map[string]*group)
        for _, g := range n.m {
                ws := []typex.Window{n.wins[mergeMap[g.key.Windows[0]]]}
-               key, err := n.encodeKey(g.key.Elm, ws)
+               gr, err := n.getGroup(newGroups, &g.key, ws)
                if err != nil {
                        return errors.Errorf("failed encoding key for %v: %v", 
g.key.Elm, err)
                }
-               gr := n.getGroup(newGroups, key, &g.key, ws)
                for i, list := range g.values {
                        gr.values[i] = append(gr.values[i], list...)
                }
@@ -186,7 +181,7 @@ func (n *CoGBK) String() string {
 }
 
 // Inject injects the predecessor index into each FullValue, effectively
-// converting KV<X,Y> into KV<int,KV<X,Y>>. Used to prime CoGBK.
+// converting KV<X,Y> into KV<X,KV<int,Y>>. Used to prime CoGBK.
 type Inject struct {
        UID exec.UnitID
        N   int
@@ -206,7 +201,15 @@ func (n *Inject) StartBundle(ctx context.Context, id string, data exec.DataConte
 }
 
 func (n *Inject) ProcessElement(ctx context.Context, elm *exec.FullValue, 
values ...exec.ReStream) error {
-       return n.Out.ProcessElement(ctx, &exec.FullValue{Elm: n.N, Elm2: elm, 
Timestamp: elm.Timestamp, Windows: elm.Windows}, values...)
+       v := *elm
+       v.Elm = n.N
+       return n.Out.ProcessElement(ctx, &exec.FullValue{
+               Elm:       elm.Elm,
+               Elm2:      &v,
+               Timestamp: elm.Timestamp,
+               Windows:   elm.Windows,
+               Pane:      elm.Pane,
+       }, values...)
 }
 
 func (n *Inject) FinishBundle(ctx context.Context) error {
diff --git a/sdks/go/test/regression/lperror.go b/sdks/go/test/regression/lperror.go
index 4e6e22f3eb4..088f81d7a7c 100644
--- a/sdks/go/test/regression/lperror.go
+++ b/sdks/go/test/regression/lperror.go
@@ -60,7 +60,7 @@ func LPErrorPipeline(s beam.Scope) beam.PCollection {
        // [0 "Foo"]
        fooKV := beam.ParDo(s, toFoo, fruitsGBK)
 
-       // [0 ["Foo"] ["Apple", "Banana", "Cherry"]]
+       // [0 ["Apple", "Banana", "Cherry"] ["Foo"]]
        fruitsFooCoGBK := beam.CoGroupByKey(s, fruitsKV, fooKV)
 
        // [0]

Reply via email to