[
https://issues.apache.org/jira/browse/BEAM-5382?focusedWorklogId=145790&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145790
]
ASF GitHub Bot logged work on BEAM-5382:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Sep/18 20:19
Start Date: 19/Sep/18 20:19
Worklog Time Spent: 10m
Work Description: herohde closed pull request #6434: [BEAM-5382] Add
fallback for non-binary MergeAccumulatorsFn
URL: https://github.com/apache/beam/pull/6434
This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go
b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index 99d23343b7c..b9a6eb668ee 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go
@@ -36,7 +36,7 @@ type Combine struct {
UsesKey bool
Out Node
- mergeFn reflectx.Func2x1 // optimized caller in the case of binary
merge accumulators
+ binaryMergeFn reflectx.Func2x1 // optimized caller in the case of
binary merge accumulators
status Status
err errorx.GuardedError
@@ -59,11 +59,32 @@ func (n *Combine) Up(ctx context.Context) error {
}
if n.Fn.AddInputFn() == nil {
- n.mergeFn = reflectx.ToFunc2x1(n.Fn.MergeAccumulatorsFn().Fn)
+ n.optimizeMergeFn()
}
return nil
}
+func (n *Combine) optimizeMergeFn() {
+ typ := n.Fn.MergeAccumulatorsFn().Fn.Type()
+ if typ.NumIn() == 2 && typ.NumOut() == 1 {
+ n.binaryMergeFn =
reflectx.ToFunc2x1(n.Fn.MergeAccumulatorsFn().Fn)
+ }
+}
+
+func (n *Combine) mergeAccumulators(ctx context.Context, a, b interface{})
(interface{}, error) {
+ if n.binaryMergeFn != nil {
+ // Fast path for binary MergeAccumulatorsFn
+ return n.binaryMergeFn.Call2x1(a, b), nil
+ }
+
+ in := &MainInput{Key: FullValue{Elm: a}}
+ val, err := InvokeWithoutEventTime(ctx, n.Fn.MergeAccumulatorsFn(), in,
b)
+ if err != nil {
+ return nil, n.fail(fmt.Errorf("MergeAccumulators failed: %v",
err))
+ }
+ return val.Elm, nil
+}
+
// StartBundle initializes processing this bundle for combines.
func (n *Combine) StartBundle(ctx context.Context, id string, data
DataContext) error {
if n.status != Up {
@@ -179,7 +200,7 @@ func (n *Combine) addInput(ctx context.Context, accum, key,
value interface{}, t
// TODO(herohde) 7/5/2017: do we want to allow addInput to be
optional
// if non-binary merge is defined?
- return n.mergeFn.Call2x1(accum, value), nil
+ return n.mergeAccumulators(ctx, accum, value)
}
opt := &MainInput{
@@ -355,7 +376,10 @@ func (n *MergeAccumulators) ProcessElement(ctx
context.Context, value FullValue,
first = false
continue
}
- a = n.mergeFn.Call2x1(a, v.Elm)
+ a, err = n.mergeAccumulators(ctx, a, v.Elm)
+ if err != nil {
+ return err
+ }
}
return n.Out.ProcessElement(ctx, FullValue{Windows: value.Windows, Elm:
value.Elm, Elm2: a, Timestamp: value.Timestamp})
}
@@ -365,7 +389,7 @@ func (n *MergeAccumulators) Up(ctx context.Context) error {
if err := n.Combine.Up(ctx); err != nil {
return err
}
- n.mergeFn = reflectx.ToFunc2x1(n.Fn.MergeAccumulatorsFn().Fn)
+ n.optimizeMergeFn()
return nil
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
index 02bd3fdcf6e..de263914cf4 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
@@ -19,6 +19,7 @@ import (
"context"
"fmt"
"reflect"
+ "runtime"
"strconv"
"testing"
@@ -40,15 +41,26 @@ var tests = []struct {
Expected interface{}
}{
{Fn: mergeFn, AccumCoder: intCoder(reflectx.Int), Input: intInput,
Expected: int(21)},
+ {Fn: nonBinaryMergeFn, AccumCoder: intCoder(reflectx.Int), Input:
intInput, Expected: int(21)},
{Fn: &MyCombine{}, AccumCoder: intCoder(reflectx.Int64), Input:
intInput, Expected: int(21)},
{Fn: &MyOtherCombine{}, AccumCoder: intCoder(reflectx.Int64), Input:
intInput, Expected: "21"},
{Fn: &MyThirdCombine{}, AccumCoder: intCoder(reflectx.Int), Input:
strInput, Expected: int(21)},
+ {Fn: &MyContextCombine{}, AccumCoder: intCoder(reflectx.Int64), Input:
intInput, Expected: int(21)},
+ {Fn: &MyErrorCombine{}, AccumCoder: intCoder(reflectx.Int64), Input:
intInput, Expected: int(21)},
+}
+
+func fnName(x interface{}) string {
+ v := reflect.ValueOf(x)
+ if v.Kind() != reflect.Func {
+ return v.Type().String()
+ }
+ return runtime.FuncForPC(uintptr(v.Pointer())).Name()
}
// TestCombine verifies that the Combine node works correctly.
func TestCombine(t *testing.T) {
for _, test := range tests {
- t.Run(reflect.TypeOf(test.Fn).Name(), func(t *testing.T) {
+ t.Run(fnName(test.Fn), func(t *testing.T) {
edge := getCombineEdge(t, test.Fn, test.AccumCoder)
out := &CaptureNode{UID: 1}
@@ -69,7 +81,7 @@ func TestCombine(t *testing.T) {
// ExtractOutput nodes work correctly after the lift has been performed.
func TestLiftedCombine(t *testing.T) {
for _, test := range tests {
- t.Run(reflect.TypeOf(test.Fn).Name(), func(t *testing.T) {
+ t.Run(fnName(test.Fn), func(t *testing.T) {
edge := getCombineEdge(t, test.Fn, test.AccumCoder)
out := &CaptureNode{UID: 1}
@@ -101,7 +113,7 @@ func getCombineEdge(t *testing.T, cfn interface{}, ac
*coder.Coder) *graph.Multi
// This makes the assumption that the AddInput function is
unkeyed.
vtype = fn.AddInputFn().Param[1].T
} else {
- vtype = fn.MergeAccumulatorsFn().Param[0].T
+ vtype = fn.MergeAccumulatorsFn().Param[1].T
}
inT := typex.NewCoGBK(typex.New(reflectx.Int), typex.New(vtype))
in := g.NewNode(inT, window.DefaultWindowingStrategy(), true)
@@ -163,6 +175,13 @@ func mergeFn(a, b int) int {
return a + b
}
+// nonBinaryMergeFn represents a combine with a context parameter and an error
return, where
+//
+// InputT == OutputT == AccumT == int
+func nonBinaryMergeFn(ctx context.Context, a, b int) (int, error) {
+ return a + b, nil
+}
+
// MyCombine represents a combine with the same Input and Output type (int),
but a
// distinct accumulator type (int64).
//
@@ -214,6 +233,32 @@ func (*MyThirdCombine) MergeAccumulators(a, b int) int {
return a + b
}
+// MyContextCombine is the same as MyCombine, but requires a context parameter.
+//
+// InputT == int
+// AccumT == int64
+// OutputT == int
+type MyContextCombine struct {
+ MyCombine // Embedding to re-use the existing AddInput implementations
+}
+
+func (*MyContextCombine) MergeAccumulators(_ context.Context, a, b int64)
int64 {
+ return a + b
+}
+
+// MyErrorCombine is the same as MyCombine, but may return an error.
+//
+// InputT == int
+// AccumT == int64
+// OutputT == int
+type MyErrorCombine struct {
+ MyCombine // Embedding to re-use the existing AddInput implementations
+}
+
+func (*MyErrorCombine) MergeAccumulators(a, b int64) (int64, error) {
+ return a + b, nil
+}
+
func intCoder(t reflect.Type) *coder.Coder {
c, err := coderx.NewVarIntZ(t)
if err != nil {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 145790)
Time Spent: 40m (was: 0.5h)
> Combiner panics at runtime if MergeAccumulators has a context parameter
> -----------------------------------------------------------------------
>
> Key: BEAM-5382
> URL: https://issues.apache.org/jira/browse/BEAM-5382
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Cody Schroeder
> Assignee: Cody Schroeder
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> [combine.go#L62|https://github.com/apache/beam/blob/14ef23c/sdks/go/pkg/beam/core/runtime/exec/combine.go#L62]
> assumes that a combiner's {{MergeAccumulators}} function must be 2x1, but
> {{TryCombinePerKey}} accepts combiner functions with context parameters. I
> believe accepting context parameters is the correct behavior overall.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)