shunping commented on code in PR #35106:
URL: https://github.com/apache/beam/pull/35106#discussion_r2116938563
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1345,18 +1341,28 @@ func (*statefulStageKind) addPending(ss *stageState, em
*ElementManager, newPend
heap.Push(&dnt.elements, e)
if e.IsTimer() {
- if lastSet, ok := dnt.timers[timerKey{family: e.family,
tag: e.tag, window: e.window}]; ok {
+ lastSet, ok := dnt.timers[timerKey{family: e.family,
tag: e.tag, window: e.window}]
+ if ok {
// existing timer!
// don't increase the count this time, as
"this" timer is already pending.
count--
// clear out the existing hold for accounting
purposes.
ss.watermarkHolds.Drop(lastSet.hold, 1)
}
- // Update the last set time on the timer.
- dnt.timers[timerKey{family: e.family, tag: e.tag,
window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}
+ if e.sequence >= 0 {
+ // Update the last set time on the timer.
+ dnt.timers[timerKey{family: e.family, tag:
e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold:
e.holdTimestamp}
- // Mark the hold in the heap.
- ss.watermarkHolds.Add(e.holdTimestamp, 1)
+ // Mark the hold in the heap.
+ ss.watermarkHolds.Add(e.holdTimestamp, 1)
+ } else {
+ // we need to decrement the pending count only
if the timer to be cleared is in the pending list
+ if ok {
+ count--
+ }
Review Comment:
If we clear a timer without ever setting it, is it going to hit this code path?
Let me verify.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org