This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d0e48e20240 [Prism] Defer unlocking to avoid deadlock (#36163)
d0e48e20240 is described below
commit d0e48e202403c2b1b96d8b170a6e575682332a31
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Sep 15 21:53:42 2025 -0400
[Prism] Defer unlocking to avoid deadlock (#36163)
* Defer unlocking to avoid deadlock
* Capture panics and fail the job instead of crashing, so Prism keeps running.
---
.../prism/internal/engine/elementmanager.go | 128 +++++++++++----------
sdks/go/pkg/beam/runners/prism/internal/execute.go | 8 ++
2 files changed, 77 insertions(+), 59 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 18f10f45e6c..d489bcc18c2 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -892,70 +892,75 @@ func (em *ElementManager) PersistBundle(rb RunBundle,
col2Coders map[string]PCol
// Clear out the inprogress elements associated with the completed
bundle.
// Must be done after adding the new pending elements to avoid an
incorrect
// watermark advancement.
- stage.mu.Lock()
- completed := stage.inprogress[rb.BundleID]
- em.addPending(-len(completed.es))
- delete(stage.inprogress, rb.BundleID)
- for k := range stage.inprogressKeysByBundle[rb.BundleID] {
- delete(stage.inprogressKeys, k)
- }
- delete(stage.inprogressKeysByBundle, rb.BundleID)
-
- // Adjust holds as needed.
- for h, c := range newHolds {
- if c > 0 {
- stage.watermarkHolds.Add(h, c)
- } else if c < 0 {
- stage.watermarkHolds.Drop(h, -c)
- }
- }
- for hold, v := range stage.inprogressHoldsByBundle[rb.BundleID] {
- stage.watermarkHolds.Drop(hold, v)
- }
- delete(stage.inprogressHoldsByBundle, rb.BundleID)
-
- // Clean up OnWindowExpiration bundle accounting, so window state
- // may be garbage collected.
- if stage.expiryWindowsByBundles != nil {
- win, ok := stage.expiryWindowsByBundles[rb.BundleID]
- if ok {
- stage.inProgressExpiredWindows[win] -= 1
- if stage.inProgressExpiredWindows[win] == 0 {
- delete(stage.inProgressExpiredWindows, win)
+ func() {
+ stage.mu.Lock()
+ // Defer unlocking the mutex within an anonymous function to
ensure it's released
+ // even if a panic occurs during `em.addPending`. This prevents
potential deadlocks
+ // if the waitgroup unexpectedly drops below zero due to a
runner bug.
+ defer stage.mu.Unlock()
+ completed := stage.inprogress[rb.BundleID]
+ em.addPending(-len(completed.es))
+ delete(stage.inprogress, rb.BundleID)
+ for k := range stage.inprogressKeysByBundle[rb.BundleID] {
+ delete(stage.inprogressKeys, k)
+ }
+ delete(stage.inprogressKeysByBundle, rb.BundleID)
+
+ // Adjust holds as needed.
+ for h, c := range newHolds {
+ if c > 0 {
+ stage.watermarkHolds.Add(h, c)
+ } else if c < 0 {
+ stage.watermarkHolds.Drop(h, -c)
}
- delete(stage.expiryWindowsByBundles, rb.BundleID)
}
- }
+ for hold, v := range stage.inprogressHoldsByBundle[rb.BundleID]
{
+ stage.watermarkHolds.Drop(hold, v)
+ }
+ delete(stage.inprogressHoldsByBundle, rb.BundleID)
- // If there are estimated output watermarks, set the estimated
- // output watermark for the stage.
- if len(residuals.MinOutputWatermarks) > 0 {
- estimate := mtime.MaxTimestamp
- for _, t := range residuals.MinOutputWatermarks {
- estimate = mtime.Min(estimate, t)
+ // Clean up OnWindowExpiration bundle accounting, so window
state
+ // may be garbage collected.
+ if stage.expiryWindowsByBundles != nil {
+ win, ok := stage.expiryWindowsByBundles[rb.BundleID]
+ if ok {
+ stage.inProgressExpiredWindows[win] -= 1
+ if stage.inProgressExpiredWindows[win] == 0 {
+ delete(stage.inProgressExpiredWindows,
win)
+ }
+ delete(stage.expiryWindowsByBundles,
rb.BundleID)
+ }
}
- stage.estimatedOutput = estimate
- }
- // Handle persisting.
- for link, winMap := range d.state {
- linkMap, ok := stage.state[link]
- if !ok {
- linkMap = map[typex.Window]map[string]StateData{}
- stage.state[link] = linkMap
+ // If there are estimated output watermarks, set the estimated
+ // output watermark for the stage.
+ if len(residuals.MinOutputWatermarks) > 0 {
+ estimate := mtime.MaxTimestamp
+ for _, t := range residuals.MinOutputWatermarks {
+ estimate = mtime.Min(estimate, t)
+ }
+ stage.estimatedOutput = estimate
}
- for w, keyMap := range winMap {
- wlinkMap, ok := linkMap[w]
+
+ // Handle persisting.
+ for link, winMap := range d.state {
+ linkMap, ok := stage.state[link]
if !ok {
- wlinkMap = map[string]StateData{}
- linkMap[w] = wlinkMap
+ linkMap =
map[typex.Window]map[string]StateData{}
+ stage.state[link] = linkMap
}
- for key, data := range keyMap {
- wlinkMap[key] = data
+ for w, keyMap := range winMap {
+ wlinkMap, ok := linkMap[w]
+ if !ok {
+ wlinkMap = map[string]StateData{}
+ linkMap[w] = wlinkMap
+ }
+ for key, data := range keyMap {
+ wlinkMap[key] = data
+ }
}
}
- }
- stage.mu.Unlock()
+ }()
em.markChangedAndClearBundle(stage.ID, rb.BundleID, ptRefreshes)
}
@@ -1032,11 +1037,16 @@ func (em *ElementManager) triageTimers(d TentativeData,
inputInfo PColInfo, stag
// FailBundle clears the extant data allowing the execution to shut down.
func (em *ElementManager) FailBundle(rb RunBundle) {
stage := em.stages[rb.StageID]
- stage.mu.Lock()
- completed := stage.inprogress[rb.BundleID]
- em.addPending(-len(completed.es))
- delete(stage.inprogress, rb.BundleID)
- stage.mu.Unlock()
+ func() {
+ stage.mu.Lock()
+ // Defer unlocking the mutex within an anonymous function to
ensure it's released
+ // even if a panic occurs during `em.addPending`. This prevents
potential deadlocks
+ // if the waitgroup unexpectedly drops below zero due to a
runner bug.
+ defer stage.mu.Unlock()
+ completed := stage.inprogress[rb.BundleID]
+ em.addPending(-len(completed.es))
+ delete(stage.inprogress, rb.BundleID)
+ }()
em.markChangedAndClearBundle(rb.StageID, rb.BundleID, nil)
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index e9edbe62c81..772c3a9ebb8 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"log/slog"
+ "runtime/debug"
"sort"
"sync/atomic"
"time"
@@ -79,6 +80,13 @@ func RunPipeline(j *jobservices.Job) {
j.WaitForCleanUp()
}()
+ // Add this defer function to capture and log panics.
+ defer func() {
+ if e := recover(); e != nil {
+ j.Failed(fmt.Errorf("pipeline panicked: %v\nStacktrace:
%s", e, string(debug.Stack())))
+ }
+ }()
+
j.SendMsg("running " + j.String())
j.Running()