This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 599137d4 [AURON #1803] fix possible deadlock in OnHeapSpillManager
(#1804)
599137d4 is described below
commit 599137d48a0e9088ff0377841206189415e88464
Author: Zhang Li <[email protected]>
AuthorDate: Sat Jan 3 18:19:39 2026 +0800
[AURON #1803] fix possible deadlock in OnHeapSpillManager (#1804)
<!--
- Start the PR title with the related issue ID, e.g. '[AURON #XXXX]
Short summary...'.
-->
# Which issue does this PR close?
Closes #1803
# Rationale for this change
# What changes are included in this PR?
Fixes a possible synchronized-block conflict in OnHeapSpillManager
# Are there any user-facing changes?
No.
# How was this patch tested?
Reran the buggy online query many times to verify that the bug is gone.
---
.../spark/sql/auron/memory/OnHeapSpill.scala | 36 ++++++++++++++++++----
.../sql/auron/memory/SparkOnHeapSpillManager.scala | 2 +-
2 files changed, 31 insertions(+), 7 deletions(-)
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
index 541f4b39..23f3e6c4 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
@@ -17,20 +17,32 @@
package org.apache.spark.sql.auron.memory
import java.nio.ByteBuffer
+import java.util.concurrent.locks.ReentrantLock
import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryConsumer
case class OnHeapSpill(hsm: SparkOnHeapSpillManager, id: Int) extends Logging {
private var spillBuf: SpillBuf = new MemBasedSpillBuf
+ private val lock = new ReentrantLock
def memUsed: Long = spillBuf.memUsed
def diskUsed: Long = spillBuf.diskUsed
def size: Long = spillBuf.size
def diskIOTime: Long = spillBuf.diskIOTime
+ private def withLock[T](f: => T): T = {
+ lock.lock()
+ try {
+ f
+ } finally {
+ lock.unlock()
+ }
+ }
+
def write(buf: ByteBuffer): Unit = {
var needSpill = false
- synchronized {
+ withLock {
spillBuf match {
case _: MemBasedSpillBuf =>
val acquiredMemory = hsm.acquireMemory(buf.capacity())
@@ -46,13 +58,13 @@ case class OnHeapSpill(hsm: SparkOnHeapSpillManager, id:
Int) extends Logging {
spillInternal()
}
- synchronized {
+ withLock {
spillBuf.write(buf)
}
}
def read(buf: ByteBuffer): Int = {
- synchronized {
+ withLock {
val oldMemUsed = memUsed
val startPosition = buf.position()
spillBuf.read(buf)
@@ -69,7 +81,7 @@ case class OnHeapSpill(hsm: SparkOnHeapSpillManager, id: Int)
extends Logging {
}
def release(): Unit = {
- synchronized {
+ withLock {
val oldMemUsed = memUsed
spillBuf = new ReleasedSpillBuf(spillBuf)
@@ -79,8 +91,20 @@ case class OnHeapSpill(hsm: SparkOnHeapSpillManager, id:
Int) extends Logging {
}
}
- def spill(): Long = {
- synchronized {
+ def spill(trigger: MemoryConsumer): Long = {
+ // this might have been locked if the spilling is triggered by
OnHeapSpill.write
+ if (trigger == this.hsm) {
+ if (lock.tryLock()) {
+ try {
+ return spillInternal()
+ } finally {
+ lock.unlock()
+ }
+ }
+ return 0L
+ }
+
+ withLock {
spillInternal()
}
}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
index e77aaaf3..e9d3add9 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
@@ -168,7 +168,7 @@ class SparkOnHeapSpillManager(taskContext: TaskContext)
val sortedSpills = spills.seq.sortBy(0 -
_.map(_.memUsed).getOrElse(0L))
sortedSpills.foreach {
case Some(spill) if spill.memUsed > 0 =>
- totalFreed += spill.spill()
+ totalFreed += spill.spill(trigger)
if (totalFreed >= size) {
return totalFreed
}