riteshghorse commented on code in PR #26782:
URL: https://github.com/apache/beam/pull/26782#discussion_r1213735552
##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -362,47 +370,128 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn
typex.PaneInfo, ws []typex.
return val, nil
}
-func (n *ParDo) InvokeTimerFn(ctx context.Context, fn *funcx.Fn, timerFamilyID
string, bcr *byteCountReader) (*FullValue, error) {
- timerAdapter, ok := n.Timer.(*userTimerAdapter)
- if !ok {
- return nil, fmt.Errorf("userTimerAdapter empty for ParDo: %v",
n.GetPID())
+// decodeBundleTimers is a helper to decode a batch of timers for a bundle,
handling the io.EOF from the reader.
+func decodeBundleTimers(spec timerFamilySpec, r io.Reader) ([]TimerRecv,
error) {
+ var bundleTimers []TimerRecv
+ for {
+ tmap, err := decodeTimer(spec.KeyDecoder, spec.WinDecoder, r)
+ if err != nil {
+ if goerrors.Is(err, io.EOF) {
+ break
+ }
+ return nil, errors.WithContext(err, "error decoding
received timer callback")
+ }
+ bundleTimers = append(bundleTimers, tmap)
}
- tmap, err := decodeTimer(timerAdapter.dc, timerAdapter.wc, bcr)
+ return bundleTimers, nil
+}
+
+// ProcessTimers processes all timers in firing order from the runner for a
timer family ID.
+//
+// A timer refers to a specific combination of Key+Window + Family + Tag. They
also
+// have a firing time, and a data watermark hold time. The SDK doesn't
determine
+// if a timer is ready to fire or not, that's up to the runner.
+//
+// This method fires timers in the order from the runner. During this process,
the user
+// code may set additional firings for one or more timers, which may overwrite
orderings
+// from the runner.
+//
+// In particular, if a runner-sent timer produces a new firing that is earlier
than a 2nd runner-sent timer,
+// then it is processed before that 2nd timer. This will override any
subsequent firing of the same timer,
+// and as a result, must add a clear to the set of timer modifications.
+func (n *ParDo) ProcessTimers(timerFamilyID string, r io.Reader) (err error) {
+ // Lookup actual domain for family here.
+ spec := n.TimerTracker.familyToSpec[timerFamilyID]
+
+ bundleTimers, err := decodeBundleTimers(spec, r)
if err != nil {
- return nil, errors.WithContext(err, "error decoding received
timer callback")
+ return err
}
+ for _, tmap := range bundleTimers {
+ n.TimerTracker.SetCurrentKeyString(tmap.KeyString)
+ for i, w := range tmap.Windows {
+ ws := tmap.Windows[i : i+1]
Review Comment:
got it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]