[
https://issues.apache.org/jira/browse/BEAM-4726?focusedWorklogId=128771&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128771
]
ASF GitHub Bot logged work on BEAM-4726:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Jul/18 16:05
Start Date: 30/Jul/18 16:05
Worklog Time Spent: 10m
Work Description: herohde closed pull request #5882: [BEAM-4726] Cache
fixed per function Invoke values
URL: https://github.com/apache/beam/pull/5882
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request merged from a
fork, the diff is reproduced below for the sake of provenance:
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go
b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 25a069100ee..796a871f7e5 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -40,29 +40,83 @@ func Invoke(ctx context.Context, ws []typex.Window, ts
typex.EventTime, fn *func
if fn == nil {
return nil, nil // ok: nothing to Invoke
}
+ inv := newInvoker(fn)
+ return inv.Invoke(ctx, ws, ts, opt, extra...)
+}
- // (1) Populate contexts
+// InvokeWithoutEventTime runs the given function at time 0 in the global
window.
+func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput,
extra ...interface{}) (*FullValue, error) {
+ return Invoke(ctx, window.SingleGlobalWindow, mtime.ZeroTimestamp, fn,
opt, extra...)
+}
- args := make([]interface{}, len(fn.Param))
+// invoker is a container struct for hot path invocations of DoFns, to avoid
+// repeating fixed set up per element.
+type invoker struct {
+ fn *funcx.Fn
+ args []interface{}
+ // TODO(lostluck): 2018/07/06 consider replacing with a slice of
functions to run over the args slice, as an improvement.
+ ctxIdx, wndIdx, etIdx int // specialized input indexes
+ outEtIdx, errIdx int // specialized output indexes
+ in, out []int // general indexes
+}
- if index, ok := fn.Context(); ok {
- args[index] = ctx
+func newInvoker(fn *funcx.Fn) *invoker {
+ n := &invoker{
+ fn: fn,
+ args: make([]interface{}, len(fn.Param)),
+ in: fn.Params(funcx.FnValue | funcx.FnIter | funcx.FnReIter |
funcx.FnEmit),
+ out: fn.Returns(funcx.RetValue),
+ }
+ var ok bool
+ if n.ctxIdx, ok = fn.Context(); !ok {
+ n.ctxIdx = -1
+ }
+ if n.wndIdx, ok = fn.Window(); !ok {
+ n.wndIdx = -1
}
- if index, ok := fn.Window(); ok {
+ if n.etIdx, ok = fn.EventTime(); !ok {
+ n.etIdx = -1
+ }
+ if n.outEtIdx, ok = fn.OutEventTime(); !ok {
+ n.outEtIdx = -1
+ }
+ if n.errIdx, ok = fn.Error(); !ok {
+ n.errIdx = -1
+ }
+ return n
+}
+
+// Reset zeroes argument entries in the cached slice to allow values to be
garbage collected after the bundle ends.
+func (n *invoker) Reset() {
+ for i := range n.args {
+ n.args[i] = nil
+ }
+}
+
+// Invoke invokes the fn with the given values. The extra values must match
the non-main
+// side input and emitters. It returns the direct output, if any.
+func (n *invoker) Invoke(ctx context.Context, ws []typex.Window, ts
typex.EventTime, opt *MainInput, extra ...interface{}) (*FullValue, error) {
+ // (1) Populate contexts
+ // extract these to make things easier to read.
+ args := n.args
+ fn := n.fn
+ in := n.in
+
+ if n.ctxIdx >= 0 {
+ args[n.ctxIdx] = ctx
+ }
+ if n.wndIdx >= 0 {
if len(ws) != 1 {
return nil, fmt.Errorf("DoFns that observe windows must
be invoked with single window: %v", opt.Key.Windows)
}
- args[index] = ws[0]
+ args[n.wndIdx] = ws[0]
}
- if index, ok := fn.EventTime(); ok {
- args[index] = ts
+ if n.etIdx >= 0 {
+ args[n.etIdx] = ts
}
// (2) Main input from value, if any.
-
- in := fn.Params(funcx.FnValue | funcx.FnIter | funcx.FnReIter |
funcx.FnEmit)
i := 0
-
if opt != nil {
args[in[i]] = Convert(opt.Key.Elm, fn.Param[in[i]].T)
i++
@@ -88,30 +142,30 @@ func Invoke(ctx context.Context, ws []typex.Window, ts
typex.EventTime, fn *func
}
// (3) Precomputed side input and emitters (or other output).
-
+ // TODO(lostluck): 2018/07/10 extras (emitters and side inputs), are
constant so we could
+ // initialize them once at construction time, and not clear them in
Reset.
for _, arg := range extra {
args[in[i]] = arg
i++
}
// (4) Invoke
-
ret, err := reflectx.CallNoPanic(fn.Fn, args)
if err != nil {
return nil, err
}
- if index, ok := fn.Error(); ok && ret[index] != nil {
- return nil, ret[index].(error)
+ if n.errIdx >= 0 && ret[n.errIdx] != nil {
+ return nil, ret[n.errIdx].(error)
}
// (5) Return direct output, if any. Input timestamp and windows are
implicitly
// propagated.
- out := fn.Returns(funcx.RetValue)
+ out := n.out
if len(out) > 0 {
value := &FullValue{Windows: ws, Timestamp: ts}
- if index, ok := fn.OutEventTime(); ok {
- value.Timestamp = ret[index].(typex.EventTime)
+ if n.outEtIdx >= 0 {
+ value.Timestamp = ret[n.outEtIdx].(typex.EventTime)
}
// TODO(herohde) 4/16/2018: apply windowing function to
elements with explicit timestamp?
@@ -125,10 +179,6 @@ func Invoke(ctx context.Context, ws []typex.Window, ts
typex.EventTime, fn *func
return nil, nil
}
-func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput,
extra ...interface{}) (*FullValue, error) {
- return Invoke(ctx, window.SingleGlobalWindow, mtime.ZeroTimestamp, fn,
opt, extra...)
-}
-
func makeSideInputs(fn *funcx.Fn, in []*graph.Inbound, side []ReStream)
([]ReusableInput, error) {
if len(side) == 0 {
return nil, nil // ok: no side input
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
index b52cf096bd4..7dfdb99d9d2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
@@ -17,6 +17,7 @@ package exec
import (
"context"
+ "fmt"
"reflect"
"testing"
"time"
@@ -127,6 +128,141 @@ func TestInvoke(t *testing.T) {
// Benchmarks
+// Invoke is implemented as a single use of a cached invoker, so a measure of
+// correctness is that the cached version takes less time per operation than
the
+// single use version.
+//
+// NOTE(lostluck) 2018/07/24: run on a amd64 linux desktop
+//
+// BenchmarkInvoke/SingleInvoker_Void_function-12 5000000
300 ns/op 128 B/op 1 allocs/op
+// BenchmarkInvoke/CachedInvoker_Void_function-12 10000000
195 ns/op 0 B/op 0 allocs/op
+//
BenchmarkInvoke/SingleInvoker_Contexts_and_errors_are_allowed_and_handled-12
2000000 967 ns/op 256 B/op 6
allocs/op
+//
BenchmarkInvoke/CachedInvoker_Contexts_and_errors_are_allowed_and_handled-12
2000000 854 ns/op 112 B/op 4
allocs/op
+// BenchmarkInvoke/SingleInvoker_Sum-12
2000000 805 ns/op 464 B/op
11 allocs/op
+// BenchmarkInvoke/CachedInvoker_Sum-12
3000000 523 ns/op 224 B/op
5 allocs/op
+// BenchmarkInvoke/SingleInvoker_Concat-12
2000000 892 ns/op 504 B/op
12 allocs/op
+// BenchmarkInvoke/CachedInvoker_Concat-12
2000000 594 ns/op 259 B/op
6 allocs/op
+// BenchmarkInvoke/SingleInvoker_Length_(slice_type)-12
2000000 661 ns/op 384 B/op
9 allocs/op
+// BenchmarkInvoke/CachedInvoker_Length_(slice_type)-12
3000000 492 ns/op 224 B/op
5 allocs/op
+// BenchmarkInvoke/SingleInvoker_Emitter-12
3000000 412 ns/op 184 B/op
4 allocs/op
+// BenchmarkInvoke/CachedInvoker_Emitter-12
5000000 268 ns/op 32 B/op
1 allocs/op
+// BenchmarkInvoke/SingleInvoker_Side_input-12
2000000 743 ns/op 392 B/op
11 allocs/op
+// BenchmarkInvoke/CachedInvoker_Side_input-12
3000000 506 ns/op 200 B/op
6 allocs/op
+// BenchmarkInvoke/SingleInvoker_Sum_as_Main-12
2000000 810 ns/op 464 B/op
11 allocs/op
+// BenchmarkInvoke/CachedInvoker_Sum_as_Main-12
3000000 548 ns/op 224 B/op
5 allocs/op
+// BenchmarkInvoke/SingleInvoker_Sum_as_Main_KV-12
2000000 823 ns/op 464 B/op
11 allocs/op
+// BenchmarkInvoke/CachedInvoker_Sum_as_Main_KV-12
3000000 547 ns/op 224 B/op
5 allocs/op
+// BenchmarkInvoke/SingleInvoker_EventTime-12
2000000 711 ns/op 376 B/op
10 allocs/op
+// BenchmarkInvoke/CachedInvoker_EventTime-12
3000000 513 ns/op 200 B/op
6 allocs/op
+// BenchmarkInvoke/SingleInvoker_Window-12
1000000 1023 ns/op 368 B/op
9 allocs/op
+// BenchmarkInvoke/CachedInvoker_Window-12
2000000 838 ns/op 192 B/op
5 allocs/op
+
+func BenchmarkInvoke(b *testing.B) {
+ tests := []struct {
+ Name string
+ Fn interface{}
+ Opt *MainInput
+ Args []interface{}
+ Expected interface{}
+ }{
+ {
+ Name: "Void function",
+ Fn: func() {},
+ },
+ {
+ Name: "Contexts and errors are allowed and handled",
+ Fn: func(ctx context.Context) error { return nil },
+ },
+ {
+ Name: "Sum",
+ Fn: func(a, b, c int) int { return a + b + c },
+ Args: []interface{}{1, 2, 3},
+ Expected: 6,
+ },
+ {
+ Name: "Concat",
+ Fn: func(a, b, c string) string { return a + b +
c },
+ Args: []interface{}{"a", "b", "c"},
+ Expected: "abc",
+ },
+ {
+ Name: "Length (slice type)",
+ Fn: func(list []int) (int, error) { return
len(list), nil },
+ Args: []interface{}{[]int{1, 2, 3}},
+ Expected: 3,
+ },
+ {
+ Name: "Emitter",
+ Fn: func(emit func(int)) { emit(1) },
+ Args: []interface{}{func(int) {}},
+ },
+ {
+ Name: "Side input",
+ Fn: func(a int, get func(*int) bool) int {
+ var ret int
+ if !get(&ret) {
+ return a
+ }
+ return ret
+ },
+ Args: []interface{}{1, func(out *int) bool { *out =
2; return true }},
+ Expected: 2,
+ },
+ {
+ Name: "Sum as Main",
+ Fn: func(a, b, c int) int { return a + b + c },
+ Opt: &MainInput{Key: FullValue{Elm: 1}},
+ Args: []interface{}{2, 3},
+ Expected: 6,
+ },
+ {
+ Name: "Sum as Main KV",
+ Fn: func(a, b, c int) int { return a + b + c },
+ Opt: &MainInput{Key: FullValue{Elm: 1, Elm2: 2}},
+ Args: []interface{}{3},
+ Expected: 6,
+ },
+ {
+ Name: "EventTime",
+ Fn: func(ts typex.EventTime, a int) int { return
int(ts.Milliseconds()) + a },
+ Opt: &MainInput{Key: FullValue{Elm: 1}},
+ Expected: 3,
+ },
+ {
+ Name: "Window",
+ Fn: func(w typex.Window, a int) int64 { return
w.MaxTimestamp().Milliseconds() },
+ Opt: &MainInput{Key: FullValue{Elm: 1}},
+ Expected: mtime.EndOfGlobalWindowTime.Milliseconds(),
+ },
+ }
+
+ for _, test := range tests {
+ fn, err := funcx.New(reflectx.MakeFunc(test.Fn))
+ if err != nil {
+ b.Fatalf("function not valid: %v", err)
+ }
+
+ ts := mtime.ZeroTimestamp.Add(2 * time.Millisecond)
+ b.Run(fmt.Sprintf("SingleInvoker_%s", test.Name), func(b
*testing.B) {
+ for i := 0; i < b.N; i++ {
+ _, err := Invoke(context.Background(),
window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
+ if err != nil {
+ b.Fatalf("Invoke(%v,%v) failed: %v",
fn.Fn.Name(), test.Args, err)
+ }
+ }
+ })
+ b.Run(fmt.Sprintf("CachedInvoker_%s", test.Name), func(b
*testing.B) {
+ inv := newInvoker(fn)
+ for i := 0; i < b.N; i++ {
+ _, err := inv.Invoke(context.Background(),
window.SingleGlobalWindow, ts, test.Opt, test.Args...)
+ if err != nil {
+ b.Fatalf("Invoke(%v,%v) failed: %v",
fn.Fn.Name(), test.Args, err)
+ }
+ }
+ })
+ }
+}
+
// NOTE(herohde) 12/19/2017: example run on a laptop
//
// BenchmarkDirectCall-4 2000000000 0.26 ns/op
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index 9c38a635e6d..41da9f241fa 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -45,6 +45,8 @@ type ParDo struct {
status Status
err errorx.GuardedError
+
+ inv *invoker
}
func (n *ParDo) ID() UnitID {
@@ -56,6 +58,7 @@ func (n *ParDo) Up(ctx context.Context) error {
return fmt.Errorf("invalid status for pardo %v: %v, want
Initializing", n.UID, n.status)
}
n.status = Up
+ n.inv = newInvoker(n.Fn.ProcessElementFn())
if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil); err !=
nil {
return n.fail(err)
@@ -96,13 +99,12 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm
FullValue, values ...ReS
}
ctx = metrics.SetPTransformID(ctx, n.PID)
- fn := n.Fn.ProcessElementFn()
// If the function observes windows, we must invoke it for each window.
The expected fast path
// is that either there is a single window or the function doesn't
observes windows.
- if !mustExplodeWindows(fn, elm) {
- val, err := n.invokeDataFn(ctx, elm.Windows, elm.Timestamp, fn,
&MainInput{Key: elm, Values: values})
+ if !mustExplodeWindows(n.inv.fn, elm) {
+ val, err := n.invokeProcessFn(ctx, elm.Windows, elm.Timestamp,
&MainInput{Key: elm, Values: values})
if err != nil {
return n.fail(err)
}
@@ -115,7 +117,7 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm
FullValue, values ...ReS
for _, w := range elm.Windows {
wElm := FullValue{Elm: elm.Elm, Elm2: elm.Elm2,
Timestamp: elm.Timestamp, Windows: []typex.Window{w}}
- val, err := n.invokeDataFn(ctx, wElm.Windows,
wElm.Timestamp, fn, &MainInput{Key: wElm, Values: values})
+ val, err := n.invokeProcessFn(ctx, wElm.Windows,
wElm.Timestamp, &MainInput{Key: wElm, Values: values})
if err != nil {
return n.fail(err)
}
@@ -144,6 +146,7 @@ func (n *ParDo) FinishBundle(ctx context.Context) error {
return fmt.Errorf("invalid status for pardo %v: %v, want
Active", n.UID, n.status)
}
n.status = Up
+ n.inv.Reset()
if _, err := n.invokeDataFn(ctx, window.SingleGlobalWindow,
mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); err != nil {
return n.fail(err)
@@ -193,28 +196,60 @@ func (n *ParDo) initIfNeeded() error {
return err
}
+// invokeDataFn handle non-per element invocations.
func (n *ParDo) invokeDataFn(ctx context.Context, ws []typex.Window, ts
typex.EventTime, fn *funcx.Fn, opt *MainInput) (*FullValue, error) {
if fn == nil {
return nil, nil
}
+ if err := n.preInvoke(ctx, ws, ts); err != nil {
+ return nil, err
+ }
+ val, err := Invoke(ctx, ws, ts, fn, opt, n.extra...)
+ if err != nil {
+ return nil, err
+ }
+ if err := n.postInvoke(); err != nil {
+ return nil, err
+ }
+ return val, nil
+}
+
+// invokeProcessFn handles the per element invocations
+func (n *ParDo) invokeProcessFn(ctx context.Context, ws []typex.Window, ts
typex.EventTime, opt *MainInput) (*FullValue, error) {
+ if err := n.preInvoke(ctx, ws, ts); err != nil {
+ return nil, err
+ }
+ val, err := n.inv.Invoke(ctx, ws, ts, opt, n.extra...)
+ if err != nil {
+ return nil, err
+ }
+ if err := n.postInvoke(); err != nil {
+ return nil, err
+ }
+ return val, nil
+}
+func (n *ParDo) preInvoke(ctx context.Context, ws []typex.Window, ts
typex.EventTime) error {
for _, e := range n.emitters {
if err := e.Init(ctx, ws, ts); err != nil {
- return nil, err
+ return err
}
}
for _, s := range n.sideinput {
if err := s.Init(); err != nil {
- return nil, err
+ return err
}
}
- val, err := Invoke(ctx, ws, ts, fn, opt, n.extra...)
+ return nil
+}
+
+func (n *ParDo) postInvoke() error {
for _, s := range n.sideinput {
if err := s.Reset(); err != nil {
- return nil, err
+ return err
}
}
- return val, err
+ return nil
}
func (n *ParDo) fail(err error) error {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
index 731a5cd7f65..0181279969c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
@@ -104,7 +104,7 @@ func emitSumFn(n int, emit func(int)) {
// BenchmarkParDo_EmitSumFn measures the overhead of invoking a ParDo in a
plan.
//
// On @lostluck's desktop:
-// BenchmarkParDo_EmitSumFn-12 1000000 1606 ns/op
585 B/op 7 allocs/op
+// BenchmarkParDo_EmitSumFn-12 1000000 1202 ns/op
529 B/op 4 allocs/op
func BenchmarkParDo_EmitSumFn(b *testing.B) {
fn, err := graph.NewDoFn(emitSumFn)
if err != nil {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 128771)
Time Spent: 7h 20m (was: 7h 10m)
> Reduce ParDo per element Invoke overhead
> ----------------------------------------
>
> Key: BEAM-4726
> URL: https://issues.apache.org/jira/browse/BEAM-4726
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Robert Burke
> Priority: Major
> Time Spent: 7h 20m
> Remaining Estimate: 0h
>
> Each call to Invoke allocates a new args []interface{} slice, but the common
> case is to run the same ProcessElement function over and over again. It
> should be possible to use a container struct that retains the args slice and
> avoids recomputing the parameter indices before each call to the
> ProcessElementFn.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)