This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 96f4391cc61 [Go SDK] Fix multimap support for the direct runner (#24775)
96f4391cc61 is described below

commit 96f4391cc614364b3432b2287e97dd103d23469d
Author: camphillips22 <[email protected]>
AuthorDate: Tue Dec 27 12:37:40 2022 -0500

    [Go SDK] Fix multimap support for the direct runner (#24775)
    
    Updates the `NewKeyedIterable()` implementation to correctly
    parse a multimap side input from an in-memory ReStream.
---
 sdks/go/pkg/beam/runners/direct/buffer.go      | 17 ++++++++++++++-
 sdks/go/pkg/beam/runners/direct/direct_test.go | 30 ++++++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/runners/direct/buffer.go b/sdks/go/pkg/beam/runners/direct/buffer.go
index e831930a618..383db3db306 100644
--- a/sdks/go/pkg/beam/runners/direct/buffer.go
+++ b/sdks/go/pkg/beam/runners/direct/buffer.go
@@ -72,7 +72,22 @@ func (n *buffer) NewIterable(ctx context.Context, reader exec.StateReader, w typ
 }
 
 func (n *buffer) NewKeyedIterable(ctx context.Context, reader 
exec.StateReader, w typex.Window, iterKey any) (exec.ReStream, error) {
-       return n.NewIterable(ctx, reader, w)
+       if !n.done {
+               panic(fmt.Sprintf("buffer[%v] incomplete: %v", n.uid, 
len(n.buf)))
+       }
+       s := &exec.FixedReStream{Buf: make([]exec.FullValue, 0)}
+       for _, v := range n.buf {
+               if v.Elm == iterKey {
+                       s.Buf = append(s.Buf, exec.FullValue{
+                               Elm:          v.Elm2,
+                               Timestamp:    v.Timestamp,
+                               Windows:      v.Windows,
+                               Pane:         v.Pane,
+                               Continuation: v.Continuation,
+                       })
+               }
+       }
+       return s, nil
 }
 
 func (n *buffer) String() string {
diff --git a/sdks/go/pkg/beam/runners/direct/direct_test.go b/sdks/go/pkg/beam/runners/direct/direct_test.go
index ac1eeecb64b..a8108580aa2 100644
--- a/sdks/go/pkg/beam/runners/direct/direct_test.go
+++ b/sdks/go/pkg/beam/runners/direct/direct_test.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
        "github.com/google/go-cmp/cmp"
 )
 
@@ -44,6 +45,7 @@ func init() {
        beam.RegisterFunction(dofn2x1)
        beam.RegisterFunction(dofn3x1)
        beam.RegisterFunction(dofn2x2KV)
+       beam.RegisterFunction(dofnMultiMap)
        beam.RegisterFunction(dofn2)
        beam.RegisterFunction(dofnKV)
        beam.RegisterFunction(dofnKV2)
@@ -121,6 +123,16 @@ func dofn2x2KV(imp []byte, iter func(*string, *int64) bool, emitK func(string),
        emitV(sum)
 }
 
+func dofnMultiMap(key string, lookup func(string) func(*int64) bool, emitK 
func(string), emitV func(int64)) {
+       var v, sum int64
+       iter := lookup(key)
+       for iter(&v) {
+               sum += v
+       }
+       emitK(key)
+       emitV(sum)
+}
+
 // int64Check validates that within a single bundle,
 // we received the expected int64 values.
 type int64Check struct {
@@ -446,6 +458,24 @@ func TestRunner_Pipelines(t *testing.T) {
                        t.Fatal(err)
                }
        })
+       t.Run("sideinput_multimap", func(t *testing.T) {
+               p, s := beam.NewPipelineWithRoot()
+               imp := beam.Impulse(s)
+               col1 := beam.ParDo(s, dofnKV, imp)
+               keys := filter.Distinct(s, beam.DropValue(s, col1))
+               ks, sum := beam.ParDo2(s, dofnMultiMap, keys, 
beam.SideInput{Input: col1})
+               beam.ParDo(s, &stringCheck{
+                       Name: "iterKV sideinput check K",
+                       Want: []string{"a", "b"},
+               }, ks)
+               beam.ParDo(s, &int64Check{
+                       Name: "iterKV sideinput check V",
+                       Want: []int{9, 12},
+               }, sum)
+               if _, err := executeWithT(context.Background(), t, p); err != 
nil {
+                       t.Fatal(err)
+               }
+       })
        // Validates the waiting on side input readiness in buffer.
        t.Run("sideinput_2iterable", func(t *testing.T) {
                p, s := beam.NewPipelineWithRoot()

Reply via email to