This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f9dca88eb62 Fix the issue of timer clearing not working across bundle
(#35106)
f9dca88eb62 is described below
commit f9dca88eb62829039e13ba651368b0798e00b5d9
Author: Shunping Huang <[email protected]>
AuthorDate: Fri May 30 23:28:44 2025 -0400
Fix the issue of timer clearing not working across bundle (#35106)
* Fix the issue of timer clearing not working across bundle
* Correctly keep track of pending counts for cleared timers. Add some more
comments and fix typos.
* Fix pending counts for clearing signal again. Minor edits on comment.
---
.../prism/internal/engine/elementmanager.go | 23 ++++++++++++----------
.../beam/runners/prism/internal/engine/timers.go | 17 +++++++++++++---
2 files changed, 27 insertions(+), 13 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index de997fa3282..08056ae18b2 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -53,6 +53,7 @@ type element struct {
// No synchronization is required in specifying this,
// since keyed elements are only processed by a single bundle at a time,
// if stateful stages are concerned.
+ // If a timer element has sequence set to -1, it means it is being
cleared.
sequence int
elmBytes []byte // When nil, indicates this is a timer.
@@ -959,11 +960,6 @@ func (em *ElementManager) triageTimers(d TentativeData,
inputInfo PColInfo, stag
for _, e := range ret.elms {
keyToTimers[timerKey{key:
string(ret.keyBytes), tag: ret.tag, win: e.window}] = e
}
- if len(ret.elms) == 0 {
- for _, w := range ret.windows {
- delete(keyToTimers,
timerKey{key: string(ret.keyBytes), tag: ret.tag, win: w})
- }
- }
// Indicate we'd like to continue iterating.
return true
})
@@ -1347,16 +1343,23 @@ func (*statefulStageKind) addPending(ss *stageState, em
*ElementManager, newPend
if e.IsTimer() {
if lastSet, ok := dnt.timers[timerKey{family: e.family,
tag: e.tag, window: e.window}]; ok {
// existing timer!
- // don't increase the count this time, as
"this" timer is already pending.
+ // don't increase the count this time, as "e"
is already pending.
count--
// clear out the existing hold for accounting
purposes.
ss.watermarkHolds.Drop(lastSet.hold, 1)
}
- // Update the last set time on the timer.
- dnt.timers[timerKey{family: e.family, tag: e.tag,
window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}
+ if e.sequence >= 0 {
+ // Update the last set time on the timer.
+ dnt.timers[timerKey{family: e.family, tag:
e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold:
e.holdTimestamp}
- // Mark the hold in the heap.
- ss.watermarkHolds.Add(e.holdTimestamp, 1)
+ // Mark the hold in the heap.
+ ss.watermarkHolds.Add(e.holdTimestamp, 1)
+ } else {
+ // decrement the pending count since "e" is not
a real timer (merely a clearing signal)
+ count--
+ // timer is to be cleared
+ delete(dnt.timers, timerKey{family: e.family,
tag: e.tag, window: e.window})
+ }
}
}
return count
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
index 8b90591974b..636ed5a644c 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
@@ -86,8 +86,19 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder
WinCoderType, raw [
clear := d.Bool()
hold := mtime.MaxTimestamp
if clear {
- if !yield(timerRet{keyBytes, tag, nil, ws}) {
- return // Halt iteration if yeild
returns false.
+ var elms []element
+ for _, w := range ws {
+ elms = append(elms, element{
+ tag: tag,
+ elmBytes: nil, // indicates
this is a timer.
+ keyBytes: keyBytes,
+ window: w,
+ sequence: -1, // indicates this
timer is being cleared.
+ })
+ }
+
+ if !yield(timerRet{keyBytes, tag, elms, ws}) {
+ return // Halt iteration if yield
returns false.
}
// Otherwise continue handling the remaining
bytes.
raw = d.UnusedBytes()
@@ -113,7 +124,7 @@ func decodeTimerIter(keyDec func(io.Reader) []byte,
winCoder WinCoderType, raw [
}
if !yield(timerRet{keyBytes, tag, elms, ws}) {
- return // Halt iteration if yeild returns false.
+ return // Halt iteration if yield returns false.
}
// Otherwise continue handling the remaining bytes.
raw = d.UnusedBytes()