[ https://issues.apache.org/jira/browse/BEAM-4726?focusedWorklogId=121994&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-121994 ]
ASF GitHub Bot logged work on BEAM-4726: ---------------------------------------- Author: ASF GitHub Bot Created on: 11/Jul/18 17:59 Start Date: 11/Jul/18 17:59 Worklog Time Spent: 10m Work Description: lostluck commented on a change in pull request #5882: [BEAM-4726] Cache fixed per function Invoke values URL: https://github.com/apache/beam/pull/5882#discussion_r201787347 ########## File path: sdks/go/pkg/beam/core/runtime/exec/fn.go ########## @@ -125,10 +125,144 @@ func Invoke(ctx context.Context, ws []typex.Window, ts typex.EventTime, fn *func return nil, nil } +// InvokeWithoutEventTime runs the given function at time 0 in the global window. func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) { return Invoke(ctx, window.SingleGlobalWindow, mtime.ZeroTimestamp, fn, opt, extra...) } +// invoker is a container struct for hot path invocations of DoFns, to avoid +// repeating fixed set up per element. +type invoker struct { + fn *funcx.Fn + args []interface{} + // TODO(lostluck): 2018/07/06 consider replacing with a slice of functions to run over the args slice, as an improvement. 
+ ctxIdx, wndIdx, etIdx int // specialized input indexes + outEtIdx, errIdx int // specialized output indexes + in, out []int // general indexes +} + +func newInvoker(fn *funcx.Fn) *invoker { + n := &invoker{ + fn: fn, + args: make([]interface{}, len(fn.Param)), + in: fn.Params(funcx.FnValue | funcx.FnIter | funcx.FnReIter | funcx.FnEmit), + out: fn.Returns(funcx.RetValue), + } + var ok bool + if n.ctxIdx, ok = fn.Context(); !ok { + n.ctxIdx = -1 + } + if n.wndIdx, ok = fn.Window(); !ok { + n.wndIdx = -1 + } + if n.etIdx, ok = fn.EventTime(); !ok { + n.etIdx = -1 + } + if n.outEtIdx, ok = fn.OutEventTime(); !ok { + n.outEtIdx = -1 + } + if n.errIdx, ok = fn.Error(); !ok { + n.errIdx = -1 + } + return n +} + +// ClearArgs zeroes argument entries in the cached slice to allow values to be garbage collected after the bundle ends. +func (n *invoker) ClearArgs() { + for i := range n.args { + n.args[i] = nil + } +} + +// Invoke invokes the fn with the given values. The extra values must match the non-main +// side input and emitters. It returns the direct output, if any. +func (n *invoker) Invoke(ctx context.Context, ws []typex.Window, ts typex.EventTime, opt *MainInput, extra ...interface{}) (*FullValue, error) { + // (1) Populate contexts + // extract these to make things easier to read. + args := n.args + fn := n.fn + in := n.in + + if n.ctxIdx >= 0 { + args[n.ctxIdx] = ctx + } + if n.wndIdx >= 0 { + if len(ws) != 1 { + return nil, fmt.Errorf("DoFns that observe windows must be invoked with single window: %v", opt.Key.Windows) + } + args[n.wndIdx] = ws[0] + } + if n.etIdx >= 0 { + args[n.etIdx] = ts + } + + // (2) Main input from value, if any. 
+ i := 0 + if opt != nil { + args[in[i]] = Convert(opt.Key.Elm, fn.Param[in[i]].T) + i++ + if opt.Key.Elm2 != nil { + args[in[i]] = Convert(opt.Key.Elm2, fn.Param[in[i]].T) + i++ + } + + for _, iter := range opt.Values { + param := fn.Param[in[i]] + + if param.Kind != funcx.FnIter { + return nil, fmt.Errorf("GBK/CoGBK result values must be iterable: %v", param) + } + + // TODO(herohde) 12/12/2017: allow form conversion on GBK results? + + it := makeIter(param.T, iter) + it.Init() + args[in[i]] = it.Value() + i++ + } + } + + // (3) Precomputed side input and emitters (or other output). + + for _, arg := range extra { + args[in[i]] = arg + i++ + } + + // (4) Invoke + ret, err := reflectx.CallNoPanic(fn.Fn, args) + if err != nil { + return nil, err + } + if n.errIdx >= 0 && ret[n.errIdx] != nil { + return nil, ret[n.errIdx].(error) + } + + // (5) Return direct output, if any. Input timestamp and windows are implicitly + // propagated. + + out := n.out + if len(out) > 0 { + value := &FullValue{Windows: ws, Timestamp: ts} + if n.outEtIdx >= 0 { + value.Timestamp = ret[n.outEtIdx].(typex.EventTime) + } + // TODO(herohde) 4/16/2018: apply windowing function to elements with explicit timestamp? + + value.Elm = ret[out[0]] + if len(out) > 1 { + value.Elm2 = ret[out[1]] + } + return value, nil + } + + return nil, nil +} + +func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, extra ...interface{}) (*FullValue, error) { Review comment: Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 121994) Time Spent: 5h (was: 4h 50m) > Reduce ParDo per element Invoke overhead > ---------------------------------------- > > Key: BEAM-4726 > URL: https://issues.apache.org/jira/browse/BEAM-4726 > Project: Beam > Issue Type: Sub-task > Components: sdk-go > Reporter: Robert Burke > Assignee: Robert Burke > Priority: Major > Time Spent: 5h > Remaining Estimate: 0h > > Each call to Invoke allocates a new args []interface{}, but the common case > is to run the same ProcessElement function over and over again. It should be > possible to have a container struct to retain the args slice, and avoid > recomputing the indices for where to assign parameters before calling the > ProcessElementFn. -- This message was sent by Atlassian JIRA (v7.6.3#76005)