This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f9dca88eb62 Fix the issue of timer clearing not working across bundle 
(#35106)
f9dca88eb62 is described below

commit f9dca88eb62829039e13ba651368b0798e00b5d9
Author: Shunping Huang <[email protected]>
AuthorDate: Fri May 30 23:28:44 2025 -0400

    Fix the issue of timer clearing not working across bundle (#35106)
    
    * Fix the issue of timer clearing not working across bundle
    
    * Correctly keep track of pending counts for cleared timers. Add some more 
comments and fix typos.
    
    * Fix pending counts for clearing signal again. Minor edits on comment.
---
 .../prism/internal/engine/elementmanager.go        | 23 ++++++++++++----------
 .../beam/runners/prism/internal/engine/timers.go   | 17 +++++++++++++---
 2 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index de997fa3282..08056ae18b2 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -53,6 +53,7 @@ type element struct {
        // No synchronization is required in specifying this,
        // since keyed elements are only processed by a single bundle at a time,
        // if stateful stages are concerned.
+       // If a timer element has sequence set to -1, it means it is being 
cleared.
        sequence int
 
        elmBytes []byte // When nil, indicates this is a timer.
@@ -959,11 +960,6 @@ func (em *ElementManager) triageTimers(d TentativeData, 
inputInfo PColInfo, stag
                                for _, e := range ret.elms {
                                        keyToTimers[timerKey{key: 
string(ret.keyBytes), tag: ret.tag, win: e.window}] = e
                                }
-                               if len(ret.elms) == 0 {
-                                       for _, w := range ret.windows {
-                                               delete(keyToTimers, 
timerKey{key: string(ret.keyBytes), tag: ret.tag, win: w})
-                                       }
-                               }
                                // Indicate we'd like to continue iterating.
                                return true
                        })
@@ -1347,16 +1343,23 @@ func (*statefulStageKind) addPending(ss *stageState, em 
*ElementManager, newPend
                if e.IsTimer() {
                        if lastSet, ok := dnt.timers[timerKey{family: e.family, 
tag: e.tag, window: e.window}]; ok {
                                // existing timer!
-                               // don't increase the count this time, as 
"this" timer is already pending.
+                               // don't increase the count this time, as "e" 
is already pending.
                                count--
                                // clear out the existing hold for accounting 
purposes.
                                ss.watermarkHolds.Drop(lastSet.hold, 1)
                        }
-                       // Update the last set time on the timer.
-                       dnt.timers[timerKey{family: e.family, tag: e.tag, 
window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}
+                       if e.sequence >= 0 {
+                               // Update the last set time on the timer.
+                               dnt.timers[timerKey{family: e.family, tag: 
e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: 
e.holdTimestamp}
 
-                       // Mark the hold in the heap.
-                       ss.watermarkHolds.Add(e.holdTimestamp, 1)
+                               // Mark the hold in the heap.
+                               ss.watermarkHolds.Add(e.holdTimestamp, 1)
+                       } else {
+                               // decrement the pending count since "e" is not 
a real timer (merely a clearing signal)
+                               count--
+                               // timer is to be cleared
+                               delete(dnt.timers, timerKey{family: e.family, 
tag: e.tag, window: e.window})
+                       }
                }
        }
        return count
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
index 8b90591974b..636ed5a644c 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
@@ -86,8 +86,19 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder 
WinCoderType, raw [
                        clear := d.Bool()
                        hold := mtime.MaxTimestamp
                        if clear {
-                               if !yield(timerRet{keyBytes, tag, nil, ws}) {
-                                       return // Halt iteration if yeild 
returns false.
+                               var elms []element
+                               for _, w := range ws {
+                                       elms = append(elms, element{
+                                               tag:      tag,
+                                               elmBytes: nil, // indicates 
this is a timer.
+                                               keyBytes: keyBytes,
+                                               window:   w,
+                                               sequence: -1, // indicates this 
timer is being cleared.
+                                       })
+                               }
+
+                               if !yield(timerRet{keyBytes, tag, elms, ws}) {
+                                       return // Halt iteration if yield 
returns false.
                                }
                                // Otherwise continue handling the remaining 
bytes.
                                raw = d.UnusedBytes()
@@ -113,7 +124,7 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, 
winCoder WinCoderType, raw [
                        }
 
                        if !yield(timerRet{keyBytes, tag, elms, ws}) {
-                               return // Halt iteration if yeild returns false.
+                               return // Halt iteration if yield returns false.
                        }
                        // Otherwise continue handling the remaining bytes.
                        raw = d.UnusedBytes()

Reply via email to