This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 6ee7b2ea [AURON #1496] Introduce SparkOnHeapSpillManager (#1497)
6ee7b2ea is described below
commit 6ee7b2eaa9fd65a27c2773aff8ec4de8d692b037
Author: zhangmang <[email protected]>
AuthorDate: Wed Oct 22 19:48:51 2025 +0800
[AURON #1496] Introduce SparkOnHeapSpillManager (#1497)
* [AURON #1496] Introduce SparkOnHeapSpillManager
* fix
---
.../apache/auron/memory/OnHeapSpillManager.java | 30 +++++++++++-----------
native-engine/auron-jni-bridge/src/jni_bridge.rs | 4 +--
.../java/org/apache/spark/sql/auron/JniBridge.java | 6 ++---
.../spark/sql/auron/SparkUDAFWrapperContext.scala | 8 +++---
.../spark/sql/auron/memory/OnHeapSpill.scala | 2 +-
...Manager.scala => SparkOnHeapSpillManager.scala} | 25 ++++++++++--------
.../apache/spark/sql/auron/memory/SpillBuf.scala | 2 +-
7 files changed, 40 insertions(+), 37 deletions(-)
diff --git
a/auron-core/src/main/java/org/apache/auron/memory/OnHeapSpillManager.java
b/auron-core/src/main/java/org/apache/auron/memory/OnHeapSpillManager.java
index 5f98f8e1..3627101a 100644
--- a/auron-core/src/main/java/org/apache/auron/memory/OnHeapSpillManager.java
+++ b/auron-core/src/main/java/org/apache/auron/memory/OnHeapSpillManager.java
@@ -22,21 +22,21 @@ import java.nio.ByteBuffer;
* Interface for managing on-heap spill operations.
* This interface provides methods to handle memory spilling to disk when
on-heap memory is insufficient.
*/
-public abstract class OnHeapSpillManager {
+public interface OnHeapSpillManager {
/**
* Check if on-heap memory is available for allocation.
*
* @return true if on-heap memory is available, false otherwise
*/
- abstract boolean isOnHeapAvailable();
+ boolean isOnHeapAvailable();
/**
* Create a new spill operation and return its identifier.
*
* @return spill identifier for the newly created spill
*/
- abstract int newSpill();
+ int newSpill();
/**
* Write data from a ByteBuffer to the spill identified by spillId.
@@ -44,7 +44,7 @@ public abstract class OnHeapSpillManager {
* @param spillId the identifier of the spill to write to
* @param buffer the ByteBuffer containing data to be written
*/
- abstract void writeSpill(int spillId, ByteBuffer buffer);
+ void writeSpill(int spillId, ByteBuffer buffer);
/**
* Read data from the spill identified by spillId into the provided
ByteBuffer.
@@ -53,7 +53,7 @@ public abstract class OnHeapSpillManager {
* @param buffer the ByteBuffer to read data into
* @return the number of bytes actually read
*/
- abstract int readSpill(int spillId, ByteBuffer buffer);
+ int readSpill(int spillId, ByteBuffer buffer);
/**
* Get the disk usage in bytes for the spill identified by spillId.
@@ -61,7 +61,7 @@ public abstract class OnHeapSpillManager {
* @param spillId the identifier of the spill
* @return the disk usage in bytes
*/
- abstract long getSpillDiskUsage(int spillId);
+ long getSpillDiskUsage(int spillId);
/**
* Get the total disk I/O time in nanoseconds for the spill identified by
spillId.
@@ -69,14 +69,14 @@ public abstract class OnHeapSpillManager {
* @param spillId the identifier of the spill
* @return the disk I/O time in nanoseconds
*/
- abstract long getSpillDiskIOTime(int spillId);
+ long getSpillDiskIOTime(int spillId);
/**
* Release and clean up resources associated with the spill identified by
spillId.
*
* @param spillId the identifier of the spill to release
*/
- abstract void releaseSpill(int spillId);
+ void releaseSpill(int spillId);
/**
* Get the disabled on-heap spill manager instance.
@@ -87,37 +87,37 @@ public abstract class OnHeapSpillManager {
return new OnHeapSpillManager() {
@Override
- boolean isOnHeapAvailable() {
+ public boolean isOnHeapAvailable() {
return false;
}
@Override
- int newSpill() {
+ public int newSpill() {
throw new UnsupportedOperationException();
}
@Override
- void writeSpill(int spillId, ByteBuffer buffer) {
+ public void writeSpill(int spillId, ByteBuffer buffer) {
throw new UnsupportedOperationException();
}
@Override
- int readSpill(int spillId, ByteBuffer buffer) {
+ public int readSpill(int spillId, ByteBuffer buffer) {
throw new UnsupportedOperationException();
}
@Override
- long getSpillDiskUsage(int spillId) {
+ public long getSpillDiskUsage(int spillId) {
throw new UnsupportedOperationException();
}
@Override
- long getSpillDiskIOTime(int spillId) {
+ public long getSpillDiskIOTime(int spillId) {
throw new UnsupportedOperationException();
}
@Override
- void releaseSpill(int spillId) {
+ public void releaseSpill(int spillId) {
throw new UnsupportedOperationException();
}
};
diff --git a/native-engine/auron-jni-bridge/src/jni_bridge.rs
b/native-engine/auron-jni-bridge/src/jni_bridge.rs
index 067b5fd3..1e6e7bd2 100644
--- a/native-engine/auron-jni-bridge/src/jni_bridge.rs
+++ b/native-engine/auron-jni-bridge/src/jni_bridge.rs
@@ -611,7 +611,7 @@ impl<'a> JniBridge<'a> {
method_getTaskOnHeapSpillManager: env.get_static_method_id(
class,
"getTaskOnHeapSpillManager",
- "()Lorg/apache/spark/sql/auron/memory/OnHeapSpillManager;",
+ "()Lorg/apache/auron/memory/OnHeapSpillManager;",
)?,
method_getTaskOnHeapSpillManager_ret: ReturnType::Object,
method_isTaskRunning: env.get_static_method_id(class,
"isTaskRunning", "()Z")?,
@@ -1455,7 +1455,7 @@ pub struct AuronOnHeapSpillManager<'a> {
pub method_releaseSpill_ret: ReturnType,
}
impl<'a> AuronOnHeapSpillManager<'a> {
- pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/memory/OnHeapSpillManager";
+ pub const SIG_TYPE: &'static str = "org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager";
pub fn new(env: &JNIEnv<'a>) -> JniResult<AuronOnHeapSpillManager<'a>> {
let class = get_global_jclass(env, Self::SIG_TYPE)?;
diff --git
a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
index 03811cdc..3078d487 100644
--- a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
+++ b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
@@ -27,13 +27,13 @@ import org.apache.auron.hadoop.fs.FSDataInputWrapper;
import org.apache.auron.hadoop.fs.FSDataInputWrapper$;
import org.apache.auron.hadoop.fs.FSDataOutputWrapper;
import org.apache.auron.hadoop.fs.FSDataOutputWrapper$;
+import org.apache.auron.memory.OnHeapSpillManager;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
-import org.apache.spark.sql.auron.memory.OnHeapSpillManager;
-import org.apache.spark.sql.auron.memory.OnHeapSpillManager$;
+import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager$;
import org.apache.spark.sql.auron.util.TaskContextHelper$;
@SuppressWarnings("unused")
@@ -65,7 +65,7 @@ public class JniBridge {
}
public static OnHeapSpillManager getTaskOnHeapSpillManager() {
- return OnHeapSpillManager$.MODULE$.current();
+ return SparkOnHeapSpillManager$.MODULE$.current();
}
public static boolean isTaskRunning() {
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
index a2be0ecc..83b4bc8d 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
@@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.io.SnappyCompressionCodec
import org.apache.spark.memory.MemoryConsumer
import org.apache.spark.memory.MemoryMode
-import org.apache.spark.sql.auron.memory.OnHeapSpillManager
+import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager
import org.apache.spark.sql.auron.util.Using
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
BoundReference, JoinedRow, Nondeterministic, UnsafeProjection, UnsafeRow}
@@ -211,7 +211,7 @@ trait AggregateEvaluator[B, R <: BufferRowsColumn[B]]
extends Logging {
rows: R,
indices: Iterator[Int],
spillIdx: Long): Int = {
- val hsm = OnHeapSpillManager.current
+ val hsm = SparkOnHeapSpillManager.current
val spillId = memTracker.getSpill(spillIdx)
val byteBuffer =
ByteBuffer.wrap(serializeRows(rows, indices,
spillCodec.compressedOutputStream))
@@ -224,7 +224,7 @@ trait AggregateEvaluator[B, R <: BufferRowsColumn[B]]
extends Logging {
memTracker: SparkUDAFMemTracker,
spillBlockSize: Int,
spillIdx: Long): BufferRowsColumn[B] = {
- val hsm = OnHeapSpillManager.current
+ val hsm = SparkOnHeapSpillManager.current
val spillId = memTracker.getSpill(spillIdx)
val byteBuffer = ByteBuffer.allocate(spillBlockSize)
val readSize = hsm.readSpill(spillId, byteBuffer).toLong
@@ -539,7 +539,7 @@ class SparkUDAFMemTracker
def getSpill(spillIdx: Long): Int = {
this.spills.getOrElseUpdate(
spillIdx, {
- OnHeapSpillManager.current.newSpill()
+ SparkOnHeapSpillManager.current.newSpill()
})
}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
index d634e6ed..541f4b39 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
import org.apache.spark.internal.Logging
-case class OnHeapSpill(hsm: OnHeapSpillManager, id: Int) extends Logging {
+case class OnHeapSpill(hsm: SparkOnHeapSpillManager, id: Int) extends Logging {
private var spillBuf: SpillBuf = new MemBasedSpillBuf
def memUsed: Long = spillBuf.memUsed
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpillManager.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
similarity index 89%
rename from
spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpillManager.scala
rename to
spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
index 38aa1531..01f03712 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpillManager.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
@@ -32,13 +32,16 @@ import org.apache.spark.sql.auron.AuronConf
import org.apache.spark.storage.BlockManager
import org.apache.spark.util.Utils
-class OnHeapSpillManager(taskContext: TaskContext)
+import org.apache.auron.memory.OnHeapSpillManager
+
+class SparkOnHeapSpillManager(taskContext: TaskContext)
extends MemoryConsumer(
taskContext.taskMemoryManager,
taskContext.taskMemoryManager.pageSizeBytes(),
MemoryMode.ON_HEAP)
+ with OnHeapSpillManager
with Logging {
- import org.apache.spark.sql.auron.memory.OnHeapSpillManager._
+ import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager._
private val _blockManager = SparkEnv.get.blockManager
private val spills = ArrayBuffer[Option[OnHeapSpill]]()
@@ -62,7 +65,7 @@ class OnHeapSpillManager(taskContext: TaskContext)
* @return
*/
@SuppressWarnings(Array("unused"))
- def isOnHeapAvailable: Boolean = {
+ override def isOnHeapAvailable: Boolean = {
// if driver, tc always null.
if (taskContext == null) {
return false
@@ -96,7 +99,7 @@ class OnHeapSpillManager(taskContext: TaskContext)
* @return
* allocated spill id
*/
- def newSpill(): Int = {
+ override def newSpill(): Int = {
synchronized {
val spill = OnHeapSpill(this, spills.length)
spills.append(Some(spill))
@@ -108,7 +111,7 @@ class OnHeapSpillManager(taskContext: TaskContext)
}
}
- def writeSpill(spillId: Int, data: ByteBuffer): Unit = {
+ override def writeSpill(spillId: Int, data: ByteBuffer): Unit = {
spills(spillId)
.getOrElse(
throw new RuntimeException(
@@ -116,7 +119,7 @@ class OnHeapSpillManager(taskContext: TaskContext)
.write(data)
}
- def readSpill(spillId: Int, buf: ByteBuffer): Int = {
+ override def readSpill(spillId: Int, buf: ByteBuffer): Int = {
spills(spillId)
.getOrElse(
throw new RuntimeException(
@@ -128,15 +131,15 @@ class OnHeapSpillManager(taskContext: TaskContext)
spills(spillId).map(_.size).getOrElse(0)
}
- def getSpillDiskUsage(spillId: Int): Long = {
+ override def getSpillDiskUsage(spillId: Int): Long = {
spills(spillId).map(_.diskUsed).getOrElse(0)
}
- def getSpillDiskIOTime(spillId: Int): Long = {
+ override def getSpillDiskIOTime(spillId: Int): Long = {
spills(spillId).map(_.diskIOTime).getOrElse(0) // time unit: ns
}
- def releaseSpill(spillId: Int): Unit = {
+ override def releaseSpill(spillId: Int): Unit = {
spills(spillId) match {
case Some(spill) =>
spill.release()
@@ -185,11 +188,11 @@ class OnHeapSpillManager(taskContext: TaskContext)
}
}
-object OnHeapSpillManager extends Logging {
+object SparkOnHeapSpillManager extends Logging {
val all: mutable.Map[Long, OnHeapSpillManager] = concurrent.TrieMap[Long,
OnHeapSpillManager]()
def current: OnHeapSpillManager = {
val tc = TaskContext.get
- all.getOrElseUpdate(tc.taskAttemptId(), new OnHeapSpillManager(tc))
+ all.getOrElseUpdate(tc.taskAttemptId(), new SparkOnHeapSpillManager(tc))
}
}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SpillBuf.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SpillBuf.scala
index 24adfbaa..ba2a5749 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SpillBuf.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SpillBuf.scala
@@ -81,7 +81,7 @@ class MemBasedSpillBuf extends SpillBuf with Logging {
override def diskIOTime: Long = 0
override def size: Long = numWrittenBytes
- def spill(hsm: OnHeapSpillManager): FileBasedSpillBuf = {
+ def spill(hsm: SparkOnHeapSpillManager): FileBasedSpillBuf = {
logWarning(s"spilling in-mem spill buffer to disk, size=${Utils.bytesToString(size)}")
val startTimeNs = System.nanoTime()