damondouglas commented on code in PR #32119:
URL: https://github.com/apache/beam/pull/32119#discussion_r1711809986
##########
sdks/go/pkg/beam/runners/prism/internal/engine/timers.go:
##########
@@ -31,53 +31,77 @@ import (
"google.golang.org/protobuf/encoding/protowire"
)
-// DecodeTimer extracts timers to elements for insertion into their keyed
queues.
-// Returns the key bytes, tag, window exploded elements, and the hold
timestamp.
+type timerRet struct {
+ keyBytes []byte
+ tag string
+ elms []element
+ windows []typex.Window
+}
+
+// decodeTimerIter extracts timers to elements for insertion into their keyed
queues,
+// through a go iterator function, to be called by the caller with their
processing function.
+//
+// For each timer, a key, tag, windowed elements, and the window set are
returned.
+//
// If the timer has been cleared, no elements will be returned. Any existing
timers
-// for the tag *must* be cleared from the pending queue.
-func decodeTimer(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw
[]byte) ([]byte, string, []element) {
- keyBytes := keyDec(bytes.NewBuffer(raw))
-
- d := decoder{raw: raw, cursor: len(keyBytes)}
- tag := string(d.Bytes())
-
- var ws []typex.Window
- numWin := d.Fixed32()
- if usesGlobalWindow {
- for i := 0; i < int(numWin); i++ {
- ws = append(ws, window.GlobalWindow{})
- }
- } else {
- // Assume interval windows here, since we don't understand
custom windows yet.
- for i := 0; i < int(numWin); i++ {
- ws = append(ws, d.IntervalWindow())
- }
- }
+// for the tag *must* be cleared from the pending queue. The windows
associated with
+// the clear are provided to be able to delete pending timers.
+func decodeTimerIter(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw
[]byte) func(yeild func(timerRet) bool) {
Review Comment:
Not blocking; this can be addressed in a later change. Consider correcting the parameter name `yeild` to `yield`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]