This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d0e48e20240 [Prism] Defer unlocking to avoid deadlock (#36163)
d0e48e20240 is described below

commit d0e48e202403c2b1b96d8b170a6e575682332a31
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Sep 15 21:53:42 2025 -0400

    [Prism] Defer unlocking to avoid deadlock (#36163)
    
    * Defer unlocking to avoid deadlock
    
    * Capture panics and fail the job. Keep prism running.
---
 .../prism/internal/engine/elementmanager.go        | 128 +++++++++++----------
 sdks/go/pkg/beam/runners/prism/internal/execute.go |   8 ++
 2 files changed, 77 insertions(+), 59 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 18f10f45e6c..d489bcc18c2 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -892,70 +892,75 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
        // Clear out the inprogress elements associated with the completed bundle.
        // Must be done after adding the new pending elements to avoid an incorrect
        // watermark advancement.
-       stage.mu.Lock()
-       completed := stage.inprogress[rb.BundleID]
-       em.addPending(-len(completed.es))
-       delete(stage.inprogress, rb.BundleID)
-       for k := range stage.inprogressKeysByBundle[rb.BundleID] {
-               delete(stage.inprogressKeys, k)
-       }
-       delete(stage.inprogressKeysByBundle, rb.BundleID)
-
-       // Adjust holds as needed.
-       for h, c := range newHolds {
-               if c > 0 {
-                       stage.watermarkHolds.Add(h, c)
-               } else if c < 0 {
-                       stage.watermarkHolds.Drop(h, -c)
-               }
-       }
-       for hold, v := range stage.inprogressHoldsByBundle[rb.BundleID] {
-               stage.watermarkHolds.Drop(hold, v)
-       }
-       delete(stage.inprogressHoldsByBundle, rb.BundleID)
-
-       // Clean up OnWindowExpiration bundle accounting, so window state
-       // may be garbage collected.
-       if stage.expiryWindowsByBundles != nil {
-               win, ok := stage.expiryWindowsByBundles[rb.BundleID]
-               if ok {
-                       stage.inProgressExpiredWindows[win] -= 1
-                       if stage.inProgressExpiredWindows[win] == 0 {
-                               delete(stage.inProgressExpiredWindows, win)
+       func() {
+               stage.mu.Lock()
+               // Defer unlocking the mutex within an anonymous function to ensure it's released
+               // even if a panic occurs during `em.addPending`. This prevents potential deadlocks
+               // if the waitgroup unexpectedly drops below zero due to a runner bug.
+               defer stage.mu.Unlock()
+               completed := stage.inprogress[rb.BundleID]
+               em.addPending(-len(completed.es))
+               delete(stage.inprogress, rb.BundleID)
+               for k := range stage.inprogressKeysByBundle[rb.BundleID] {
+                       delete(stage.inprogressKeys, k)
+               }
+               delete(stage.inprogressKeysByBundle, rb.BundleID)
+
+               // Adjust holds as needed.
+               for h, c := range newHolds {
+                       if c > 0 {
+                               stage.watermarkHolds.Add(h, c)
+                       } else if c < 0 {
+                               stage.watermarkHolds.Drop(h, -c)
                        }
-                       delete(stage.expiryWindowsByBundles, rb.BundleID)
                }
-       }
+               for hold, v := range stage.inprogressHoldsByBundle[rb.BundleID] 
{
+                       stage.watermarkHolds.Drop(hold, v)
+               }
+               delete(stage.inprogressHoldsByBundle, rb.BundleID)
 
-       // If there are estimated output watermarks, set the estimated
-       // output watermark for the stage.
-       if len(residuals.MinOutputWatermarks) > 0 {
-               estimate := mtime.MaxTimestamp
-               for _, t := range residuals.MinOutputWatermarks {
-                       estimate = mtime.Min(estimate, t)
+               // Clean up OnWindowExpiration bundle accounting, so window state
+               // may be garbage collected.
+               if stage.expiryWindowsByBundles != nil {
+                       win, ok := stage.expiryWindowsByBundles[rb.BundleID]
+                       if ok {
+                               stage.inProgressExpiredWindows[win] -= 1
+                               if stage.inProgressExpiredWindows[win] == 0 {
+                                       delete(stage.inProgressExpiredWindows, 
win)
+                               }
+                               delete(stage.expiryWindowsByBundles, 
rb.BundleID)
+                       }
                }
-               stage.estimatedOutput = estimate
-       }
 
-       // Handle persisting.
-       for link, winMap := range d.state {
-               linkMap, ok := stage.state[link]
-               if !ok {
-                       linkMap = map[typex.Window]map[string]StateData{}
-                       stage.state[link] = linkMap
+               // If there are estimated output watermarks, set the estimated
+               // output watermark for the stage.
+               if len(residuals.MinOutputWatermarks) > 0 {
+                       estimate := mtime.MaxTimestamp
+                       for _, t := range residuals.MinOutputWatermarks {
+                               estimate = mtime.Min(estimate, t)
+                       }
+                       stage.estimatedOutput = estimate
                }
-               for w, keyMap := range winMap {
-                       wlinkMap, ok := linkMap[w]
+
+               // Handle persisting.
+               for link, winMap := range d.state {
+                       linkMap, ok := stage.state[link]
                        if !ok {
-                               wlinkMap = map[string]StateData{}
-                               linkMap[w] = wlinkMap
+                               linkMap = 
map[typex.Window]map[string]StateData{}
+                               stage.state[link] = linkMap
                        }
-                       for key, data := range keyMap {
-                               wlinkMap[key] = data
+                       for w, keyMap := range winMap {
+                               wlinkMap, ok := linkMap[w]
+                               if !ok {
+                                       wlinkMap = map[string]StateData{}
+                                       linkMap[w] = wlinkMap
+                               }
+                               for key, data := range keyMap {
+                                       wlinkMap[key] = data
+                               }
                        }
                }
-       }
-       stage.mu.Unlock()
+       }()
 
        em.markChangedAndClearBundle(stage.ID, rb.BundleID, ptRefreshes)
 }
@@ -1032,11 +1037,16 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag
 // FailBundle clears the extant data allowing the execution to shut down.
 func (em *ElementManager) FailBundle(rb RunBundle) {
        stage := em.stages[rb.StageID]
-       stage.mu.Lock()
-       completed := stage.inprogress[rb.BundleID]
-       em.addPending(-len(completed.es))
-       delete(stage.inprogress, rb.BundleID)
-       stage.mu.Unlock()
+       func() {
+               stage.mu.Lock()
+               // Defer unlocking the mutex within an anonymous function to ensure it's released
+               // even if a panic occurs during `em.addPending`. This prevents potential deadlocks
+               // if the waitgroup unexpectedly drops below zero due to a runner bug.
+               defer stage.mu.Unlock()
+               completed := stage.inprogress[rb.BundleID]
+               em.addPending(-len(completed.es))
+               delete(stage.inprogress, rb.BundleID)
+       }()
        em.markChangedAndClearBundle(rb.StageID, rb.BundleID, nil)
 }
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index e9edbe62c81..772c3a9ebb8 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "io"
        "log/slog"
+       "runtime/debug"
        "sort"
        "sync/atomic"
        "time"
@@ -79,6 +80,13 @@ func RunPipeline(j *jobservices.Job) {
                j.WaitForCleanUp()
        }()
 
+       // Add this defer function to capture and log panics.
+       defer func() {
+               if e := recover(); e != nil {
+                       j.Failed(fmt.Errorf("pipeline panicked: %v\nStacktrace: 
%s", e, string(debug.Stack())))
+               }
+       }()
+
        j.SendMsg("running " + j.String())
        j.Running()
 

Reply via email to