This is an automated email from the ASF dual-hosted git repository.

zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 26a8f9cb38 [VL] Simplify lifecycle of NativeMemoryManager and 
NativeThreadManager (#12323)
26a8f9cb38 is described below

commit 26a8f9cb380f322d100d1ff56ade88b101504061
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Jun 30 11:14:23 2026 +0100

    [VL] Simplify lifecycle of NativeMemoryManager and NativeThreadManager 
(#12323)
---
 .../apache/gluten/memory/NativeMemoryManager.scala | 20 ++++++----------
 .../scala/org/apache/gluten/runtime/Runtime.scala  |  3 ++-
 .../gluten/threads/NativeThreadManager.scala       | 27 +++++++++-------------
 3 files changed, 20 insertions(+), 30 deletions(-)

diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
 
b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
index 159e1bba5e..3308f357a7 100644
--- 
a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
+++ 
b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
@@ -25,7 +25,6 @@ import org.apache.gluten.utils.ConfigUtil
 
 import org.apache.spark.memory.SparkMemoryUtil
 import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf}
-import org.apache.spark.task.{TaskResource, TaskResources}
 
 import org.slf4j.LoggerFactory
 
@@ -38,12 +37,13 @@ trait NativeMemoryManager {
   def addSpiller(spiller: Spiller): Unit
   def hold(): Unit
   def getHandle(): Long
+  def release(): Unit
 }
 
 object NativeMemoryManager {
   private class Impl(backendName: String, name: String)
-    extends NativeMemoryManager
-    with TaskResource {
+    extends NativeMemoryManager {
+    private val nmmName = s"[nmm-$name]"
     private val LOGGER = LoggerFactory.getLogger(classOf[NativeMemoryManager])
     private val spillers = Spillers.appendable()
     private val mutableStats: mutable.Map[String, MemoryUsageStatsBuilder] = 
mutable.Map()
@@ -81,14 +81,14 @@ object NativeMemoryManager {
     override def release(): Unit = {
       if (!released.compareAndSet(false, true)) {
         throw new GlutenException(
-          s"Memory manager instance already released: $handle, 
${resourceName()}, ${priority()}")
+          s"Memory manager instance already released: $handle")
       }
 
       def dump(): String = {
         SparkMemoryUtil.prettyPrintStats(
-          s"[${resourceName()}]",
+          nmmName,
           new KnownNameAndStats() {
-            override def name: String = resourceName()
+            override def name: String = nmmName
             override def stats: MemoryUsageStats = collectUsage()
           })
       }
@@ -110,15 +110,9 @@ object NativeMemoryManager {
           ))
       }
     }
-    override def priority(): Int = {
-      // Memory managers should be released after all runtimes are released.
-      // So set the priority lower than runtime resources.
-      10
-    }
-    override def resourceName(): String = "nmm"
   }
 
   def apply(backendName: String, name: String): NativeMemoryManager = {
-    TaskResources.addAnonymousResource(new Impl(backendName, name))
+    new Impl(backendName, name)
   }
 }
diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala 
b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
index 2e3c468140..885dd831e8 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
@@ -81,7 +81,8 @@ object Runtime {
           s"Runtime instance already released: $handle, ${resourceName()}, 
${priority()}")
       }
       RuntimeJniWrapper.releaseRuntime(handle)
-
+      ntm.release()
+      nmm.release()
     }
 
     override def priority(): Int = 30
diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala
 
b/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala
index 8abe6d9c9b..ce0507b69a 100644
--- 
a/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala
+++ 
b/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala
@@ -18,29 +18,29 @@ package org.apache.gluten.threads
 
 import org.apache.gluten.exception.GlutenException
 
-import org.apache.spark.task.{TaskResource, TaskResources}
-
 import java.util.concurrent.atomic.AtomicBoolean
 
 /**
  * Scala wrapper around a native ThreadManager handle.
  *
- * Created once per Spark task and registered as a [[TaskResource]] so it is 
automatically released
- * when the task completes. The ThreadManager wraps a 
[[NativeThreadInitializer]] that propagates
- * task context to native worker threads spawned by folly executors.
+ * Created once per Spark task by [[org.apache.gluten.runtime.Runtime]]. The 
ThreadManager wraps a
+ * [[NativeThreadInitializer]] that propagates task context to native worker 
threads spawned by
+ * folly executors.
  */
 trait NativeThreadManager {
 
   /** @return opaque native handle passed to RuntimeJniWrapper#createRuntime. 
*/
   def getHandle(): Long
+
+  /** Release the native ThreadManager handle. Called by Runtime during task 
completion. */
+  def release(): Unit
 }
 
 object NativeThreadManager {
   private class Impl(
       private val backendName: String,
       private val initializer: NativeThreadInitializer)
-    extends NativeThreadManager
-    with TaskResource {
+    extends NativeThreadManager {
     private val handle = NativeThreadManagerJniWrapper.create(backendName, 
initializer)
     private val released = new AtomicBoolean(false)
 
@@ -49,20 +49,15 @@ object NativeThreadManager {
     override def release(): Unit = {
       if (!released.compareAndSet(false, true)) {
         throw new GlutenException(
-          s"Thread manager instance already released: $handle, 
${resourceName()}, ${priority()}")
+          s"Thread manager instance already released: $handle")
       }
       NativeThreadManagerJniWrapper.release(handle)
     }
-
-    // Release before MemoryManager (10) but after most other resources.
-    override def priority(): Int = 20
-
-    override def resourceName(): String = "ntm"
   }
 
   /**
-   * Create a new NativeThreadManager and register it with the current Spark 
task's
-   * [[TaskResources]] so it is automatically released when the task finishes.
+   * Create a new NativeThreadManager. The caller (typically Runtime) is 
responsible for calling
+   * `release()` when the manager is no longer needed.
    *
    * @param backendName
    *   the backend kind string (e.g., "velox").
@@ -70,6 +65,6 @@ object NativeThreadManager {
    *   callback invoked when native worker threads are created / destroyed.
    */
   def apply(backendName: String, initializer: NativeThreadInitializer): 
NativeThreadManager = {
-    TaskResources.addAnonymousResource(new Impl(backendName, initializer))
+    new Impl(backendName, initializer)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to