This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 599137d4 [AURON #1803] fix possible deadlock in OnHeapSpillManager 
(#1804)
599137d4 is described below

commit 599137d48a0e9088ff0377841206189415e88464
Author: Zhang Li <[email protected]>
AuthorDate: Sat Jan 3 18:19:39 2026 +0800

    [AURON #1803] fix possible deadlock in OnHeapSpillManager (#1804)
    
    <!--
    - Start the PR title with the related issue ID, e.g. '[AURON #XXXX]
    Short summary...'.
    -->
    # Which issue does this PR close?
    
    Closes #1803
    
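    # Rationale for this change
    
    OnHeapSpill.write may call the spill manager's acquireMemory while holding the spill's lock. A spill triggered by the manager could then block on a spill lock that is already held elsewhere, which can deadlock the task.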
    
    # What changes are included in this PR?
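    Replaces the `synchronized` blocks in OnHeapSpill with a `ReentrantLock`. When spilling is triggered by SparkOnHeapSpillManager itself, OnHeapSpill.spill uses `tryLock` and, if the lock is held by another thread, skips that spill (freeing 0 bytes) instead of blocking, avoiding a possible deadlock.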
    
    # Are there any user-facing changes?
    No.
    
    # How was this patch tested?
    Reran the failing online query many times and verified that the bug is gone.
---
 .../spark/sql/auron/memory/OnHeapSpill.scala       | 36 ++++++++++++++++++----
 .../sql/auron/memory/SparkOnHeapSpillManager.scala |  2 +-
 2 files changed, 31 insertions(+), 7 deletions(-)

diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
index 541f4b39..23f3e6c4 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
@@ -17,20 +17,32 @@
 package org.apache.spark.sql.auron.memory
 
 import java.nio.ByteBuffer
+import java.util.concurrent.locks.ReentrantLock
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryConsumer
 
 case class OnHeapSpill(hsm: SparkOnHeapSpillManager, id: Int) extends Logging {
   private var spillBuf: SpillBuf = new MemBasedSpillBuf
+  private val lock = new ReentrantLock
 
   def memUsed: Long = spillBuf.memUsed
   def diskUsed: Long = spillBuf.diskUsed
   def size: Long = spillBuf.size
   def diskIOTime: Long = spillBuf.diskIOTime
 
+  private def withLock[T](f: => T): T = {
+    lock.lock()
+    try {
+      f
+    } finally {
+      lock.unlock()
+    }
+  }
+
   def write(buf: ByteBuffer): Unit = {
     var needSpill = false
-    synchronized {
+    withLock {
       spillBuf match {
         case _: MemBasedSpillBuf =>
           val acquiredMemory = hsm.acquireMemory(buf.capacity())
@@ -46,13 +58,13 @@ case class OnHeapSpill(hsm: SparkOnHeapSpillManager, id: 
Int) extends Logging {
       spillInternal()
     }
 
-    synchronized {
+    withLock {
       spillBuf.write(buf)
     }
   }
 
   def read(buf: ByteBuffer): Int = {
-    synchronized {
+    withLock {
       val oldMemUsed = memUsed
       val startPosition = buf.position()
       spillBuf.read(buf)
@@ -69,7 +81,7 @@ case class OnHeapSpill(hsm: SparkOnHeapSpillManager, id: Int) 
extends Logging {
   }
 
   def release(): Unit = {
-    synchronized {
+    withLock {
       val oldMemUsed = memUsed
       spillBuf = new ReleasedSpillBuf(spillBuf)
 
@@ -79,8 +91,20 @@ case class OnHeapSpill(hsm: SparkOnHeapSpillManager, id: 
Int) extends Logging {
     }
   }
 
-  def spill(): Long = {
-    synchronized {
+  def spill(trigger: MemoryConsumer): Long = {
+    // this might have been locked if the spilling is triggered by 
OnHeapSpill.write
+    if (trigger == this.hsm) {
+      if (lock.tryLock()) {
+        try {
+          return spillInternal()
+        } finally {
+          lock.unlock()
+        }
+      }
+      return 0L
+    }
+
+    withLock {
       spillInternal()
     }
   }
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
index e77aaaf3..e9d3add9 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
@@ -168,7 +168,7 @@ class SparkOnHeapSpillManager(taskContext: TaskContext)
         val sortedSpills = spills.seq.sortBy(0 - 
_.map(_.memUsed).getOrElse(0L))
         sortedSpills.foreach {
           case Some(spill) if spill.memUsed > 0 =>
-            totalFreed += spill.spill()
+            totalFreed += spill.spill(trigger)
             if (totalFreed >= size) {
               return totalFreed
             }

Reply via email to