This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bfc71b0  [BEAM-3293] Add initial MultiMap side input backing type (#15975)
bfc71b0 is described below

commit bfc71b0c6700bff51474824199cea6aabe5b43f3
Author: Jack McCluskey <[email protected]>
AuthorDate: Mon Nov 15 13:48:17 2021 -0500

    [BEAM-3293] Add initial MultiMap side input backing type (#15975)
---
 sdks/go/pkg/beam/core/runtime/exec/input.go | 46 +++++++++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/input.go b/sdks/go/pkg/beam/core/runtime/exec/input.go
index 98922a2..37fb3b8 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/input.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/input.go
@@ -16,6 +16,7 @@
 package exec
 
 import (
+       "context"
        "fmt"
        "io"
        "reflect"
@@ -196,3 +197,48 @@ func (v *fixedValue) Value() interface{} {
 func (v *fixedValue) Reset() error {
        return nil
 }
+
+// multiMapValue is the side-input value built by makeMultiMap. It holds a
+// function of type t (fn) whose calls are routed to invoke.
+type multiMapValue struct {
+       // t is the function type passed to reflect.MakeFunc; its first result
+       // type is the iterable type handed to makeIter.
+       t       reflect.Type
+       // keyType is the type each lookup key is converted to before it is
+       // passed to NewKeyedIterable.
+       keyType reflect.Type
+       // These four things are needed to dynamically build the iterables
+       ctx     context.Context
+       adapter SideInputAdapter
+       reader  StateReader
+       w       typex.Window
+       // fn is the actual invoked function: it is created with
+       // reflect.MakeFunc in makeMultiMap and returned by Value.
+       fn interface{}
+}
+
+// makeMultiMap returns a ReusableInput whose Value is a function of type t.
+// Each call of that function is served by multiMapValue.invoke using the
+// supplied context, adapter, reader and window. It panics if
+// funcx.UnfoldMultiMap does not accept t.
+func makeMultiMap(ctx context.Context, t reflect.Type, adapter SideInputAdapter, reader StateReader, w typex.Window) ReusableInput {
+       unfolded, isMultiMap := funcx.UnfoldMultiMap(t)
+       if !isMultiMap {
+               panic(fmt.Sprintf("illegal multimap type: %v", t))
+       }
+       value := &multiMapValue{
+               t:       t,
+               keyType: unfolded[0], // the first unfolded type is used as the key type
+               ctx:     ctx,
+               adapter: adapter,
+               reader:  reader,
+               w:       w,
+       }
+       // The generated function dispatches every call to value.invoke.
+       value.fn = reflect.MakeFunc(t, value.invoke).Interface()
+       return value
+}
+
+// Init is a no-op that always returns nil.
+func (v *multiMapValue) Init() error {
+       return nil
+}
+
+// Value returns fn, the function created with reflect.MakeFunc in
+// makeMultiMap.
+func (v *multiMapValue) Value() interface{} {
+       return v.fn
+}
+
+// Reset is a no-op that always returns nil.
+func (v *multiMapValue) Reset() error {
+       return nil
+}
+
+// invoke is the body of the function created in makeMultiMap. It expects
+// exactly one argument, the lookup key, converts it to keyType, asks the
+// adapter for the keyed iterable in the stored window, and returns the
+// initialized iterator value as the single result.
+//
+// It panics when the argument count is wrong, when NewKeyedIterable returns
+// an error, or when the iterator fails to initialize. The results of a
+// reflect.MakeFunc body are fixed by the function type t, so there is no
+// error result to report through; panicking (as the arity check already
+// does) avoids silently handing back an iterator built from a failed read.
+func (v *multiMapValue) invoke(args []reflect.Value) []reflect.Value {
+       if len(args) != 1 {
+               panic(fmt.Sprintf("wanted one key value, got %v", args))
+       }
+       key := reflect.ValueOf(Convert(args[0], v.keyType))
+       rs, err := v.adapter.NewKeyedIterable(v.ctx, v.reader, v.w, key)
+       if err != nil {
+               panic(fmt.Sprintf("creating keyed iterable for key %v: %v", key, err))
+       }
+       iter := makeIter(v.t.Out(0), rs)
+       if err := iter.Init(); err != nil {
+               panic(fmt.Sprintf("initializing iterable for key %v: %v", key, err))
+       }
+       return []reflect.Value{reflect.ValueOf(iter.Value())}
+}

Reply via email to