This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch timermgr in repository https://gitbox.apache.org/repos/asf/beam.git
commit 100b013f3493c43d5837e011a264220f91a785a9 Author: lostluck <[email protected]> AuthorDate: Wed Mar 1 09:11:17 2023 -0800 [go-timers] things I'm pretty sure work --- sdks/go/pkg/beam/core/funcx/fn.go | 45 +++++++- sdks/go/pkg/beam/core/graph/edge.go | 1 + sdks/go/pkg/beam/core/graph/fn.go | 80 +++++++++++++- sdks/go/pkg/beam/core/runtime/exec/coder.go | 3 + sdks/go/pkg/beam/core/runtime/exec/coder_test.go | 26 +++-- sdks/go/pkg/beam/core/runtime/exec/fn.go | 90 ++++++++++++---- sdks/go/pkg/beam/core/runtime/exec/pardo.go | 15 ++- sdks/go/pkg/beam/core/runtime/exec/timers.go | 125 ++++++++++++++++++++++ sdks/go/pkg/beam/core/runtime/exec/translate.go | 22 ++++ sdks/go/pkg/beam/core/runtime/graphx/coder.go | 2 +- sdks/go/pkg/beam/core/runtime/graphx/serialize.go | 5 + sdks/go/pkg/beam/core/runtime/graphx/translate.go | 15 +++ sdks/go/pkg/beam/core/timers/timers.go | 119 ++++++++++++++++++++ sdks/go/pkg/beam/core/typex/class.go | 1 + sdks/go/pkg/beam/core/typex/special.go | 3 +- sdks/go/pkg/beam/pardo.go | 12 +++ 16 files changed, 528 insertions(+), 36 deletions(-) diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go index b579cb56d52..3aef5a1695a 100644 --- a/sdks/go/pkg/beam/core/funcx/fn.go +++ b/sdks/go/pkg/beam/core/funcx/fn.go @@ -21,6 +21,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -85,6 +86,8 @@ const ( FnWatermarkEstimator FnParamKind = 0x1000 // FnStateProvider indicates a function input parameter that implements state.Provider FnStateProvider FnParamKind = 0x2000 + // FnTimerProvider indicates a function input parameter that implements timer.Provider + FnTimerProvider FnParamKind = 0x4000 ) func (k 
FnParamKind) String() string { @@ -117,6 +120,8 @@ func (k FnParamKind) String() string { return "WatermarkEstimator" case FnStateProvider: return "StateProvider" + case FnTimerProvider: + return "TimerProvider" default: return fmt.Sprintf("%v", int(k)) } @@ -305,6 +310,15 @@ func (u *Fn) StateProvider() (pos int, exists bool) { return -1, false } +func (u *Fn) TimerProvider() (pos int, exists bool) { + for i, p := range u.Param { + if p.Kind == FnTimerProvider { + return i, true + } + } + return -1, false +} + // WatermarkEstimator returns (index, true) iff the function expects a // parameter that implements sdf.WatermarkEstimator. func (u *Fn) WatermarkEstimator() (pos int, exists bool) { @@ -392,6 +406,8 @@ func New(fn reflectx.Func) (*Fn, error) { kind = FnBundleFinalization case t == state.ProviderType: kind = FnStateProvider + case t == timers.ProviderType: + kind = FnTimerProvider case t == reflectx.Type: kind = FnType case t.Implements(reflect.TypeOf((*sdf.RTracker)(nil)).Elem()): @@ -482,7 +498,7 @@ func SubReturns(list []ReturnParam, indices ...int) []ReturnParam { } // The order of present parameters and return values must be as follows: -// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, FnType?, FnBundleFinalization?, FnRTracker?, FnStateProvider?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?) +// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, FnType?, FnBundleFinalization?, FnRTracker?, FnStateProvider?, FnTimerProvider?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?) // // where ? indicates 0 or 1, and * indicates any number. 
// and a SideInput is one of FnValue or FnIter or FnReIter @@ -517,6 +533,7 @@ var ( errRTrackerPrecedence = errors.New("may only have a single sdf.RTracker parameter and it must precede the main input parameter") errBundleFinalizationPrecedence = errors.New("may only have a single BundleFinalization parameter and it must precede the main input parameter") errStateProviderPrecedence = errors.New("may only have a single state.Provider parameter and it must precede the main input parameter") + errTimerProviderPrecedence = errors.New("may only have a single timer.Provider parameter and it must precede the main input parameter") errInputPrecedence = errors.New("inputs parameters must precede emit function parameters") ) @@ -535,6 +552,7 @@ const ( psRTracker psBundleFinalization psStateProvider + psTimerProvider ) func nextParamState(cur paramState, transition FnParamKind) (paramState, error) { @@ -559,6 +577,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psContext: switch transition { @@ -578,6 +598,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psPane: switch transition { @@ -595,6 +617,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psWindow: switch transition { @@ -610,6 +634,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psEventTime: switch transition { @@ -623,6 +649,8 @@ func 
nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psWatermarkEstimator: switch transition { @@ -634,6 +662,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psType: switch transition { @@ -643,6 +673,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psBundleFinalization: switch transition { @@ -650,13 +682,22 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return psRTracker, nil case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psRTracker: switch transition { case FnStateProvider: return psStateProvider, nil + case FnTimerProvider: + return psTimerProvider, nil } case psStateProvider: + switch transition { + case FnTimerProvider: + return psTimerProvider, nil + } + case psTimerProvider: // Completely handled by the default clause case psInput: switch transition { @@ -689,6 +730,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error) return -1, errRTrackerPrecedence case FnStateProvider: return -1, errStateProviderPrecedence + case FnTimerProvider: + return -1, errTimerProviderPrecedence case FnIter, FnReIter, FnValue, FnMultiMap: return psInput, nil case FnEmit: diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go index a9f1c8a092b..86891114dd0 100644 --- a/sdks/go/pkg/beam/core/graph/edge.go +++ b/sdks/go/pkg/beam/core/graph/edge.go @@ -156,6 +156,7 @@ type MultiEdge struct { DoFn *DoFn // ParDo RestrictionCoder 
*coder.Coder // SplittableParDo StateCoders map[string]*coder.Coder // Stateful ParDo + TimerCoders map[string]*coder.Coder // Stateful ParDo CombineFn *CombineFn // Combine AccumCoder *coder.Coder // Combine Value []byte // Impulse diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go index 54cc02e07b3..1bb6548e850 100644 --- a/sdks/go/pkg/beam/core/graph/fn.go +++ b/sdks/go/pkg/beam/core/graph/fn.go @@ -22,6 +22,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -167,13 +168,13 @@ const ( initialWatermarkEstimatorStateName = "InitialWatermarkEstimatorState" watermarkEstimatorStateName = "WatermarkEstimatorState" + onTimerName = "OnTimer" + createAccumulatorName = "CreateAccumulator" addInputName = "AddInput" mergeAccumulatorsName = "MergeAccumulators" extractOutputName = "ExtractOutput" compactName = "Compact" - - // TODO: ViewFn, etc. 
) var doFnNames = []string{ @@ -304,6 +305,40 @@ func (f *DoFn) PipelineState() []state.PipelineState { return s } +type PipelineTimer interface { + TimerFamily() string + TimerDomain() timers.TimeDomainEnum +} + +var ( + _ PipelineTimer = timers.EventTime{} + _ PipelineTimer = timers.ProcessingTime{} +) + +func (f *DoFn) OnTimerFn() (*funcx.Fn, bool) { + m, ok := f.methods[onTimerName] + return m, ok +} + +func (f *DoFn) PipelineTimers() []PipelineTimer { + var t []PipelineTimer + if f.Recv == nil { + return t + } + + v := reflect.Indirect(reflect.ValueOf(f.Recv)) + + for i := 0; i < v.NumField(); i++ { + f := v.Field(i) + if f.CanInterface() { + if pt, ok := f.Interface().(PipelineTimer); ok { + t = append(t, pt) + } + } + } + return t +} + // SplittableDoFn represents a DoFn implementing SDF methods. type SplittableDoFn DoFn @@ -607,6 +642,11 @@ func AsDoFn(fn *Fn, numMainIn mainInputs) (*DoFn, error) { return nil, addContext(err, fn) } + err = validateTimer(doFn) + if err != nil { + return nil, addContext(err, fn) + } + return doFn, nil } @@ -1350,6 +1390,42 @@ func validateState(fn *DoFn, numIn mainInputs) error { return nil } +func validateTimer(fn *DoFn) error { + if fn.Fn == nil { + return nil + } + + pt := fn.PipelineTimers() + + if _, ok := fn.Fn.TimerProvider(); ok { + if len(pt) == 0 { + err := errors.Errorf("ProcessElement uses a TimerProvider, but no timer struct-tags are attached to the DoFn") + return errors.SetTopLevelMsgf(err, "ProcessElement uses a TimerProvider, but no timer struct-tags are attached to the DoFn"+ + ", Ensure that you are including the timer structs you're using to set/clear global state as uppercase member variables") + } + timerKeys := make(map[string]PipelineTimer) + for _, t := range pt { + k := t.TimerFamily() + if timer, ok := timerKeys[k]; ok { + err := errors.Errorf("Duplicate timer key %v", k) + return errors.SetTopLevelMsgf(err, "Duplicate timer key %v used by %v and %v. 
Ensure that keys are unique per DoFn", k, timer, t) + } else { + timerKeys[k] = t + } + } + } else { + if len(pt) > 0 { + err := errors.Errorf("ProcessElement doesn't use a TimerProvider, but Timer Struct is attached to the DoFn: %v", pt) + return errors.SetTopLevelMsgf(err, "ProcessElement doesn't use a TimerProvider, but Timer Struct is attached to the DoFn: %v"+ + ", Ensure that you are using the TimerProvider to set/clear the timers.", pt) + } + } + + // DO NOT SUBMIT: Require an OnTimer method existing + + return nil +} + // CombineFn represents a CombineFn. type CombineFn Fn diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go index 39248f7f5ac..4b750a9be98 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/coder.go +++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go @@ -1295,6 +1295,9 @@ func decodeTimer(dec ElementDecoder, win WindowDecoder, r io.Reader) (typex.Time if err != nil { return tm, errors.WithContext(err, "error decoding timer key") } + + // TODO Change to not type assert once general timers key + // fix is done. 
tm.Key = fv.Elm.(string) s, err := coder.DecodeStringUTF8(r) diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder_test.go b/sdks/go/pkg/beam/core/runtime/exec/coder_test.go index 75d18e533cf..533dbb45874 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/coder_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/coder_test.go @@ -21,16 +21,16 @@ import ( "reflect" "testing" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/coderx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" + "github.com/google/go-cmp/cmp" ) func TestCoders(t *testing.T) { - for _, test := range []struct { + tests := []struct { coder *coder.Coder val *FullValue }{ @@ -92,8 +92,22 @@ func TestCoders(t *testing.T) { }, { coder: coder.NewIntervalWindowCoder(), val: &FullValue{Elm: window.IntervalWindow{Start: 0, End: 100}}, + }, { + coder: coder.NewT(coder.NewString(), coder.NewGlobalWindow()), + val: &FullValue{ + Elm: typex.TimerMap{ + Key: "key", + Tag: "tag", + Windows: []typex.Window{window.GlobalWindow{}}, + Clear: false, + FireTimestamp: 1234, + HoldTimestamp: 5678, + Pane: typex.PaneInfo{IsFirst: true, IsLast: true, Timing: typex.PaneUnknown, Index: 0, NonSpeculativeIndex: 0}, + }, + }, }, - } { + } + for _, test := range tests { t.Run(fmt.Sprintf("%v", test.coder), func(t *testing.T) { var buf bytes.Buffer enc := MakeElementEncoder(test.coder) @@ -132,7 +146,7 @@ func compareFV(t *testing.T, got *FullValue, want *FullValue) { if gotFv, ok := got.Elm.(*FullValue); ok { compareFV(t, gotFv, wantFv) } - } else if got, want := got.Elm, want.Elm; got != want { + } else if got, want := got.Elm, want.Elm; !cmp.Equal(want, got) { t.Errorf("got %v [type: %s], want %v [type 
%s]", got, reflect.TypeOf(got), wantFv, reflect.TypeOf(wantFv)) } diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go index eaf9df81e4a..d0fdb8e3630 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fn.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go @@ -28,6 +28,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" ) //go:generate specialize --input=fn_arity.tmpl @@ -69,16 +70,47 @@ func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) { } // Invoke invokes the fn with the given values. The extra values must match the non-main +// side input and emitters. It returns the direct output, if any.\ +// +// Deprecated: prefer InvokeWithOpts +func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, sr StateReader, extra ...any) (*FullValue, error) { + if fn == nil { + return nil, nil // ok: nothing to Invoke + } + inv := newInvoker(fn) + return inv.invokeWithOpts(ctx, pn, ws, ts, InvokeOpts{opt: opt, bf: bf, we: we, sa: sa, sr: sr, extra: extra}) +} + +// InvokeOpts are optional parameters to invoke a Fn. +type InvokeOpts struct { + opt *MainInput + bf *bundleFinalizer + we sdf.WatermarkEstimator + sa UserStateAdapter + sr StateReader + ta UserTimerAdapter + tm DataManager + extra []any +} + +// InvokeWithOpts invokes the fn with the given values. The extra values must match the non-main // side input and emitters. It returns the direct output, if any. 
-func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, reader StateReader, extra ...any) (*FullValue, error) { +func InvokeWithOpts(ctx context.Context, fn *funcx.Fn, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opts InvokeOpts) (*FullValue, error) { if fn == nil { return nil, nil // ok: nothing to Invoke } inv := newInvoker(fn) - return inv.Invoke(ctx, pn, ws, ts, opt, bf, we, sa, reader, extra...) + return inv.invokeWithOpts(ctx, pn, ws, ts, opts) +} + +// InvokeWithOptsWithoutEventTime runs the given function at time 0 in the global window. +func InvokeWithOptsWithoutEventTime(ctx context.Context, fn *funcx.Fn, opts InvokeOpts) (*FullValue, error) { + return InvokeWithOpts(ctx, fn, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, opts) } // InvokeWithoutEventTime runs the given function at time 0 in the global window. +// +// Deprecated: prefer InvokeWithOptsWithoutEventTime func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, reader StateReader, extra ...any) (*FullValue, error) { if fn == nil { return nil, nil // ok: nothing to Invoke @@ -93,10 +125,12 @@ type invoker struct { fn *funcx.Fn args []any sp *stateProvider + tp *timerProvider + // TODO(lostluck): 2018/07/06 consider replacing with a slice of functions to run over the args slice, as an improvement. 
- ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx, spIdx int // specialized input indexes - outEtIdx, outPcIdx, outErrIdx int // specialized output indexes - in, out []int // general indexes + ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx, spIdx, tpIdx int // specialized input indexes + outEtIdx, outPcIdx, outErrIdx int // specialized output indexes + in, out []int // general indexes ret FullValue // ret is a cached allocation for passing to the next Unit. Units never modify the passed in FullValue. elmConvert, elm2Convert func(any) any // Cached conversion functions, which assums this invoker is always used with the same parameter types. @@ -129,6 +163,9 @@ func newInvoker(fn *funcx.Fn) *invoker { if n.spIdx, ok = fn.StateProvider(); !ok { n.spIdx = -1 } + if n.tpIdx, ok = fn.TimerProvider(); !ok { + n.tpIdx = -1 + } if n.outEtIdx, ok = fn.OutEventTime(); !ok { n.outEtIdx = -1 } @@ -163,7 +200,11 @@ func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, bf // Invoke invokes the fn with the given values. The extra values must match the non-main // side input and emitters. It returns the direct output, if any. -func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, reader StateReader, extra ...any) (*FullValue, error) { +func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, sr StateReader, extra ...any) (*FullValue, error) { + return n.invokeWithOpts(ctx, pn, ws, ts, InvokeOpts{opt: opt, bf: bf, we: we, sa: sa, sr: sr, extra: extra}) +} + +func (n *invoker) invokeWithOpts(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opts InvokeOpts) (*FullValue, error) { // (1) Populate contexts // extract these to make things easier to read. 
args := n.args @@ -178,7 +219,7 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind } if n.wndIdx >= 0 { if len(ws) != 1 { - return nil, errors.Errorf("DoFns that observe windows must be invoked with single window: %v", opt.Key.Windows) + return nil, errors.Errorf("DoFns that observe windows must be invoked with single window: %v", opts.opt.Key.Windows) } args[n.wndIdx] = ws[0] } @@ -186,14 +227,14 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind args[n.etIdx] = ts } if n.bfIdx >= 0 { - args[n.bfIdx] = bf + args[n.bfIdx] = opts.bf } if n.weIdx >= 0 { - args[n.weIdx] = we + args[n.weIdx] = opts.we } if n.spIdx >= 0 { - sp, err := sa.NewStateProvider(ctx, reader, ws[0], opt) + sp, err := opts.sa.NewStateProvider(ctx, opts.sr, ws[0], opts.opt) if err != nil { return nil, err } @@ -201,29 +242,38 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind args[n.spIdx] = n.sp } + if n.tpIdx >= 0 { + log.Debugf(ctx, "timercall %+v", opts) + tp, err := opts.ta.NewTimerProvider(ctx, opts.tm, ws, opts.opt) + if err != nil { + return nil, err + } + n.tp = &tp + args[n.tpIdx] = n.tp + } // (2) Main input from value, if any. 
i := 0 - if opt != nil { - if opt.RTracker != nil { - args[in[i]] = opt.RTracker + if opts.opt != nil { + if opts.opt.RTracker != nil { + args[in[i]] = opts.opt.RTracker i++ } if n.elmConvert == nil { - from := reflect.TypeOf(opt.Key.Elm) + from := reflect.TypeOf(opts.opt.Key.Elm) n.elmConvert = ConvertFn(from, fn.Param[in[i]].T) } - args[in[i]] = n.elmConvert(opt.Key.Elm) + args[in[i]] = n.elmConvert(opts.opt.Key.Elm) i++ - if opt.Key.Elm2 != nil { + if opts.opt.Key.Elm2 != nil { if n.elm2Convert == nil { - from := reflect.TypeOf(opt.Key.Elm2) + from := reflect.TypeOf(opts.opt.Key.Elm2) n.elm2Convert = ConvertFn(from, fn.Param[in[i]].T) } - args[in[i]] = n.elm2Convert(opt.Key.Elm2) + args[in[i]] = n.elm2Convert(opts.opt.Key.Elm2) i++ } - for _, iter := range opt.Values { + for _, iter := range opts.opt.Values { param := fn.Param[in[i]] if param.Kind != funcx.FnIter { @@ -243,7 +293,7 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind } // (3) Precomputed side input and emitters (or other output). - for _, arg := range extra { + for _, arg := range opts.extra { args[in[i]] = arg i++ } diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go index 8cb5342ded8..be5c71f2c75 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go @@ -48,8 +48,10 @@ type ParDo struct { bf *bundleFinalizer we sdf.WatermarkEstimator - reader StateReader - cache *cacheElm + Timer UserTimerAdapter + timerManager DataManager + reader StateReader + cache *cacheElm status Status err errorx.GuardedError @@ -88,7 +90,7 @@ func (n *ParDo) Up(ctx context.Context) error { // Subsequent bundles might run this same node, and the context here would be // incorrectly refering to the older bundleId. 
setupCtx := metrics.SetPTransformID(ctx, n.PID) - if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil, nil, nil, nil, nil); err != nil { + if _, err := InvokeWithOptsWithoutEventTime(setupCtx, n.Fn.SetupFn(), InvokeOpts{}); err != nil { return n.fail(err) } @@ -111,6 +113,7 @@ func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) er } n.status = Active n.reader = data.State + n.timerManager = data.Data // Allocating contexts all the time is expensive, but we seldom re-write them, // and never accept modified contexts from users, so we will cache them per-bundle // per-unit, to avoid the constant allocation overhead. @@ -236,6 +239,7 @@ func (n *ParDo) FinishBundle(_ context.Context) error { } n.reader = nil n.cache = nil + n.timerManager = nil if err := MultiFinishBundle(n.ctx, n.Out...); err != nil { return n.fail(err) @@ -251,8 +255,9 @@ func (n *ParDo) Down(ctx context.Context) error { n.status = Down n.reader = nil n.cache = nil + n.timerManager = nil - if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil, nil, nil, nil, nil); err != nil { + if _, err := InvokeWithOptsWithoutEventTime(ctx, n.Fn.TeardownFn(), InvokeOpts{}); err != nil { n.err.TrySetError(err) } return n.err.Error() @@ -356,7 +361,7 @@ func (n *ParDo) invokeProcessFn(ctx context.Context, pn typex.PaneInfo, ws []typ if err := n.preInvoke(ctx, ws, ts); err != nil { return nil, err } - val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.bf, n.we, n.UState, n.reader, n.cache.extra...) 
+ val, err = n.inv.invokeWithOpts(ctx, pn, ws, ts, InvokeOpts{opt: opt, bf: n.bf, we: n.we, sa: n.UState, sr: n.reader, ta: n.Timer, tm: n.timerManager, extra: n.cache.extra}) if err != nil { return nil, err } diff --git a/sdks/go/pkg/beam/core/runtime/exec/timers.go b/sdks/go/pkg/beam/core/runtime/exec/timers.go new file mode 100644 index 00000000000..0ceed0d2ebd --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/exec/timers.go @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package exec + +import ( + "context" + "fmt" + "io" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" +) + +type UserTimerAdapter interface { + NewTimerProvider(ctx context.Context, manager DataManager, w []typex.Window, element *MainInput) (timerProvider, error) +} + +type userTimerAdapter struct { + SID StreamID + wc WindowEncoder + kc ElementEncoder + timerIDToCoder map[string]*coder.Coder + C *coder.Coder +} + +func NewUserTimerAdapter(sID StreamID, c *coder.Coder, timerCoders map[string]*coder.Coder) UserTimerAdapter { + if !coder.IsW(c) { + panic(fmt.Sprintf("expected WV coder for user timer %v: %v", sID, c)) + } + + wc := MakeWindowEncoder(c.Window) + var kc ElementEncoder + if coder.IsKV(coder.SkipW(c)) { + kc = MakeElementEncoder(coder.SkipW(c).Components[0]) + } + + return &userTimerAdapter{SID: sID, wc: wc, kc: kc, C: c, timerIDToCoder: timerCoders} +} + +func (u *userTimerAdapter) NewTimerProvider(ctx context.Context, manager DataManager, w []typex.Window, element *MainInput) (timerProvider, error) { + if u.kc == nil { + return timerProvider{}, fmt.Errorf("cannot make a state provider for an unkeyed input %v", element) + } + elementKey, err := EncodeElement(u.kc, element.Key.Elm) + if err != nil { + return timerProvider{}, err + } + + // win, err := EncodeWindow(u.wc, w[0]) + // if err != nil { + // return timerProvider{}, err + // } + tp := timerProvider{ + ctx: ctx, + tm: manager, + elementKey: elementKey, + SID: u.SID, + window: w, + writersByFamily: make(map[string]io.Writer), + codersByFamily: u.timerIDToCoder, + } + + return tp, nil +} + +type timerProvider struct { + ctx context.Context + tm DataManager + SID StreamID + elementKey []byte + window []typex.Window + + pn typex.PaneInfo + + writersByFamily map[string]io.Writer + codersByFamily map[string]*coder.Coder +} + +func (p *timerProvider) getWriter(family 
string) (io.Writer, error) { + if w, ok := p.writersByFamily[family]; ok { + return w, nil + } else { + w, err := p.tm.OpenTimerWrite(p.ctx, p.SID, family) + if err != nil { + return nil, err + } + p.writersByFamily[family] = w + return p.writersByFamily[family], nil + } +} + +func (p *timerProvider) Set(t timers.TimerMap) { + w, err := p.getWriter(t.Family) + if err != nil { + panic(err) + } + tm := typex.TimerMap{ + Key: string(p.elementKey), + Tag: t.Tag, + Windows: p.window, + Clear: t.Clear, + FireTimestamp: t.FireTimestamp, + HoldTimestamp: t.HoldTimestamp, + Pane: p.pn, + } + fv := FullValue{Elm: tm} + enc := MakeElementEncoder(coder.SkipW(p.codersByFamily[t.Family])) + if err := enc.Encode(&fv, w); err != nil { + panic(err) + } +} diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go index 78cf0ef65cd..0403f0ab0ab 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/translate.go +++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go @@ -16,6 +16,7 @@ package exec import ( + "context" "fmt" "math/rand" "strconv" @@ -30,6 +31,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/golang/protobuf/proto" @@ -462,6 +464,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { var data string var sides map[string]*pipepb.SideInput var userState map[string]*pipepb.StateSpec + var userTimers map[string]*pipepb.TimerFamilySpec switch urn { case graphx.URNParDo, urnPairWithRestriction, @@ -475,6 +478,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { data = string(pardo.GetDoFn().GetPayload()) sides = pardo.GetSideInputs() userState = 
pardo.GetStateSpecs() + userTimers = pardo.GetTimerFamilySpecs() case urnPerKeyCombinePre, urnPerKeyCombineMerge, urnPerKeyCombineExtract, urnPerKeyCombineConvert: var cmb pipepb.CombinePayload if err := proto.Unmarshal(payload, &cmb); err != nil { @@ -587,6 +591,24 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { n.UState = NewUserStateAdapter(sid, coder.NewW(ec, wc), stateIDToCoder, stateIDToKeyCoder, stateIDToCombineFn) } } + if len(userTimers) > 0 { + log.Debugf(context.TODO(), "userTimers %+v", userTimers) + timerIDToCoder := make(map[string]*coder.Coder) + for key, spec := range userTimers { + cID := spec.GetTimerFamilyCoderId() + c, err := b.coders.Coder(cID) + if err != nil { + return nil, err + } + timerIDToCoder[key] = c + sID := StreamID{Port: Port{URL: b.desc.GetTimerApiServiceDescriptor().GetUrl()}, PtransformID: id.to} + ec, wc, err := b.makeCoderForPCollection(input[0]) + if err != nil { + return nil, err + } + n.Timer = NewUserTimerAdapter(sID, coder.NewW(ec, wc), timerIDToCoder) + } + } for i := 1; i < len(input); i++ { // TODO(https://github.com/apache/beam/issues/18602) Handle ViewFns for side inputs diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go index 498e145f5db..34b44dd8592 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go @@ -73,7 +73,7 @@ func knownStandardCoders() []string { urnIntervalWindow, urnRowCoder, urnNullableCoder, - // TODO(https://github.com/apache/beam/issues/20510): Add urnTimerCoder once finalized. 
+ urnTimerCoder, } } diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go index 65ad1bdd060..fb62e18e130 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go @@ -28,6 +28,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/jsonx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" @@ -523,6 +524,8 @@ func tryEncodeSpecial(t reflect.Type) (v1pb.Type_Special, bool) { return v1pb.Type_BUNDLEFINALIZATION, true case state.ProviderType: return v1pb.Type_STATEPROVIDER, true + case timers.ProviderType: + return v1pb.Type_TIMERPROVIDER, true case typex.KVType: return v1pb.Type_KV, true case typex.CoGBKType: @@ -689,6 +692,8 @@ func decodeSpecial(s v1pb.Type_Special) (reflect.Type, error) { return typex.BundleFinalizationType, nil case v1pb.Type_STATEPROVIDER: return state.ProviderType, nil + case v1pb.Type_TIMERPROVIDER: + return timers.ProviderType, nil case v1pb.Type_KV: return typex.KVType, nil case v1pb.Type_COGBK: diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 68074ac7eb3..a427f22f882 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -578,6 +578,21 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) { } payload.StateSpecs = stateSpecs } + if _, ok := edge.Edge.DoFn.ProcessElementFn().TimerProvider(); ok { + m.requirements[URNRequiresStatefulProcessing] = true + timerSpecs := make(map[string]*pipepb.TimerFamilySpec) + for _, pt := range 
edge.Edge.DoFn.PipelineTimers() { + coderID, err := m.coders.Add(edge.Edge.TimerCoders[pt.TimerFamily()]) + if err != nil { + return handleErr(err) + } + timerSpecs[pt.TimerFamily()] = &pipepb.TimerFamilySpec{ + TimeDomain: pipepb.TimeDomain_Enum(pt.TimerDomain()), + TimerFamilyCoderId: coderID, + } + } + payload.TimerFamilySpecs = timerSpecs + } spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)} annotations = edge.Edge.DoFn.Annotations() diff --git a/sdks/go/pkg/beam/core/timers/timers.go b/sdks/go/pkg/beam/core/timers/timers.go new file mode 100644 index 00000000000..130564790ca --- /dev/null +++ b/sdks/go/pkg/beam/core/timers/timers.go @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package timers provides structs for reading and writing timers. 
+package timers + +import ( + "context" + "reflect" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" +) + +var ( + ProviderType = reflect.TypeOf((*Provider)(nil)).Elem() +) + +type TimeDomainEnum int32 + +const ( + TimeDomainUnspecified TimeDomainEnum = 0 + TimeDomainEventTime TimeDomainEnum = 1 + TimeDomainProcessingTime TimeDomainEnum = 2 +) + +type TimerMap struct { + Family string + Tag string + Clear bool + FireTimestamp, HoldTimestamp mtime.Time +} + +type Provider interface { + Set(t TimerMap) +} + +type EventTime struct { + // need to export them otherwise the key comes out empty in execution? + Family string +} + +func (t EventTime) Set(p Provider, firingTimestamp time.Time) { + fire := mtime.FromTime(firingTimestamp) + // Hold timestamp must match firing timestamp if not otherwise set. + p.Set(TimerMap{Family: t.Family, FireTimestamp: fire, HoldTimestamp: fire}) +} + +type Opts struct { + Tag string + Hold time.Time +} + +func (t *EventTime) SetWithOpts(p Provider, firingTimestamp time.Time, opts Opts) { + fire := mtime.FromTime(firingTimestamp) + // Hold timestamp must match firing timestamp if not otherwise set. 
+ tm := TimerMap{Family: t.Family, Tag: opts.Tag, FireTimestamp: fire, HoldTimestamp: fire} + if !opts.Hold.IsZero() { + tm.HoldTimestamp = mtime.FromTime(opts.Hold) + } + p.Set(tm) +} + +func (e EventTime) TimerFamily() string { + return e.Family +} + +func (e EventTime) TimerDomain() TimeDomainEnum { + return TimeDomainEventTime +} + +type ProcessingTime struct { + Family string +} + +func (e ProcessingTime) TimerFamily() string { + return e.Family +} + +func (e ProcessingTime) TimerDomain() TimeDomainEnum { + return TimeDomainProcessingTime +} + +func (t ProcessingTime) Set(p Provider, firingTimestamp time.Time) { + log.Infof(context.Background(), "setting timer in core/timer: %+v", t) + fire := mtime.FromTime(firingTimestamp) + p.Set(TimerMap{Family: t.Family, FireTimestamp: fire, HoldTimestamp: fire}) +} + +func (t ProcessingTime) SetWithOpts(p Provider, firingTimestamp time.Time, opts Opts) { + fire := mtime.FromTime(firingTimestamp) + // Hold timestamp must match firing timestamp if not otherwise set. 
+ tm := TimerMap{Family: t.Family, Tag: opts.Tag, FireTimestamp: fire, HoldTimestamp: fire} + if !opts.Hold.IsZero() { + tm.HoldTimestamp = mtime.FromTime(opts.Hold) + } + p.Set(tm) +} + +func InEventTime(Key string) EventTime { + return EventTime{Family: Key} +} + +func InProcessingTime(Key string) ProcessingTime { + return ProcessingTime{Family: Key} +} diff --git a/sdks/go/pkg/beam/core/typex/class.go b/sdks/go/pkg/beam/core/typex/class.go index e112495ee98..63e4543a3e5 100644 --- a/sdks/go/pkg/beam/core/typex/class.go +++ b/sdks/go/pkg/beam/core/typex/class.go @@ -120,6 +120,7 @@ func isConcrete(t reflect.Type, visited map[uintptr]bool) error { t == EventTimeType || t.Implements(WindowType) || t == PaneInfoType || + t == TimersType || t == BundleFinalizationType || t == reflectx.Error || t == reflectx.Context || diff --git a/sdks/go/pkg/beam/core/typex/special.go b/sdks/go/pkg/beam/core/typex/special.go index 93537122584..4cce19dcfbd 100644 --- a/sdks/go/pkg/beam/core/typex/special.go +++ b/sdks/go/pkg/beam/core/typex/special.go @@ -107,7 +107,8 @@ type Timers struct { // TimerMap is a placeholder for timer details used in encoding/decoding. 
type TimerMap struct { - Key, Tag string + Key string + Tag string Windows []Window // []typex.Window Clear bool FireTimestamp, HoldTimestamp mtime.Time diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go index e1cdc4f417c..f2c9303b016 100644 --- a/sdks/go/pkg/beam/pardo.go +++ b/sdks/go/pkg/beam/pardo.go @@ -116,6 +116,18 @@ func TryParDo(s Scope, dofn any, col PCollection, opts ...Option) ([]PCollection } } + wc := inWfn.Coder() + pipelineTimers := fn.PipelineTimers() + if len(pipelineTimers) > 0 { + // TODO(riteshghorse): replace the coder with type of key + c := coder.NewString() + edge.TimerCoders = make(map[string]*coder.Coder) + for _, pt := range pipelineTimers { + tc := coder.NewT(c, wc) + edge.TimerCoders[pt.TimerFamily()] = tc + } + } + var ret []PCollection for _, out := range edge.Output { c := PCollection{out.To}
