This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bfc71b0 [BEAM-3293] Add initial MultiMap side input backing type
(#15975)
bfc71b0 is described below
commit bfc71b0c6700bff51474824199cea6aabe5b43f3
Author: Jack McCluskey <[email protected]>
AuthorDate: Mon Nov 15 13:48:17 2021 -0500
[BEAM-3293] Add initial MultiMap side input backing type (#15975)
---
sdks/go/pkg/beam/core/runtime/exec/input.go | 46 +++++++++++++++++++++++++++++
1 file changed, 46 insertions(+)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/input.go
b/sdks/go/pkg/beam/core/runtime/exec/input.go
index 98922a2..37fb3b8 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/input.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/input.go
@@ -16,6 +16,7 @@
package exec
import (
+ "context"
"fmt"
"io"
"reflect"
@@ -196,3 +197,48 @@ func (v *fixedValue) Value() interface{} {
// Reset is a no-op for fixedValue; it always reports success.
func (v *fixedValue) Reset() error {
	return nil
}
+
+// multiMapValue is the backing value for a multimap side input. It carries
+// the function handed out by Value along with everything invoke needs to
+// build a keyed iterable on each lookup.
+type multiMapValue struct {
+	// t is the multimap function type; keyType is the type lookup keys are
+	// converted to before the adapter is queried.
+	t, keyType reflect.Type
+
+	// The following four fields are what invoke passes along when it
+	// dynamically builds the iterable for a key.
+	ctx     context.Context
+	adapter SideInputAdapter
+	reader  StateReader
+	w       typex.Window
+
+	// fn is the function value actually invoked by callers; it is created
+	// with reflect.MakeFunc and dispatches to invoke.
+	fn interface{}
+}
+
+// makeMultiMap builds the ReusableInput backing a multimap side input of
+// function type t. The supplied context, adapter, reader and window are
+// retained so that each call of the produced function can build the iterable
+// for the requested key. It panics if t cannot be unfolded as a multimap type.
+func makeMultiMap(ctx context.Context, t reflect.Type, adapter SideInputAdapter, reader StateReader, w typex.Window) ReusableInput {
+	unfolded, valid := funcx.UnfoldMultiMap(t)
+	if !valid {
+		panic(fmt.Sprintf("illegal multimap type: %v", t))
+	}
+
+	value := &multiMapValue{
+		t:       t,
+		keyType: unfolded[0],
+		ctx:     ctx,
+		adapter: adapter,
+		reader:  reader,
+		w:       w,
+	}
+	// The function is bound after construction because it closes over the
+	// value's own invoke method.
+	value.fn = reflect.MakeFunc(t, value.invoke).Interface()
+	return value
+}
+
+// Init is a no-op for multiMapValue; it always reports success.
+func (v *multiMapValue) Init() error {
+	return nil
+}
+
+// Value returns the stored lookup function (the fn field) as an interface{}.
+func (v *multiMapValue) Value() interface{} {
+	return v.fn
+}
+
+// Reset is a no-op for multiMapValue; it always reports success.
+func (v *multiMapValue) Reset() error {
+	return nil
+}
+
+// invoke is the body of the function created by reflect.MakeFunc in
+// makeMultiMap. It expects exactly one argument, the lookup key, converts it
+// to keyType, asks the adapter for the keyed iterable in the stored window,
+// and returns the corresponding iterator value as the single result.
+//
+// The reflect.MakeFunc signature leaves no way to return an error, so
+// failures are reported by panicking: a wrong argument count, a failed keyed
+// lookup, or a failed iterator initialization. Previously the latter two
+// errors were silently dropped.
+func (v *multiMapValue) invoke(args []reflect.Value) []reflect.Value {
+	if len(args) != 1 {
+		panic(fmt.Sprintf("wanted one key value, got %v", args))
+	}
+	key := reflect.ValueOf(Convert(args[0], v.keyType))
+	rs, err := v.adapter.NewKeyedIterable(v.ctx, v.reader, v.w, key)
+	if err != nil {
+		// Continuing with a stream from a failed lookup would only defer
+		// the failure to a less informative place.
+		panic(fmt.Sprintf("multimap lookup failed for key %v: %v", key, err))
+	}
+	iter := makeIter(v.t.Out(0), rs)
+	if err := iter.Init(); err != nil {
+		panic(fmt.Sprintf("multimap iterator init failed for key %v: %v", key, err))
+	}
+	return []reflect.Value{reflect.ValueOf(iter.Value())}
+}