[ 
https://issues.apache.org/jira/browse/BEAM-4726?focusedWorklogId=118812&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118812
 ]

ASF GitHub Bot logged work on BEAM-4726:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Jul/18 20:04
            Start Date: 03/Jul/18 20:04
    Worklog Time Spent: 10m 
      Work Description: lostluck closed pull request #5882: [BEAM-4726] Cache fixed per function Invoke values
URL: https://github.com/apache/beam/pull/5882
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not show it otherwise):

diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go 
b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 25a069100ee..b997df3b35c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -125,10 +125,148 @@ func Invoke(ctx context.Context, ws []typex.Window, ts 
typex.EventTime, fn *func
        return nil, nil
 }
 
+// InvokeWithoutEventTime runs the given function at time 0 in the global 
window.
 func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, 
extra ...interface{}) (*FullValue, error) {
        return Invoke(ctx, window.SingleGlobalWindow, mtime.ZeroTimestamp, fn, 
opt, extra...)
 }
 
+// invoker is a container class for hot path invocations of DoFns, to avoid
+// repeating fixed set up per element.
+type invoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       // replace with a slice of functions to run over the args slice? 
Profile and find out!
+       ctxIdx, wndIdx, etIdx int   // specialized input indexes
+       outEtIdx, errIdx      int   // specialized output indexes
+       in, out               []int // general indexes
+}
+
+func newInvoker(fn *funcx.Fn) *invoker {
+       n := &invoker{
+               fn:       fn,
+               args:     make([]interface{}, len(fn.Param)),
+               ctxIdx:   -1,
+               wndIdx:   -1,
+               etIdx:    -1,
+               outEtIdx: -1,
+               errIdx:   -1,
+               in:       fn.Params(funcx.FnValue | funcx.FnIter | 
funcx.FnReIter | funcx.FnEmit),
+               out:      fn.Returns(funcx.RetValue),
+       }
+       if index, ok := fn.Context(); ok {
+               n.ctxIdx = index
+       }
+       if index, ok := fn.Window(); ok {
+               n.wndIdx = index
+       }
+       if index, ok := fn.EventTime(); ok {
+               n.etIdx = index
+       }
+       if index, ok := fn.OutEventTime(); ok {
+               n.outEtIdx = index
+       }
+       if index, ok := fn.Error(); ok {
+               n.errIdx = index
+       }
+       return n
+}
+
+// ClearArgs zeroes argument entries in the cached slice to allow values to be 
garbage collected after the bundle ends.
+func (n *invoker) ClearArgs() {
+       for i := range n.args {
+               n.args[i] = nil
+       }
+}
+
+// Invoke invokes the fn with the given values. The extra values must match 
the non-main
+// side input and emitters. It returns the direct output, if any.
+func (n *invoker) Invoke(ctx context.Context, ws []typex.Window, ts 
typex.EventTime, opt *MainInput, extra ...interface{}) (*FullValue, error) {
+       // (1) Populate contexts
+       // extract these to make things easier to read.
+       args := n.args
+       fn := n.fn
+       in := n.in
+
+       if n.ctxIdx >= 0 {
+               args[n.ctxIdx] = ctx
+       }
+       if n.wndIdx >= 0 {
+               if len(ws) != 1 {
+                       return nil, fmt.Errorf("DoFns that observe windows must 
be invoked with single window: %v", opt.Key.Windows)
+               }
+               args[n.wndIdx] = ws[0]
+       }
+       if n.etIdx >= 0 {
+               args[n.etIdx] = ts
+       }
+
+       // (2) Main input from value, if any.
+       i := 0
+       if opt != nil {
+               args[in[i]] = Convert(opt.Key.Elm, fn.Param[in[i]].T)
+               i++
+               if opt.Key.Elm2 != nil {
+                       args[in[i]] = Convert(opt.Key.Elm2, fn.Param[in[i]].T)
+                       i++
+               }
+
+               for _, iter := range opt.Values {
+                       param := fn.Param[in[i]]
+
+                       if param.Kind != funcx.FnIter {
+                               return nil, fmt.Errorf("GBK/CoGBK result values 
must be iterable: %v", param)
+                       }
+
+                       // TODO(herohde) 12/12/2017: allow form conversion on 
GBK results?
+
+                       it := makeIter(param.T, iter)
+                       it.Init()
+                       args[in[i]] = it.Value()
+                       i++
+               }
+       }
+
+       // (3) Precomputed side input and emitters (or other output).
+
+       for _, arg := range extra {
+               args[in[i]] = arg
+               i++
+       }
+
+       // (4) Invoke
+       ret, err := reflectx.CallNoPanic(fn.Fn, args)
+       if err != nil {
+               return nil, err
+       }
+       if n.errIdx >= 0 && ret[n.errIdx] != nil {
+               return nil, ret[n.errIdx].(error)
+       }
+
+       // (5) Return direct output, if any. Input timestamp and windows are 
implicitly
+       // propagated.
+
+       out := n.out
+       if len(out) > 0 {
+               value := &FullValue{Windows: ws, Timestamp: ts}
+               if n.outEtIdx >= 0 {
+                       value.Timestamp = ret[n.outEtIdx].(typex.EventTime)
+               }
+               // TODO(herohde) 4/16/2018: apply windowing function to 
elements with explicit timestamp?
+
+               value.Elm = ret[out[0]]
+               if len(out) > 1 {
+                       value.Elm2 = ret[out[1]]
+               }
+               return value, nil
+       }
+
+       return nil, nil
+}
+
+func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, 
extra ...interface{}) (*FullValue, error) {
+       return n.Invoke(ctx, window.SingleGlobalWindow, mtime.ZeroTimestamp, 
opt, extra...)
+}
+
 func makeSideInputs(fn *funcx.Fn, in []*graph.Inbound, side []ReStream) 
([]ReusableInput, error) {
        if len(side) == 0 {
                return nil, nil // ok: no side input
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go 
b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index 9c38a635e6d..9d9da8a656f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -45,6 +45,8 @@ type ParDo struct {
 
        status Status
        err    errorx.GuardedError
+
+       inv *invoker
 }
 
 func (n *ParDo) ID() UnitID {
@@ -56,6 +58,7 @@ func (n *ParDo) Up(ctx context.Context) error {
                return fmt.Errorf("invalid status for pardo %v: %v, want 
Initializing", n.UID, n.status)
        }
        n.status = Up
+       n.inv = newInvoker(n.Fn.ProcessElementFn())
 
        if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil); err != 
nil {
                return n.fail(err)
@@ -102,7 +105,7 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm 
FullValue, values ...ReS
        // is that either there is a single window or the function doesn't 
observes windows.
 
        if !mustExplodeWindows(fn, elm) {
-               val, err := n.invokeDataFn(ctx, elm.Windows, elm.Timestamp, fn, 
&MainInput{Key: elm, Values: values})
+               val, err := n.invokeProcessFn(ctx, elm.Windows, elm.Timestamp, 
&MainInput{Key: elm, Values: values})
                if err != nil {
                        return n.fail(err)
                }
@@ -115,7 +118,7 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm 
FullValue, values ...ReS
                for _, w := range elm.Windows {
                        wElm := FullValue{Elm: elm.Elm, Elm2: elm.Elm2, 
Timestamp: elm.Timestamp, Windows: []typex.Window{w}}
 
-                       val, err := n.invokeDataFn(ctx, wElm.Windows, 
wElm.Timestamp, fn, &MainInput{Key: wElm, Values: values})
+                       val, err := n.invokeProcessFn(ctx, wElm.Windows, 
wElm.Timestamp, &MainInput{Key: wElm, Values: values})
                        if err != nil {
                                return n.fail(err)
                        }
@@ -193,6 +196,7 @@ func (n *ParDo) initIfNeeded() error {
        return err
 }
 
+// invokeDataFn handle non-per element invocations.
 func (n *ParDo) invokeDataFn(ctx context.Context, ws []typex.Window, ts 
typex.EventTime, fn *funcx.Fn, opt *MainInput) (*FullValue, error) {
        if fn == nil {
                return nil, nil
@@ -217,6 +221,27 @@ func (n *ParDo) invokeDataFn(ctx context.Context, ws 
[]typex.Window, ts typex.Ev
        return val, err
 }
 
+// invokeProcessFn handles the per element invocations
+func (n *ParDo) invokeProcessFn(ctx context.Context, ws []typex.Window, ts 
typex.EventTime, opt *MainInput) (*FullValue, error) {
+       for _, e := range n.emitters {
+               if err := e.Init(ctx, ws, ts); err != nil {
+                       return nil, err
+               }
+       }
+       for _, s := range n.sideinput {
+               if err := s.Init(); err != nil {
+                       return nil, err
+               }
+       }
+       val, err := n.inv.Invoke(ctx, ws, ts, opt, n.extra...)
+       for _, s := range n.sideinput {
+               if err := s.Reset(); err != nil {
+                       return nil, err
+               }
+       }
+       return val, err
+}
+
 func (n *ParDo) fail(err error) error {
        n.status = Broken
        n.err.TrySetError(err)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
index 731a5cd7f65..0181279969c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
@@ -104,7 +104,7 @@ func emitSumFn(n int, emit func(int)) {
 // BenchmarkParDo_EmitSumFn measures the overhead of invoking a ParDo in a 
plan.
 //
 // On @lostluck's desktop:
-// BenchmarkParDo_EmitSumFn-12          1000000              1606 ns/op        
     585 B/op          7 allocs/op
+// BenchmarkParDo_EmitSumFn-12          1000000              1202 ns/op        
     529 B/op          4 allocs/op
 func BenchmarkParDo_EmitSumFn(b *testing.B) {
        fn, err := graph.NewDoFn(emitSumFn)
        if err != nil {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 118812)
    Time Spent: 1h  (was: 50m)

> Reduce ParDo per element Invoke overhead
> ----------------------------------------
>
>                 Key: BEAM-4726
>                 URL: https://issues.apache.org/jira/browse/BEAM-4726
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Robert Burke
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Each call to Invoke allocates a new args []interface{}, but the common case 
> is to run the same ProcessElement function over and over again. It should be 
> possible to have a container struct that retains the args slice and avoids 
> recomputing the indices at which to assign parameters before calling the 
> ProcessElementFn.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to