This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c8bcf9bae1 Fix race condition and nil pointer dereferencing (#36370)
1c8bcf9bae1 is described below

commit 1c8bcf9bae17d222e888734327ed9ba6599956f2
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Oct 3 09:17:34 2025 -0400

    Fix race condition and nil pointer dereferencing (#36370)
---
 .../pkg/beam/runners/prism/internal/engine/elementmanager.go   | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index d03d906e47d..f77844b6f6c 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -384,6 +384,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
                defer func() {
                        // In case of panics in bundle generation, fail and cancel the job.
                        if e := recover(); e != nil {
+                               slog.Error("panic in ElementManager.Bundles watermark evaluation goroutine", "error", e, "traceback", string(debug.Stack()))
                                upstreamCancelFn(fmt.Errorf("panic in ElementManager.Bundles watermark evaluation goroutine: %v\n%v", e, string(debug.Stack())))
                        }
                }()
@@ -1366,7 +1367,9 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t
                                // TODO: how to deal with watermark holds for this implicit processing time timer
                                // ss.watermarkHolds.Add(timer.holdTimestamp, 1)
                                ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds)
+                               em.refreshCond.L.Lock()
                                em.processTimeEvents.Schedule(firingTime, ss.ID)
+                               em.refreshCond.L.Unlock()
                                em.wakeUpAt(firingTime)
                        }
                }
@@ -1566,6 +1569,13 @@ func (ss *stageState) savePanes(bundID string, panesInBundle []bundlePane) {
 func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win typex.Window) ([]element, int) {
        var toProcess []element
        dnt := ss.pendingByKeys[key]
+       if dnt == nil {
+               // If we set an after-processing-time trigger, but some other triggers fire or
+               // the end of window is reached before the first trigger could fire, then
+               // the pending elements are processed in other bundles, leaving a nil when
+               // we try to build this triggered bundle.
+               return toProcess, 0
+       }
        var notYet []element
 
        // Look at all elements for this key, and only for this window.

Reply via email to