This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch timermgr
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 100b013f3493c43d5837e011a264220f91a785a9
Author: lostluck <[email protected]>
AuthorDate: Wed Mar 1 09:11:17 2023 -0800

    [go-timers] things I'm pretty sure work
---
 sdks/go/pkg/beam/core/funcx/fn.go                 |  45 +++++++-
 sdks/go/pkg/beam/core/graph/edge.go               |   1 +
 sdks/go/pkg/beam/core/graph/fn.go                 |  80 +++++++++++++-
 sdks/go/pkg/beam/core/runtime/exec/coder.go       |   3 +
 sdks/go/pkg/beam/core/runtime/exec/coder_test.go  |  26 +++--
 sdks/go/pkg/beam/core/runtime/exec/fn.go          |  90 ++++++++++++----
 sdks/go/pkg/beam/core/runtime/exec/pardo.go       |  15 ++-
 sdks/go/pkg/beam/core/runtime/exec/timers.go      | 125 ++++++++++++++++++++++
 sdks/go/pkg/beam/core/runtime/exec/translate.go   |  22 ++++
 sdks/go/pkg/beam/core/runtime/graphx/coder.go     |   2 +-
 sdks/go/pkg/beam/core/runtime/graphx/serialize.go |   5 +
 sdks/go/pkg/beam/core/runtime/graphx/translate.go |  15 +++
 sdks/go/pkg/beam/core/timers/timers.go            | 119 ++++++++++++++++++++
 sdks/go/pkg/beam/core/typex/class.go              |   1 +
 sdks/go/pkg/beam/core/typex/special.go            |   3 +-
 sdks/go/pkg/beam/pardo.go                         |  12 +++
 16 files changed, 528 insertions(+), 36 deletions(-)

diff --git a/sdks/go/pkg/beam/core/funcx/fn.go 
b/sdks/go/pkg/beam/core/funcx/fn.go
index b579cb56d52..3aef5a1695a 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -21,6 +21,7 @@ import (
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
@@ -85,6 +86,8 @@ const (
        FnWatermarkEstimator FnParamKind = 0x1000
        // FnStateProvider indicates a function input parameter that implements 
state.Provider
        FnStateProvider FnParamKind = 0x2000
+       // FnTimerProvider indicates a function input parameter that implements 
timers.Provider
+       FnTimerProvider FnParamKind = 0x4000
 )
 
 func (k FnParamKind) String() string {
@@ -117,6 +120,8 @@ func (k FnParamKind) String() string {
                return "WatermarkEstimator"
        case FnStateProvider:
                return "StateProvider"
+       case FnTimerProvider:
+               return "TimerProvider"
        default:
                return fmt.Sprintf("%v", int(k))
        }
@@ -305,6 +310,15 @@ func (u *Fn) StateProvider() (pos int, exists bool) {
        return -1, false
 }
 
+func (u *Fn) TimerProvider() (pos int, exists bool) {
+       for i, p := range u.Param {
+               if p.Kind == FnTimerProvider {
+                       return i, true
+               }
+       }
+       return -1, false
+}
+
 // WatermarkEstimator returns (index, true) iff the function expects a
 // parameter that implements sdf.WatermarkEstimator.
 func (u *Fn) WatermarkEstimator() (pos int, exists bool) {
@@ -392,6 +406,8 @@ func New(fn reflectx.Func) (*Fn, error) {
                        kind = FnBundleFinalization
                case t == state.ProviderType:
                        kind = FnStateProvider
+               case t == timers.ProviderType:
+                       kind = FnTimerProvider
                case t == reflectx.Type:
                        kind = FnType
                case t.Implements(reflect.TypeOf((*sdf.RTracker)(nil)).Elem()):
@@ -482,7 +498,7 @@ func SubReturns(list []ReturnParam, indices ...int) 
[]ReturnParam {
 }
 
 // The order of present parameters and return values must be as follows:
-// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, 
FnType?, FnBundleFinalization?, FnRTracker?, FnStateProvider?, (FnValue, 
SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?)
+// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, 
FnType?, FnBundleFinalization?, FnRTracker?, FnStateProvider?, 
FnTimerProvider?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, 
RetError?)
 //
 //     where ? indicates 0 or 1, and * indicates any number.
 //     and  a SideInput is one of FnValue or FnIter or FnReIter
@@ -517,6 +533,7 @@ var (
        errRTrackerPrecedence                = errors.New("may only have a 
single sdf.RTracker parameter and it must precede the main input parameter")
        errBundleFinalizationPrecedence      = errors.New("may only have a 
single BundleFinalization parameter and it must precede the main input 
parameter")
        errStateProviderPrecedence           = errors.New("may only have a 
single state.Provider parameter and it must precede the main input parameter")
+       errTimerProviderPrecedence           = errors.New("may only have a 
single timer.Provider parameter and it must precede the main input parameter")
        errInputPrecedence                   = errors.New("inputs parameters 
must precede emit function parameters")
 )
 
@@ -535,6 +552,7 @@ const (
        psRTracker
        psBundleFinalization
        psStateProvider
+       psTimerProvider
 )
 
 func nextParamState(cur paramState, transition FnParamKind) (paramState, 
error) {
@@ -559,6 +577,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psRTracker, nil
                case FnStateProvider:
                        return psStateProvider, nil
+               case FnTimerProvider:
+                       return psTimerProvider, nil
                }
        case psContext:
                switch transition {
@@ -578,6 +598,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psRTracker, nil
                case FnStateProvider:
                        return psStateProvider, nil
+               case FnTimerProvider:
+                       return psTimerProvider, nil
                }
        case psPane:
                switch transition {
@@ -595,6 +617,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psRTracker, nil
                case FnStateProvider:
                        return psStateProvider, nil
+               case FnTimerProvider:
+                       return psTimerProvider, nil
                }
        case psWindow:
                switch transition {
@@ -610,6 +634,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psRTracker, nil
                case FnStateProvider:
                        return psStateProvider, nil
+               case FnTimerProvider:
+                       return psTimerProvider, nil
                }
        case psEventTime:
                switch transition {
@@ -623,6 +649,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psRTracker, nil
                case FnStateProvider:
                        return psStateProvider, nil
+               case FnTimerProvider:
+                       return psTimerProvider, nil
                }
        case psWatermarkEstimator:
                switch transition {
@@ -634,6 +662,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psRTracker, nil
                case FnStateProvider:
                        return psStateProvider, nil
+               case FnTimerProvider:
+                       return psTimerProvider, nil
                }
        case psType:
                switch transition {
@@ -643,6 +673,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psRTracker, nil
                case FnStateProvider:
                        return psStateProvider, nil
+               case FnTimerProvider:
+                       return psTimerProvider, nil
                }
        case psBundleFinalization:
                switch transition {
@@ -650,13 +682,22 @@ func nextParamState(cur paramState, transition 
FnParamKind) (paramState, error)
                        return psRTracker, nil
                case FnStateProvider:
                        return psStateProvider, nil
+               case FnTimerProvider:
+                       return psTimerProvider, nil
                }
        case psRTracker:
                switch transition {
                case FnStateProvider:
                        return psStateProvider, nil
+               case FnTimerProvider:
+                       return psTimerProvider, nil
                }
        case psStateProvider:
+               switch transition {
+               case FnTimerProvider:
+                       return psTimerProvider, nil
+               }
+       case psTimerProvider:
                // Completely handled by the default clause
        case psInput:
                switch transition {
@@ -689,6 +730,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                return -1, errRTrackerPrecedence
        case FnStateProvider:
                return -1, errStateProviderPrecedence
+       case FnTimerProvider:
+               return -1, errTimerProviderPrecedence
        case FnIter, FnReIter, FnValue, FnMultiMap:
                return psInput, nil
        case FnEmit:
diff --git a/sdks/go/pkg/beam/core/graph/edge.go 
b/sdks/go/pkg/beam/core/graph/edge.go
index a9f1c8a092b..86891114dd0 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -156,6 +156,7 @@ type MultiEdge struct {
        DoFn             *DoFn                   // ParDo
        RestrictionCoder *coder.Coder            // SplittableParDo
        StateCoders      map[string]*coder.Coder // Stateful ParDo
+       TimerCoders      map[string]*coder.Coder // Stateful ParDo
        CombineFn        *CombineFn              // Combine
        AccumCoder       *coder.Coder            // Combine
        Value            []byte                  // Impulse
diff --git a/sdks/go/pkg/beam/core/graph/fn.go 
b/sdks/go/pkg/beam/core/graph/fn.go
index 54cc02e07b3..1bb6548e850 100644
--- a/sdks/go/pkg/beam/core/graph/fn.go
+++ b/sdks/go/pkg/beam/core/graph/fn.go
@@ -22,6 +22,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
@@ -167,13 +168,13 @@ const (
        initialWatermarkEstimatorStateName = "InitialWatermarkEstimatorState"
        watermarkEstimatorStateName        = "WatermarkEstimatorState"
 
+       onTimerName = "OnTimer"
+
        createAccumulatorName = "CreateAccumulator"
        addInputName          = "AddInput"
        mergeAccumulatorsName = "MergeAccumulators"
        extractOutputName     = "ExtractOutput"
        compactName           = "Compact"
-
-       // TODO: ViewFn, etc.
 )
 
 var doFnNames = []string{
@@ -304,6 +305,40 @@ func (f *DoFn) PipelineState() []state.PipelineState {
        return s
 }
 
+type PipelineTimer interface {
+       TimerFamily() string
+       TimerDomain() timers.TimeDomainEnum
+}
+
+var (
+       _ PipelineTimer = timers.EventTime{}
+       _ PipelineTimer = timers.ProcessingTime{}
+)
+
+func (f *DoFn) OnTimerFn() (*funcx.Fn, bool) {
+       m, ok := f.methods[onTimerName]
+       return m, ok
+}
+
+func (f *DoFn) PipelineTimers() []PipelineTimer {
+       var t []PipelineTimer
+       if f.Recv == nil {
+               return t
+       }
+
+       v := reflect.Indirect(reflect.ValueOf(f.Recv))
+
+       for i := 0; i < v.NumField(); i++ {
+               f := v.Field(i)
+               if f.CanInterface() {
+                       if pt, ok := f.Interface().(PipelineTimer); ok {
+                               t = append(t, pt)
+                       }
+               }
+       }
+       return t
+}
+
 // SplittableDoFn represents a DoFn implementing SDF methods.
 type SplittableDoFn DoFn
 
@@ -607,6 +642,11 @@ func AsDoFn(fn *Fn, numMainIn mainInputs) (*DoFn, error) {
                return nil, addContext(err, fn)
        }
 
+       err = validateTimer(doFn)
+       if err != nil {
+               return nil, addContext(err, fn)
+       }
+
        return doFn, nil
 }
 
@@ -1350,6 +1390,42 @@ func validateState(fn *DoFn, numIn mainInputs) error {
        return nil
 }
 
+func validateTimer(fn *DoFn) error {
+       if fn.Fn == nil {
+               return nil
+       }
+
+       pt := fn.PipelineTimers()
+
+       if _, ok := fn.Fn.TimerProvider(); ok {
+               if len(pt) == 0 {
+                       err := errors.Errorf("ProcessElement uses a 
TimerProvider, but no timer struct-tags are attached to the DoFn")
+                       return errors.SetTopLevelMsgf(err, "ProcessElement uses 
a TimerProvider, but no timer struct-tags are attached to the DoFn"+
+                               ", Ensure that you are including the timer 
structs you're using to set/clear global state as uppercase member variables")
+               }
+               timerKeys := make(map[string]PipelineTimer)
+               for _, t := range pt {
+                       k := t.TimerFamily()
+                       if timer, ok := timerKeys[k]; ok {
+                               err := errors.Errorf("Duplicate timer key %v", 
k)
+                               return errors.SetTopLevelMsgf(err, "Duplicate 
timer key %v used by %v and %v. Ensure that keys are unique per DoFn", k, 
timer, t)
+                       } else {
+                               timerKeys[k] = t
+                       }
+               }
+       } else {
+               if len(pt) > 0 {
+                       err := errors.Errorf("ProcessElement doesn't  use a 
TimerProvider, but Timer Struct is attached to the DoFn: %v", pt)
+                       return errors.SetTopLevelMsgf(err, "ProcessElement 
doesn't  use a TimerProvider, but Timer Struct is attached to the DoFn: %v"+
+                               ", Ensure that you are using the TimerProvider 
to set/clear the timers.", pt)
+               }
+       }
+
+       // DO NOT SUBMIT: Require that an OnTimer method exists.
+
+       return nil
+}
+
 // CombineFn represents a CombineFn.
 type CombineFn Fn
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go 
b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index 39248f7f5ac..4b750a9be98 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -1295,6 +1295,9 @@ func decodeTimer(dec ElementDecoder, win WindowDecoder, r 
io.Reader) (typex.Time
        if err != nil {
                return tm, errors.WithContext(err, "error decoding timer key")
        }
+
+       // TODO: Stop type-asserting the key to string once the general
+       // timer key fix is done.
        tm.Key = fv.Elm.(string)
 
        s, err := coder.DecodeStringUTF8(r)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/coder_test.go
index 75d18e533cf..533dbb45874 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder_test.go
@@ -21,16 +21,16 @@ import (
        "reflect"
        "testing"
 
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
-
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/coderx"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+       "github.com/google/go-cmp/cmp"
 )
 
 func TestCoders(t *testing.T) {
-       for _, test := range []struct {
+       tests := []struct {
                coder *coder.Coder
                val   *FullValue
        }{
@@ -92,8 +92,22 @@ func TestCoders(t *testing.T) {
                }, {
                        coder: coder.NewIntervalWindowCoder(),
                        val:   &FullValue{Elm: window.IntervalWindow{Start: 0, 
End: 100}},
+               }, {
+                       coder: coder.NewT(coder.NewString(), 
coder.NewGlobalWindow()),
+                       val: &FullValue{
+                               Elm: typex.TimerMap{
+                                       Key:           "key",
+                                       Tag:           "tag",
+                                       Windows:       
[]typex.Window{window.GlobalWindow{}},
+                                       Clear:         false,
+                                       FireTimestamp: 1234,
+                                       HoldTimestamp: 5678,
+                                       Pane:          typex.PaneInfo{IsFirst: 
true, IsLast: true, Timing: typex.PaneUnknown, Index: 0, NonSpeculativeIndex: 
0},
+                               },
+                       },
                },
-       } {
+       }
+       for _, test := range tests {
                t.Run(fmt.Sprintf("%v", test.coder), func(t *testing.T) {
                        var buf bytes.Buffer
                        enc := MakeElementEncoder(test.coder)
@@ -132,7 +146,7 @@ func compareFV(t *testing.T, got *FullValue, want 
*FullValue) {
                if gotFv, ok := got.Elm.(*FullValue); ok {
                        compareFV(t, gotFv, wantFv)
                }
-       } else if got, want := got.Elm, want.Elm; got != want {
+       } else if got, want := got.Elm, want.Elm; !cmp.Equal(want, got) {
                t.Errorf("got %v [type: %s], want %v [type %s]",
                        got, reflect.TypeOf(got), wantFv, 
reflect.TypeOf(wantFv))
        }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go 
b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index eaf9df81e4a..d0fdb8e3630 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
 )
 
 //go:generate specialize --input=fn_arity.tmpl
@@ -69,16 +70,47 @@ func (bf *bundleFinalizer) RegisterCallback(t 
time.Duration, cb func() error) {
 }
 
 // Invoke invokes the fn with the given values. The extra values must match 
the non-main
+// side input and emitters. It returns the direct output, if any.
+//
+// Deprecated: prefer InvokeWithOpts
+func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we 
sdf.WatermarkEstimator, sa UserStateAdapter, sr StateReader, extra ...any) 
(*FullValue, error) {
+       if fn == nil {
+               return nil, nil // ok: nothing to Invoke
+       }
+       inv := newInvoker(fn)
+       return inv.invokeWithOpts(ctx, pn, ws, ts, InvokeOpts{opt: opt, bf: bf, 
we: we, sa: sa, sr: sr, extra: extra})
+}
+
+// InvokeOpts are optional parameters to invoke a Fn.
+type InvokeOpts struct {
+       opt   *MainInput
+       bf    *bundleFinalizer
+       we    sdf.WatermarkEstimator
+       sa    UserStateAdapter
+       sr    StateReader
+       ta    UserTimerAdapter
+       tm    DataManager
+       extra []any
+}
+
+// InvokeWithOpts invokes the fn with the given values. The extra values must 
match the non-main
 // side input and emitters. It returns the direct output, if any.
-func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we 
sdf.WatermarkEstimator, sa UserStateAdapter, reader StateReader, extra ...any) 
(*FullValue, error) {
+func InvokeWithOpts(ctx context.Context, fn *funcx.Fn, pn typex.PaneInfo, ws 
[]typex.Window, ts typex.EventTime, opts InvokeOpts) (*FullValue, error) {
        if fn == nil {
                return nil, nil // ok: nothing to Invoke
        }
        inv := newInvoker(fn)
-       return inv.Invoke(ctx, pn, ws, ts, opt, bf, we, sa, reader, extra...)
+       return inv.invokeWithOpts(ctx, pn, ws, ts, opts)
+}
+
+// InvokeWithOptsWithoutEventTime runs the given function at time 0 in the 
global window.
+func InvokeWithOptsWithoutEventTime(ctx context.Context, fn *funcx.Fn, opts 
InvokeOpts) (*FullValue, error) {
+       return InvokeWithOpts(ctx, fn, typex.NoFiringPane(), 
window.SingleGlobalWindow, mtime.ZeroTimestamp, opts)
 }
 
 // InvokeWithoutEventTime runs the given function at time 0 in the global 
window.
+//
+// Deprecated: prefer InvokeWithOptsWithoutEventTime
 func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, 
bf *bundleFinalizer, we sdf.WatermarkEstimator, sa UserStateAdapter, reader 
StateReader, extra ...any) (*FullValue, error) {
        if fn == nil {
                return nil, nil // ok: nothing to Invoke
@@ -93,10 +125,12 @@ type invoker struct {
        fn   *funcx.Fn
        args []any
        sp   *stateProvider
+       tp   *timerProvider
+
        // TODO(lostluck):  2018/07/06 consider replacing with a slice of 
functions to run over the args slice, as an improvement.
-       ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx, spIdx int   // specialized 
input indexes
-       outEtIdx, outPcIdx, outErrIdx                     int   // specialized 
output indexes
-       in, out                                           []int // general 
indexes
+       ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx, spIdx, tpIdx int   // 
specialized input indexes
+       outEtIdx, outPcIdx, outErrIdx                            int   // 
specialized output indexes
+       in, out                                                  []int // 
general indexes
 
        ret                     FullValue     // ret is a cached allocation for 
passing to the next Unit. Units never modify the passed in FullValue.
        elmConvert, elm2Convert func(any) any // Cached conversion functions, 
which assums this invoker is always used with the same parameter types.
@@ -129,6 +163,9 @@ func newInvoker(fn *funcx.Fn) *invoker {
        if n.spIdx, ok = fn.StateProvider(); !ok {
                n.spIdx = -1
        }
+       if n.tpIdx, ok = fn.TimerProvider(); !ok {
+               n.tpIdx = -1
+       }
        if n.outEtIdx, ok = fn.OutEventTime(); !ok {
                n.outEtIdx = -1
        }
@@ -163,7 +200,11 @@ func (n *invoker) InvokeWithoutEventTime(ctx 
context.Context, opt *MainInput, bf
 
 // Invoke invokes the fn with the given values. The extra values must match 
the non-main
 // side input and emitters. It returns the direct output, if any.
-func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws 
[]typex.Window, ts typex.EventTime, opt *MainInput, bf *bundleFinalizer, we 
sdf.WatermarkEstimator, sa UserStateAdapter, reader StateReader, extra ...any) 
(*FullValue, error) {
+func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws 
[]typex.Window, ts typex.EventTime, opt *MainInput, bf *bundleFinalizer, we 
sdf.WatermarkEstimator, sa UserStateAdapter, sr StateReader, extra ...any) 
(*FullValue, error) {
+       return n.invokeWithOpts(ctx, pn, ws, ts, InvokeOpts{opt: opt, bf: bf, 
we: we, sa: sa, sr: sr, extra: extra})
+}
+
+func (n *invoker) invokeWithOpts(ctx context.Context, pn typex.PaneInfo, ws 
[]typex.Window, ts typex.EventTime, opts InvokeOpts) (*FullValue, error) {
        // (1) Populate contexts
        // extract these to make things easier to read.
        args := n.args
@@ -178,7 +219,7 @@ func (n *invoker) Invoke(ctx context.Context, pn 
typex.PaneInfo, ws []typex.Wind
        }
        if n.wndIdx >= 0 {
                if len(ws) != 1 {
-                       return nil, errors.Errorf("DoFns that observe windows 
must be invoked with single window: %v", opt.Key.Windows)
+                       return nil, errors.Errorf("DoFns that observe windows 
must be invoked with single window: %v", opts.opt.Key.Windows)
                }
                args[n.wndIdx] = ws[0]
        }
@@ -186,14 +227,14 @@ func (n *invoker) Invoke(ctx context.Context, pn 
typex.PaneInfo, ws []typex.Wind
                args[n.etIdx] = ts
        }
        if n.bfIdx >= 0 {
-               args[n.bfIdx] = bf
+               args[n.bfIdx] = opts.bf
        }
        if n.weIdx >= 0 {
-               args[n.weIdx] = we
+               args[n.weIdx] = opts.we
        }
 
        if n.spIdx >= 0 {
-               sp, err := sa.NewStateProvider(ctx, reader, ws[0], opt)
+               sp, err := opts.sa.NewStateProvider(ctx, opts.sr, ws[0], 
opts.opt)
                if err != nil {
                        return nil, err
                }
@@ -201,29 +242,38 @@ func (n *invoker) Invoke(ctx context.Context, pn 
typex.PaneInfo, ws []typex.Wind
                args[n.spIdx] = n.sp
        }
 
+       if n.tpIdx >= 0 {
+               log.Debugf(ctx, "timercall %+v", opts)
+               tp, err := opts.ta.NewTimerProvider(ctx, opts.tm, ws, opts.opt)
+               if err != nil {
+                       return nil, err
+               }
+               n.tp = &tp
+               args[n.tpIdx] = n.tp
+       }
        // (2) Main input from value, if any.
        i := 0
-       if opt != nil {
-               if opt.RTracker != nil {
-                       args[in[i]] = opt.RTracker
+       if opts.opt != nil {
+               if opts.opt.RTracker != nil {
+                       args[in[i]] = opts.opt.RTracker
                        i++
                }
                if n.elmConvert == nil {
-                       from := reflect.TypeOf(opt.Key.Elm)
+                       from := reflect.TypeOf(opts.opt.Key.Elm)
                        n.elmConvert = ConvertFn(from, fn.Param[in[i]].T)
                }
-               args[in[i]] = n.elmConvert(opt.Key.Elm)
+               args[in[i]] = n.elmConvert(opts.opt.Key.Elm)
                i++
-               if opt.Key.Elm2 != nil {
+               if opts.opt.Key.Elm2 != nil {
                        if n.elm2Convert == nil {
-                               from := reflect.TypeOf(opt.Key.Elm2)
+                               from := reflect.TypeOf(opts.opt.Key.Elm2)
                                n.elm2Convert = ConvertFn(from, 
fn.Param[in[i]].T)
                        }
-                       args[in[i]] = n.elm2Convert(opt.Key.Elm2)
+                       args[in[i]] = n.elm2Convert(opts.opt.Key.Elm2)
                        i++
                }
 
-               for _, iter := range opt.Values {
+               for _, iter := range opts.opt.Values {
                        param := fn.Param[in[i]]
 
                        if param.Kind != funcx.FnIter {
@@ -243,7 +293,7 @@ func (n *invoker) Invoke(ctx context.Context, pn 
typex.PaneInfo, ws []typex.Wind
        }
 
        // (3) Precomputed side input and emitters (or other output).
-       for _, arg := range extra {
+       for _, arg := range opts.extra {
                args[in[i]] = arg
                i++
        }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go 
b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index 8cb5342ded8..be5c71f2c75 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -48,8 +48,10 @@ type ParDo struct {
        bf       *bundleFinalizer
        we       sdf.WatermarkEstimator
 
-       reader StateReader
-       cache  *cacheElm
+       Timer        UserTimerAdapter
+       timerManager DataManager
+       reader       StateReader
+       cache        *cacheElm
 
        status Status
        err    errorx.GuardedError
@@ -88,7 +90,7 @@ func (n *ParDo) Up(ctx context.Context) error {
        // Subsequent bundles might run this same node, and the context here 
would be
        // incorrectly referring to the older bundleId.
        setupCtx := metrics.SetPTransformID(ctx, n.PID)
-       if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil, nil, 
nil, nil, nil); err != nil {
+       if _, err := InvokeWithOptsWithoutEventTime(setupCtx, n.Fn.SetupFn(), 
InvokeOpts{}); err != nil {
                return n.fail(err)
        }
 
@@ -111,6 +113,7 @@ func (n *ParDo) StartBundle(ctx context.Context, id string, 
data DataContext) er
        }
        n.status = Active
        n.reader = data.State
+       n.timerManager = data.Data
        // Allocating contexts all the time is expensive, but we seldom 
re-write them,
        // and never accept modified contexts from users, so we will cache them 
per-bundle
        // per-unit, to avoid the constant allocation overhead.
@@ -236,6 +239,7 @@ func (n *ParDo) FinishBundle(_ context.Context) error {
        }
        n.reader = nil
        n.cache = nil
+       n.timerManager = nil
 
        if err := MultiFinishBundle(n.ctx, n.Out...); err != nil {
                return n.fail(err)
@@ -251,8 +255,9 @@ func (n *ParDo) Down(ctx context.Context) error {
        n.status = Down
        n.reader = nil
        n.cache = nil
+       n.timerManager = nil
 
-       if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil, nil, 
nil, nil, nil); err != nil {
+       if _, err := InvokeWithOptsWithoutEventTime(ctx, n.Fn.TeardownFn(), 
InvokeOpts{}); err != nil {
                n.err.TrySetError(err)
        }
        return n.err.Error()
@@ -356,7 +361,7 @@ func (n *ParDo) invokeProcessFn(ctx context.Context, pn 
typex.PaneInfo, ws []typ
        if err := n.preInvoke(ctx, ws, ts); err != nil {
                return nil, err
        }
-       val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.bf, n.we, n.UState, 
n.reader, n.cache.extra...)
+       val, err = n.inv.invokeWithOpts(ctx, pn, ws, ts, InvokeOpts{opt: opt, 
bf: n.bf, we: n.we, sa: n.UState, sr: n.reader, ta: n.Timer, tm: 
n.timerManager, extra: n.cache.extra})
        if err != nil {
                return nil, err
        }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/timers.go 
b/sdks/go/pkg/beam/core/runtime/exec/timers.go
new file mode 100644
index 00000000000..0ceed0d2ebd
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/timers.go
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+       "context"
+       "fmt"
+       "io"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+type UserTimerAdapter interface {
+       NewTimerProvider(ctx context.Context, manager DataManager, w 
[]typex.Window, element *MainInput) (timerProvider, error)
+}
+
+type userTimerAdapter struct {
+       SID            StreamID
+       wc             WindowEncoder
+       kc             ElementEncoder
+       timerIDToCoder map[string]*coder.Coder
+       C              *coder.Coder
+}
+
+func NewUserTimerAdapter(sID StreamID, c *coder.Coder, timerCoders 
map[string]*coder.Coder) UserTimerAdapter {
+       if !coder.IsW(c) {
+               panic(fmt.Sprintf("expected WV coder for user timer %v: %v", 
sID, c))
+       }
+
+       wc := MakeWindowEncoder(c.Window)
+       var kc ElementEncoder
+       if coder.IsKV(coder.SkipW(c)) {
+               kc = MakeElementEncoder(coder.SkipW(c).Components[0])
+       }
+
+       return &userTimerAdapter{SID: sID, wc: wc, kc: kc, C: c, 
timerIDToCoder: timerCoders}
+}
+
+func (u *userTimerAdapter) NewTimerProvider(ctx context.Context, manager 
DataManager, w []typex.Window, element *MainInput) (timerProvider, error) {
+       if u.kc == nil {
+               return timerProvider{}, fmt.Errorf("cannot make a state 
provider for an unkeyed input %v", element)
+       }
+       elementKey, err := EncodeElement(u.kc, element.Key.Elm)
+       if err != nil {
+               return timerProvider{}, err
+       }
+
+       // win, err := EncodeWindow(u.wc, w[0])
+       // if err != nil {
+       //      return timerProvider{}, err
+       // }
+       tp := timerProvider{
+               ctx:             ctx,
+               tm:              manager,
+               elementKey:      elementKey,
+               SID:             u.SID,
+               window:          w,
+               writersByFamily: make(map[string]io.Writer),
+               codersByFamily:  u.timerIDToCoder,
+       }
+
+       return tp, nil
+}
+
+type timerProvider struct {
+       ctx        context.Context
+       tm         DataManager
+       SID        StreamID
+       elementKey []byte
+       window     []typex.Window
+
+       pn typex.PaneInfo
+
+       writersByFamily map[string]io.Writer
+       codersByFamily  map[string]*coder.Coder
+}
+
+func (p *timerProvider) getWriter(family string) (io.Writer, error) {
+       if w, ok := p.writersByFamily[family]; ok {
+               return w, nil
+       } else {
+               w, err := p.tm.OpenTimerWrite(p.ctx, p.SID, family)
+               if err != nil {
+                       return nil, err
+               }
+               p.writersByFamily[family] = w
+               return p.writersByFamily[family], nil
+       }
+}
+
+func (p *timerProvider) Set(t timers.TimerMap) {
+       w, err := p.getWriter(t.Family)
+       if err != nil {
+               panic(err)
+       }
+       tm := typex.TimerMap{
+               Key:           string(p.elementKey),
+               Tag:           t.Tag,
+               Windows:       p.window,
+               Clear:         t.Clear,
+               FireTimestamp: t.FireTimestamp,
+               HoldTimestamp: t.HoldTimestamp,
+               Pane:          p.pn,
+       }
+       fv := FullValue{Elm: tm}
+       enc := MakeElementEncoder(coder.SkipW(p.codersByFamily[t.Family]))
+       if err := enc.Encode(&fv, w); err != nil {
+               panic(err)
+       }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go 
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 78cf0ef65cd..0403f0ab0ab 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -16,6 +16,7 @@
 package exec
 
 import (
+       "context"
        "fmt"
        "math/rand"
        "strconv"
@@ -30,6 +31,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
        "github.com/golang/protobuf/proto"
@@ -462,6 +464,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                var data string
                var sides map[string]*pipepb.SideInput
                var userState map[string]*pipepb.StateSpec
+               var userTimers map[string]*pipepb.TimerFamilySpec
                switch urn {
                case graphx.URNParDo,
                        urnPairWithRestriction,
@@ -475,6 +478,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                        data = string(pardo.GetDoFn().GetPayload())
                        sides = pardo.GetSideInputs()
                        userState = pardo.GetStateSpecs()
+                       userTimers = pardo.GetTimerFamilySpecs()
                case urnPerKeyCombinePre, urnPerKeyCombineMerge, 
urnPerKeyCombineExtract, urnPerKeyCombineConvert:
                        var cmb pipepb.CombinePayload
                        if err := proto.Unmarshal(payload, &cmb); err != nil {
@@ -587,6 +591,24 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                                                        n.UState = 
NewUserStateAdapter(sid, coder.NewW(ec, wc), stateIDToCoder, stateIDToKeyCoder, 
stateIDToCombineFn)
                                                }
                                        }
+                                       if len(userTimers) > 0 {
+                                               log.Debugf(context.TODO(), 
"userTimers %+v", userTimers)
+                                               timerIDToCoder := 
make(map[string]*coder.Coder)
+                                               for key, spec := range 
userTimers {
+                                                       cID := 
spec.GetTimerFamilyCoderId()
+                                                       c, err := 
b.coders.Coder(cID)
+                                                       if err != nil {
+                                                               return nil, err
+                                                       }
+                                                       timerIDToCoder[key] = c
+                                                       sID := StreamID{Port: 
Port{URL: b.desc.GetTimerApiServiceDescriptor().GetUrl()}, PtransformID: id.to}
+                                                       ec, wc, err := 
b.makeCoderForPCollection(input[0])
+                                                       if err != nil {
+                                                               return nil, err
+                                                       }
+                                                       n.Timer = 
NewUserTimerAdapter(sID, coder.NewW(ec, wc), timerIDToCoder)
+                                               }
+                                       }
 
                                        for i := 1; i < len(input); i++ {
                                                // 
TODO(https://github.com/apache/beam/issues/18602) Handle ViewFns for side inputs
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go 
b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index 498e145f5db..34b44dd8592 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -73,7 +73,7 @@ func knownStandardCoders() []string {
                urnIntervalWindow,
                urnRowCoder,
                urnNullableCoder,
-               // TODO(https://github.com/apache/beam/issues/20510): Add 
urnTimerCoder once finalized.
+               urnTimerCoder,
        }
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go 
b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 65ad1bdd060..fb62e18e130 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
        v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/jsonx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
@@ -523,6 +524,8 @@ func tryEncodeSpecial(t reflect.Type) (v1pb.Type_Special, 
bool) {
                return v1pb.Type_BUNDLEFINALIZATION, true
        case state.ProviderType:
                return v1pb.Type_STATEPROVIDER, true
+       case timers.ProviderType:
+               return v1pb.Type_TIMERPROVIDER, true
        case typex.KVType:
                return v1pb.Type_KV, true
        case typex.CoGBKType:
@@ -689,6 +692,8 @@ func decodeSpecial(s v1pb.Type_Special) (reflect.Type, 
error) {
                return typex.BundleFinalizationType, nil
        case v1pb.Type_STATEPROVIDER:
                return state.ProviderType, nil
+       case v1pb.Type_TIMERPROVIDER:
+               return timers.ProviderType, nil
        case v1pb.Type_KV:
                return typex.KVType, nil
        case v1pb.Type_COGBK:
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 68074ac7eb3..a427f22f882 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -578,6 +578,21 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) 
([]string, error) {
                        }
                        payload.StateSpecs = stateSpecs
                }
+               if _, ok := edge.Edge.DoFn.ProcessElementFn().TimerProvider(); 
ok {
+                       m.requirements[URNRequiresStatefulProcessing] = true
+                       timerSpecs := make(map[string]*pipepb.TimerFamilySpec)
+                       for _, pt := range edge.Edge.DoFn.PipelineTimers() {
+                               coderID, err := 
m.coders.Add(edge.Edge.TimerCoders[pt.TimerFamily()])
+                               if err != nil {
+                                       return handleErr(err)
+                               }
+                               timerSpecs[pt.TimerFamily()] = 
&pipepb.TimerFamilySpec{
+                                       TimeDomain:         
pipepb.TimeDomain_Enum(pt.TimerDomain()),
+                                       TimerFamilyCoderId: coderID,
+                               }
+                       }
+                       payload.TimerFamilySpecs = timerSpecs
+               }
                spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
                annotations = edge.Edge.DoFn.Annotations()
 
diff --git a/sdks/go/pkg/beam/core/timers/timers.go 
b/sdks/go/pkg/beam/core/timers/timers.go
new file mode 100644
index 00000000000..130564790ca
--- /dev/null
+++ b/sdks/go/pkg/beam/core/timers/timers.go
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package timers provides structs for reading and writing timers.
+package timers
+
+import (
+       "context"
+       "reflect"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+)
+
+var (
+       ProviderType = reflect.TypeOf((*Provider)(nil)).Elem()
+)
+
+type TimeDomainEnum int32
+
+const (
+       TimeDomainUnspecified    TimeDomainEnum = 0
+       TimeDomainEventTime      TimeDomainEnum = 1
+       TimeDomainProcessingTime TimeDomainEnum = 2
+)
+
+type TimerMap struct {
+       Family                       string
+       Tag                          string
+       Clear                        bool
+       FireTimestamp, HoldTimestamp mtime.Time
+}
+
+type Provider interface {
+       Set(t TimerMap)
+}
+
+type EventTime struct {
+       // need to export them otherwise the key comes out empty in execution?
+       Family string
+}
+
+func (t EventTime) Set(p Provider, firingTimestamp time.Time) {
+       fire := mtime.FromTime(firingTimestamp)
+       // Hold timestamp must match firing timestamp if not otherwise set.
+       p.Set(TimerMap{Family: t.Family, FireTimestamp: fire, HoldTimestamp: 
fire})
+}
+
+type Opts struct {
+       Tag  string
+       Hold time.Time
+}
+
+func (t *EventTime) SetWithOpts(p Provider, firingTimestamp time.Time, opts 
Opts) {
+       fire := mtime.FromTime(firingTimestamp)
+       // Hold timestamp must match firing timestamp if not otherwise set.
+       tm := TimerMap{Family: t.Family, Tag: opts.Tag, FireTimestamp: fire, 
HoldTimestamp: fire}
+       if !opts.Hold.IsZero() {
+               tm.HoldTimestamp = mtime.FromTime(opts.Hold)
+       }
+       p.Set(tm)
+}
+
+func (e EventTime) TimerFamily() string {
+       return e.Family
+}
+
+func (e EventTime) TimerDomain() TimeDomainEnum {
+       return TimeDomainEventTime
+}
+
+type ProcessingTime struct {
+       Family string
+}
+
+func (e ProcessingTime) TimerFamily() string {
+       return e.Family
+}
+
+func (e ProcessingTime) TimerDomain() TimeDomainEnum {
+       return TimeDomainProcessingTime
+}
+
+func (t ProcessingTime) Set(p Provider, firingTimestamp time.Time) {
+       log.Infof(context.Background(), "setting timer in core/timer: %+v", t)
+       fire := mtime.FromTime(firingTimestamp)
+       p.Set(TimerMap{Family: t.Family, FireTimestamp: fire, HoldTimestamp: 
fire})
+}
+
+func (t ProcessingTime) SetWithOpts(p Provider, firingTimestamp time.Time, 
opts Opts) {
+       fire := mtime.FromTime(firingTimestamp)
+       // Hold timestamp must match firing timestamp if not otherwise set.
+       tm := TimerMap{Family: t.Family, Tag: opts.Tag, FireTimestamp: fire, 
HoldTimestamp: fire}
+       if !opts.Hold.IsZero() {
+               tm.HoldTimestamp = mtime.FromTime(opts.Hold)
+       }
+       p.Set(tm)
+}
+
+func InEventTime(Key string) EventTime {
+       return EventTime{Family: Key}
+}
+
+func InProcessingTime(Key string) ProcessingTime {
+       return ProcessingTime{Family: Key}
+}
diff --git a/sdks/go/pkg/beam/core/typex/class.go 
b/sdks/go/pkg/beam/core/typex/class.go
index e112495ee98..63e4543a3e5 100644
--- a/sdks/go/pkg/beam/core/typex/class.go
+++ b/sdks/go/pkg/beam/core/typex/class.go
@@ -120,6 +120,7 @@ func isConcrete(t reflect.Type, visited map[uintptr]bool) 
error {
                t == EventTimeType ||
                t.Implements(WindowType) ||
                t == PaneInfoType ||
+               t == TimersType ||
                t == BundleFinalizationType ||
                t == reflectx.Error ||
                t == reflectx.Context ||
diff --git a/sdks/go/pkg/beam/core/typex/special.go 
b/sdks/go/pkg/beam/core/typex/special.go
index 93537122584..4cce19dcfbd 100644
--- a/sdks/go/pkg/beam/core/typex/special.go
+++ b/sdks/go/pkg/beam/core/typex/special.go
@@ -107,7 +107,8 @@ type Timers struct {
 
 // TimerMap is a placeholder for timer details used in encoding/decoding.
 type TimerMap struct {
-       Key, Tag                     string
+       Key                          string
+       Tag                          string
        Windows                      []Window // []typex.Window
        Clear                        bool
        FireTimestamp, HoldTimestamp mtime.Time
diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go
index e1cdc4f417c..f2c9303b016 100644
--- a/sdks/go/pkg/beam/pardo.go
+++ b/sdks/go/pkg/beam/pardo.go
@@ -116,6 +116,18 @@ func TryParDo(s Scope, dofn any, col PCollection, opts 
...Option) ([]PCollection
                }
        }
 
+       wc := inWfn.Coder()
+       pipelineTimers := fn.PipelineTimers()
+       if len(pipelineTimers) > 0 {
+               // TODO(riteshghorse): replace the coder with type of key
+               c := coder.NewString()
+               edge.TimerCoders = make(map[string]*coder.Coder)
+               for _, pt := range pipelineTimers {
+                       tc := coder.NewT(c, wc)
+                       edge.TimerCoders[pt.TimerFamily()] = tc
+               }
+       }
+
        var ret []PCollection
        for _, out := range edge.Output {
                c := PCollection{out.To}


Reply via email to