This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1c8bcf9bae1 Fix race condition and nil pointer dereferencing (#36370)
1c8bcf9bae1 is described below
commit 1c8bcf9bae17d222e888734327ed9ba6599956f2
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Oct 3 09:17:34 2025 -0400
Fix race condition and nil pointer dereferencing (#36370)
---
.../pkg/beam/runners/prism/internal/engine/elementmanager.go | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index d03d906e47d..f77844b6f6c 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -384,6 +384,7 @@ func (em *ElementManager) Bundles(ctx context.Context,
upstreamCancelFn context.
defer func() {
// In case of panics in bundle generation, fail and
cancel the job.
if e := recover(); e != nil {
+ slog.Error("panic in ElementManager.Bundles
watermark evaluation goroutine", "error", e, "traceback", string(debug.Stack()))
upstreamCancelFn(fmt.Errorf("panic in
ElementManager.Bundles watermark evaluation goroutine: %v\n%v", e,
string(debug.Stack())))
}
}()
@@ -1366,7 +1367,9 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em
*ElementManager, window t
// TODO: how to deal with watermark holds for
this implicit processing time timer
// ss.watermarkHolds.Add(timer.holdTimestamp, 1)
ss.processingTimeTimers.Persist(firingTime,
timer, notYetHolds)
+ em.refreshCond.L.Lock()
em.processTimeEvents.Schedule(firingTime, ss.ID)
+ em.refreshCond.L.Unlock()
em.wakeUpAt(firingTime)
}
}
@@ -1566,6 +1569,13 @@ func (ss *stageState) savePanes(bundID string,
panesInBundle []bundlePane) {
func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win
typex.Window) ([]element, int) {
var toProcess []element
dnt := ss.pendingByKeys[key]
+ if dnt == nil {
+ // If we set an after-processing-time trigger, but some other
triggers fire or
+ // the end of window is reached before the first trigger could
fire, then
+ // the pending elements are processed in other bundles, leaving
a nil when
+ // we try to build this triggered bundle.
+ return toProcess, 0
+ }
var notYet []element
// Look at all elements for this key, and only for this window.