riteshghorse commented on code in PR #26782:
URL: https://github.com/apache/beam/pull/26782#discussion_r1213300505


##########
sdks/go/pkg/beam/core/timers/timers.go:
##########
@@ -139,12 +154,17 @@ func (pt ProcessingTime) Clear(p Provider) {
        p.Set(TimerMap{Family: pt.Family, Clear: true})
 }
 
+// Clear clears this timer for the given tag.

Review Comment:
   ```suggestion
   // ClearTag clears this timer for the given tag.
   ```



##########
sdks/go/pkg/beam/core/timers/timers.go:
##########
@@ -105,10 +115,15 @@ func (et *EventTime) Set(p Provider, FiringTimestamp 
time.Time, opts ...timerOpt
 }
 
 // Clear clears this timer.
-func (et *EventTime) Clear(p Provider) {
+func (et EventTime) Clear(p Provider) {
        p.Set(TimerMap{Family: et.Family, Clear: true})
 }
 
+// Clear clears this timer for the given tag.

Review Comment:
   ```suggestion
   // ClearTag clears this timer for the given tag.
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -362,47 +370,128 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn 
typex.PaneInfo, ws []typex.
        return val, nil
 }
 
-func (n *ParDo) InvokeTimerFn(ctx context.Context, fn *funcx.Fn, timerFamilyID 
string, bcr *byteCountReader) (*FullValue, error) {
-       timerAdapter, ok := n.Timer.(*userTimerAdapter)
-       if !ok {
-               return nil, fmt.Errorf("userTimerAdapter empty for ParDo: %v", 
n.GetPID())
+// decodeBundleTimers is a helper to decode a batch of timers for a bundle, 
handling the io.EOF from the reader.
+func decodeBundleTimers(spec timerFamilySpec, r io.Reader) ([]TimerRecv, 
error) {
+       var bundleTimers []TimerRecv
+       for {
+               tmap, err := decodeTimer(spec.KeyDecoder, spec.WinDecoder, r)
+               if err != nil {
+                       if goerrors.Is(err, io.EOF) {
+                               break
+                       }
+                       return nil, errors.WithContext(err, "error decoding 
received timer callback")
+               }
+               bundleTimers = append(bundleTimers, tmap)
        }
-       tmap, err := decodeTimer(timerAdapter.dc, timerAdapter.wc, bcr)
+       return bundleTimers, nil
+}
+
+// ProcessTimers processes all timers in firing order from the runner for a 
timer family ID.
+//
+// A timer refers to a specific combination of Key+Window + Family + Tag. They 
also
+// have a fireing time, and a data watermark hold time. The SDK doesn't 
determine
+// if a timer is ready to fire or not, that's up to the runner.
+//
+// This method fires timers in the order from the runner. During this process, 
the user
+// code may set additional firings for one or more timers, which may overwrite 
orderings
+// from the runner.
+//
+// In particular, if runner sent timer produces a new firing that is earlier 
than a 2nd runner sent timer,
+// then it is processed before that 2nd timer. This will override any 
subsequent firing of the same timer,
+// and as a result, must add a clear to the set of timer modifications.
+func (n *ParDo) ProcessTimers(timerFamilyID string, r io.Reader) (err error) {
+       // Lookup actual domain for family here.
+       spec := n.TimerTracker.familyToSpec[timerFamilyID]
+
+       bundleTimers, err := decodeBundleTimers(spec, r)
        if err != nil {
-               return nil, errors.WithContext(err, "error decoding received 
timer callback")
+               return err
        }
+       for _, tmap := range bundleTimers {
+               n.TimerTracker.SetCurrentKeyString(tmap.KeyString)
+               for i, w := range tmap.Windows {
+                       ws := tmap.Windows[i : i+1]

Review Comment:
   Is it okay to use `w` here instead of `tmap.Windows[i : i+1]`?



##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -249,6 +253,10 @@ func (n *ParDo) FinishBundle(_ context.Context) error {
        if _, err := n.invokeDataFn(n.ctx, typex.NoFiringPane(), 
window.SingleGlobalWindow, mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); 
err != nil {
                return n.fail(err)
        }
+       // Flush timers if any.
+       if err := n.TimerTracker.FlushAndReset(n.ctx, n.timerManager); err != 
nil {

Review Comment:
   Similar comment as above: do we need to check for nil here?



##########
sdks/go/pkg/beam/core/runtime/exec/timers.go:
##########
@@ -16,112 +16,314 @@
 package exec
 
 import (
+       "bytes"
+       "container/heap"
        "context"
-       "fmt"
        "io"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 )
 
-// UserTimerAdapter provides a timer provider to be used for manipulating 
timers.
-type UserTimerAdapter interface {
-       NewTimerProvider(ctx context.Context, manager DataManager, 
inputTimestamp typex.EventTime, windows []typex.Window, element *MainInput) 
(timerProvider, error)
+type userTimerAdapter struct {
+       sID           StreamID
+       familyToSpec  map[string]timerFamilySpec
+       modifications map[windowKeyPair]*timerModifications
+
+       currentKey       any
+       keyEncoded       bool
+       buf              bytes.Buffer
+       currentKeyString string
 }
 
-type userTimerAdapter struct {
-       sID StreamID
-       ec  ElementEncoder
-       dc  ElementDecoder
-       wc  WindowDecoder
+type timerFamilySpec struct {
+       Domain     timers.TimeDomain
+       KeyEncoder ElementEncoder
+       KeyDecoder ElementDecoder
+       WinEncoder WindowEncoder
+       WinDecoder WindowDecoder
 }
 
-// NewUserTimerAdapter returns a user timer adapter for the given StreamID and 
timer coder.
-func NewUserTimerAdapter(sID StreamID, c *coder.Coder, timerCoder 
*coder.Coder) UserTimerAdapter {
-       if !coder.IsW(c) {
-               panic(fmt.Sprintf("expected WV coder for user timer %v: %v", 
sID, c))
+func newTimerFamilySpec(domain timers.TimeDomain, timerCoder *coder.Coder) 
timerFamilySpec {
+       keyCoder := timerCoder.Components[0]
+       return timerFamilySpec{
+               Domain:     domain,
+               KeyEncoder: MakeElementEncoder(keyCoder),
+               KeyDecoder: MakeElementDecoder(keyCoder),
+               WinEncoder: MakeWindowEncoder(timerCoder.Window),
+               WinDecoder: MakeWindowDecoder(timerCoder.Window),
        }
-       ec := MakeElementEncoder(timerCoder)
-       dc := MakeElementDecoder(coder.SkipW(c).Components[0])
-       wc := MakeWindowDecoder(c.Window)
-       return &userTimerAdapter{sID: sID, ec: ec, wc: wc, dc: dc}
 }
 
-// NewTimerProvider creates and returns a timer provider to set/clear timers.
-func (u *userTimerAdapter) NewTimerProvider(ctx context.Context, manager 
DataManager, inputTs typex.EventTime, w []typex.Window, element *MainInput) 
(timerProvider, error) {
-       userKey := &FullValue{Elm: element.Key.Elm}
-       tp := timerProvider{
-               ctx:                 ctx,
-               tm:                  manager,
-               userKey:             userKey,
-               inputTimestamp:      inputTs,
-               sID:                 u.sID,
-               window:              w,
-               writersByFamily:     make(map[string]io.Writer),
-               timerElementEncoder: u.ec,
-               keyElementDecoder:   u.dc,
+// newUserTimerAdapter returns a user timer adapter for the given StreamID and 
timer coder.
+func newUserTimerAdapter(sID StreamID, familyToSpec 
map[string]timerFamilySpec) *userTimerAdapter {
+       return &userTimerAdapter{sID: sID, familyToSpec: familyToSpec}
+}
+
+// SetCurrentKey keeps the key around so we can encoded if needed for timers.
+func (u *userTimerAdapter) SetCurrentKey(mainIn *MainInput) {
+       if u == nil {
+               return
        }
+       u.currentKey = mainIn.Key.Elm
+       u.keyEncoded = false
+}
 
-       return tp, nil
+// SetCurrentKeyString is for processing timer callbacks, and avoids 
re-encoding the key.

Review Comment:
   Nice optimization here.



##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -158,6 +161,7 @@ func (n *ParDo) ProcessElement(_ context.Context, elm 
*FullValue, values ...ReSt
 // a ParDo's ProcessElement functionality with their own construction of
 // MainInputs.
 func (n *ParDo) processMainInput(mainIn *MainInput) error {
+       n.TimerTracker.SetCurrentKey(mainIn)

Review Comment:
   Should we check `n.TimerTracker` for nil here?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to