This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ee7b2ea [AURON #1496] Introduce SparkOnHeapSpillManager (#1497)
6ee7b2ea is described below

commit 6ee7b2eaa9fd65a27c2773aff8ec4de8d692b037
Author: zhangmang <[email protected]>
AuthorDate: Wed Oct 22 19:48:51 2025 +0800

    [AURON #1496] Introduce SparkOnHeapSpillManager (#1497)
    
    * [AURON #1496] Introduce SparkOnHeapSpillManager
    
    * fix
---
 .../apache/auron/memory/OnHeapSpillManager.java    | 30 +++++++++++-----------
 native-engine/auron-jni-bridge/src/jni_bridge.rs   |  4 +--
 .../java/org/apache/spark/sql/auron/JniBridge.java |  6 ++---
 .../spark/sql/auron/SparkUDAFWrapperContext.scala  |  8 +++---
 .../spark/sql/auron/memory/OnHeapSpill.scala       |  2 +-
 ...Manager.scala => SparkOnHeapSpillManager.scala} | 25 ++++++++++--------
 .../apache/spark/sql/auron/memory/SpillBuf.scala   |  2 +-
 7 files changed, 40 insertions(+), 37 deletions(-)

diff --git a/auron-core/src/main/java/org/apache/auron/memory/OnHeapSpillManager.java b/auron-core/src/main/java/org/apache/auron/memory/OnHeapSpillManager.java
index 5f98f8e1..3627101a 100644
--- a/auron-core/src/main/java/org/apache/auron/memory/OnHeapSpillManager.java
+++ b/auron-core/src/main/java/org/apache/auron/memory/OnHeapSpillManager.java
@@ -22,21 +22,21 @@ import java.nio.ByteBuffer;
  * Interface for managing on-heap spill operations.
  * This interface provides methods to handle memory spilling to disk when 
on-heap memory is insufficient.
  */
-public abstract class OnHeapSpillManager {
+public interface OnHeapSpillManager {
 
     /**
      * Check if on-heap memory is available for allocation.
      *
      * @return true if on-heap memory is available, false otherwise
      */
-    abstract boolean isOnHeapAvailable();
+    boolean isOnHeapAvailable();
 
     /**
      * Create a new spill operation and return its identifier.
      *
      * @return spill identifier for the newly created spill
      */
-    abstract int newSpill();
+    int newSpill();
 
     /**
      * Write data from a ByteBuffer to the spill identified by spillId.
@@ -44,7 +44,7 @@ public abstract class OnHeapSpillManager {
      * @param spillId the identifier of the spill to write to
      * @param buffer the ByteBuffer containing data to be written
      */
-    abstract void writeSpill(int spillId, ByteBuffer buffer);
+    void writeSpill(int spillId, ByteBuffer buffer);
 
     /**
      * Read data from the spill identified by spillId into the provided 
ByteBuffer.
@@ -53,7 +53,7 @@ public abstract class OnHeapSpillManager {
      * @param buffer the ByteBuffer to read data into
      * @return the number of bytes actually read
      */
-    abstract int readSpill(int spillId, ByteBuffer buffer);
+    int readSpill(int spillId, ByteBuffer buffer);
 
     /**
      * Get the disk usage in bytes for the spill identified by spillId.
@@ -61,7 +61,7 @@ public abstract class OnHeapSpillManager {
      * @param spillId the identifier of the spill
      * @return the disk usage in bytes
      */
-    abstract long getSpillDiskUsage(int spillId);
+    long getSpillDiskUsage(int spillId);
 
     /**
      * Get the total disk I/O time in nanoseconds for the spill identified by 
spillId.
@@ -69,14 +69,14 @@ public abstract class OnHeapSpillManager {
      * @param spillId the identifier of the spill
      * @return the disk I/O time in nanoseconds
      */
-    abstract long getSpillDiskIOTime(int spillId);
+    long getSpillDiskIOTime(int spillId);
 
     /**
      * Release and clean up resources associated with the spill identified by 
spillId.
      *
      * @param spillId the identifier of the spill to release
      */
-    abstract void releaseSpill(int spillId);
+    void releaseSpill(int spillId);
 
     /**
      * Get the disabled on-heap spill manager instance.
@@ -87,37 +87,37 @@ public abstract class OnHeapSpillManager {
         return new OnHeapSpillManager() {
 
             @Override
-            boolean isOnHeapAvailable() {
+            public boolean isOnHeapAvailable() {
                 return false;
             }
 
             @Override
-            int newSpill() {
+            public int newSpill() {
                 throw new UnsupportedOperationException();
             }
 
             @Override
-            void writeSpill(int spillId, ByteBuffer buffer) {
+            public void writeSpill(int spillId, ByteBuffer buffer) {
                 throw new UnsupportedOperationException();
             }
 
             @Override
-            int readSpill(int spillId, ByteBuffer buffer) {
+            public int readSpill(int spillId, ByteBuffer buffer) {
                 throw new UnsupportedOperationException();
             }
 
             @Override
-            long getSpillDiskUsage(int spillId) {
+            public long getSpillDiskUsage(int spillId) {
                 throw new UnsupportedOperationException();
             }
 
             @Override
-            long getSpillDiskIOTime(int spillId) {
+            public long getSpillDiskIOTime(int spillId) {
                 throw new UnsupportedOperationException();
             }
 
             @Override
-            void releaseSpill(int spillId) {
+            public void releaseSpill(int spillId) {
                 throw new UnsupportedOperationException();
             }
         };
diff --git a/native-engine/auron-jni-bridge/src/jni_bridge.rs b/native-engine/auron-jni-bridge/src/jni_bridge.rs
index 067b5fd3..1e6e7bd2 100644
--- a/native-engine/auron-jni-bridge/src/jni_bridge.rs
+++ b/native-engine/auron-jni-bridge/src/jni_bridge.rs
@@ -611,7 +611,7 @@ impl<'a> JniBridge<'a> {
             method_getTaskOnHeapSpillManager: env.get_static_method_id(
                 class,
                 "getTaskOnHeapSpillManager",
-                "()Lorg/apache/spark/sql/auron/memory/OnHeapSpillManager;",
+                "()Lorg/apache/auron/memory/OnHeapSpillManager;",
             )?,
             method_getTaskOnHeapSpillManager_ret: ReturnType::Object,
             method_isTaskRunning: env.get_static_method_id(class, 
"isTaskRunning", "()Z")?,
@@ -1455,7 +1455,7 @@ pub struct AuronOnHeapSpillManager<'a> {
     pub method_releaseSpill_ret: ReturnType,
 }
 impl<'a> AuronOnHeapSpillManager<'a> {
-    pub const SIG_TYPE: &'static str = 
"org/apache/spark/sql/auron/memory/OnHeapSpillManager";
+    pub const SIG_TYPE: &'static str = 
"org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager";
 
     pub fn new(env: &JNIEnv<'a>) -> JniResult<AuronOnHeapSpillManager<'a>> {
         let class = get_global_jclass(env, Self::SIG_TYPE)?;
diff --git a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
index 03811cdc..3078d487 100644
--- a/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
+++ b/spark-extension/src/main/java/org/apache/spark/sql/auron/JniBridge.java
@@ -27,13 +27,13 @@ import org.apache.auron.hadoop.fs.FSDataInputWrapper;
 import org.apache.auron.hadoop.fs.FSDataInputWrapper$;
 import org.apache.auron.hadoop.fs.FSDataOutputWrapper;
 import org.apache.auron.hadoop.fs.FSDataOutputWrapper$;
+import org.apache.auron.memory.OnHeapSpillManager;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
-import org.apache.spark.sql.auron.memory.OnHeapSpillManager;
-import org.apache.spark.sql.auron.memory.OnHeapSpillManager$;
+import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager$;
 import org.apache.spark.sql.auron.util.TaskContextHelper$;
 
 @SuppressWarnings("unused")
@@ -65,7 +65,7 @@ public class JniBridge {
     }
 
     public static OnHeapSpillManager getTaskOnHeapSpillManager() {
-        return OnHeapSpillManager$.MODULE$.current();
+        return SparkOnHeapSpillManager$.MODULE$.current();
     }
 
     public static boolean isTaskRunning() {
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
index a2be0ecc..83b4bc8d 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/SparkUDAFWrapperContext.scala
@@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.io.SnappyCompressionCodec
 import org.apache.spark.memory.MemoryConsumer
 import org.apache.spark.memory.MemoryMode
-import org.apache.spark.sql.auron.memory.OnHeapSpillManager
+import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager
 import org.apache.spark.sql.auron.util.Using
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, JoinedRow, Nondeterministic, UnsafeProjection, UnsafeRow}
@@ -211,7 +211,7 @@ trait AggregateEvaluator[B, R <: BufferRowsColumn[B]] extends Logging {
       rows: R,
       indices: Iterator[Int],
       spillIdx: Long): Int = {
-    val hsm = OnHeapSpillManager.current
+    val hsm = SparkOnHeapSpillManager.current
     val spillId = memTracker.getSpill(spillIdx)
     val byteBuffer =
       ByteBuffer.wrap(serializeRows(rows, indices, 
spillCodec.compressedOutputStream))
@@ -224,7 +224,7 @@ trait AggregateEvaluator[B, R <: BufferRowsColumn[B]] extends Logging {
       memTracker: SparkUDAFMemTracker,
       spillBlockSize: Int,
       spillIdx: Long): BufferRowsColumn[B] = {
-    val hsm = OnHeapSpillManager.current
+    val hsm = SparkOnHeapSpillManager.current
     val spillId = memTracker.getSpill(spillIdx)
     val byteBuffer = ByteBuffer.allocate(spillBlockSize)
     val readSize = hsm.readSpill(spillId, byteBuffer).toLong
@@ -539,7 +539,7 @@ class SparkUDAFMemTracker
   def getSpill(spillIdx: Long): Int = {
     this.spills.getOrElseUpdate(
       spillIdx, {
-        OnHeapSpillManager.current.newSpill()
+        SparkOnHeapSpillManager.current.newSpill()
       })
   }
 
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
index d634e6ed..541f4b39 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpill.scala
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.internal.Logging
 
-case class OnHeapSpill(hsm: OnHeapSpillManager, id: Int) extends Logging {
+case class OnHeapSpill(hsm: SparkOnHeapSpillManager, id: Int) extends Logging {
   private var spillBuf: SpillBuf = new MemBasedSpillBuf
 
   def memUsed: Long = spillBuf.memUsed
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpillManager.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
similarity index 89%
rename from spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpillManager.scala
rename to spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
index 38aa1531..01f03712 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/OnHeapSpillManager.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SparkOnHeapSpillManager.scala
@@ -32,13 +32,16 @@ import org.apache.spark.sql.auron.AuronConf
 import org.apache.spark.storage.BlockManager
 import org.apache.spark.util.Utils
 
-class OnHeapSpillManager(taskContext: TaskContext)
+import org.apache.auron.memory.OnHeapSpillManager
+
+class SparkOnHeapSpillManager(taskContext: TaskContext)
     extends MemoryConsumer(
       taskContext.taskMemoryManager,
       taskContext.taskMemoryManager.pageSizeBytes(),
       MemoryMode.ON_HEAP)
+    with OnHeapSpillManager
     with Logging {
-  import org.apache.spark.sql.auron.memory.OnHeapSpillManager._
+  import org.apache.spark.sql.auron.memory.SparkOnHeapSpillManager._
 
   private val _blockManager = SparkEnv.get.blockManager
   private val spills = ArrayBuffer[Option[OnHeapSpill]]()
@@ -62,7 +65,7 @@ class OnHeapSpillManager(taskContext: TaskContext)
    * @return
    */
   @SuppressWarnings(Array("unused"))
-  def isOnHeapAvailable: Boolean = {
+  override def isOnHeapAvailable: Boolean = {
     // if driver, tc always null.
     if (taskContext == null) {
       return false
@@ -96,7 +99,7 @@ class OnHeapSpillManager(taskContext: TaskContext)
    * @return
    *   allocated spill id
    */
-  def newSpill(): Int = {
+  override def newSpill(): Int = {
     synchronized {
       val spill = OnHeapSpill(this, spills.length)
       spills.append(Some(spill))
@@ -108,7 +111,7 @@ class OnHeapSpillManager(taskContext: TaskContext)
     }
   }
 
-  def writeSpill(spillId: Int, data: ByteBuffer): Unit = {
+  override def writeSpill(spillId: Int, data: ByteBuffer): Unit = {
     spills(spillId)
       .getOrElse(
         throw new RuntimeException(
@@ -116,7 +119,7 @@ class OnHeapSpillManager(taskContext: TaskContext)
       .write(data)
   }
 
-  def readSpill(spillId: Int, buf: ByteBuffer): Int = {
+  override def readSpill(spillId: Int, buf: ByteBuffer): Int = {
     spills(spillId)
       .getOrElse(
         throw new RuntimeException(
@@ -128,15 +131,15 @@ class OnHeapSpillManager(taskContext: TaskContext)
     spills(spillId).map(_.size).getOrElse(0)
   }
 
-  def getSpillDiskUsage(spillId: Int): Long = {
+  override def getSpillDiskUsage(spillId: Int): Long = {
     spills(spillId).map(_.diskUsed).getOrElse(0)
   }
 
-  def getSpillDiskIOTime(spillId: Int): Long = {
+  override def getSpillDiskIOTime(spillId: Int): Long = {
     spills(spillId).map(_.diskIOTime).getOrElse(0) // time unit: ns
   }
 
-  def releaseSpill(spillId: Int): Unit = {
+  override def releaseSpill(spillId: Int): Unit = {
     spills(spillId) match {
       case Some(spill) =>
         spill.release()
@@ -185,11 +188,11 @@ class OnHeapSpillManager(taskContext: TaskContext)
   }
 }
 
-object OnHeapSpillManager extends Logging {
+object SparkOnHeapSpillManager extends Logging {
   val all: mutable.Map[Long, OnHeapSpillManager] = concurrent.TrieMap[Long, 
OnHeapSpillManager]()
 
   def current: OnHeapSpillManager = {
     val tc = TaskContext.get
-    all.getOrElseUpdate(tc.taskAttemptId(), new OnHeapSpillManager(tc))
+    all.getOrElseUpdate(tc.taskAttemptId(), new SparkOnHeapSpillManager(tc))
   }
 }
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SpillBuf.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SpillBuf.scala
index 24adfbaa..ba2a5749 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SpillBuf.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/memory/SpillBuf.scala
@@ -81,7 +81,7 @@ class MemBasedSpillBuf extends SpillBuf with Logging {
   override def diskIOTime: Long = 0
   override def size: Long = numWrittenBytes
 
-  def spill(hsm: OnHeapSpillManager): FileBasedSpillBuf = {
+  def spill(hsm: SparkOnHeapSpillManager): FileBasedSpillBuf = {
     logWarning(s"spilling in-mem spill buffer to disk, 
size=${Utils.bytesToString(size)}")
 
     val startTimeNs = System.nanoTime()

Reply via email to