This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c61b2c0965 [VL] Fix object leak on `TreeMemoryConsumers#FACTORIES` (#11500)
c61b2c0965 is described below

commit c61b2c0965901d9e2e0459a7a2b35be557cf1073
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Jan 28 16:21:46 2026 +0000

    [VL] Fix object leak on `TreeMemoryConsumers#FACTORIES` (#11500)
    
    Co-authored-by: Copilot <[email protected]>
---
 .../spark/memory/GlobalOffHeapMemorySuite.scala    |  9 ++-
 .../gluten/columnarbatch/IndicatorVectorPool.java  |  2 +-
 .../memory/arrow/alloc/ArrowBufferAllocators.java  |  6 +-
 .../memory/arrow/pool/ArrowNativeMemoryPool.java   |  2 +-
 .../memory/listener/ReservationListeners.java      |  4 +-
 .../apache/gluten/memory/NativeMemoryManager.scala |  4 +-
 .../scala/org/apache/gluten/runtime/Runtime.scala  |  2 +-
 .../gluten/memory/memtarget/MemoryTargets.java     |  8 +--
 .../memtarget/spark/TreeMemoryConsumers.java       | 64 ++++++++++++++++------
 .../memtarget/spark/TreeMemoryConsumerTest.java    | 19 ++-----
 10 files changed, 67 insertions(+), 53 deletions(-)

diff --git 
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
index 95053e3e65..74d3bb4212 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
@@ -21,7 +21,6 @@ import org.apache.gluten.memory.memtarget.{Spillers, 
TreeMemoryTarget}
 import 
org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget.OutOfMemoryException
 import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers
 
-import org.apache.spark.TaskContext
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.task.TaskResources
 
@@ -42,7 +41,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
   test("Sanity") {
     TaskResources.runUnsafe {
       val factory =
-        TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
+        TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
       val consumer =
         factory
           .legacyRoot()
@@ -65,7 +64,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
   test("Task OOM by global occupation") {
     TaskResources.runUnsafe {
       val factory =
-        TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
+        TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
       val consumer =
         factory
           .legacyRoot()
@@ -84,7 +83,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
   test("Release global") {
     TaskResources.runUnsafe {
       val factory =
-        TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
+        TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
       val consumer =
         factory
           .legacyRoot()
@@ -103,7 +102,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
   test("Release task") {
     TaskResources.runUnsafe {
       val factory =
-        TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
+        TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
       val consumer =
         factory
           .legacyRoot()
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java
index 41c8cbdd43..4cc534e62a 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java
@@ -56,7 +56,7 @@ public class IndicatorVectorPool implements TaskResource {
 
   @Override
   public int priority() {
-    return 10;
+    return 20;
   }
 
   @Override
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
index 7d72a263ac..1aa6c4cc2b 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
@@ -25,7 +25,6 @@ import org.apache.arrow.memory.AllocationListener;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.spark.memory.GlobalOffHeapMemory;
-import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.task.TaskResource;
 import org.apache.spark.task.TaskResources;
 import org.slf4j.Logger;
@@ -72,7 +71,6 @@ public class ArrowBufferAllocators {
     private final String name;
 
     {
-      final TaskMemoryManager tmm = 
TaskResources.getLocalTaskContext().taskMemoryManager();
       if (GlutenConfig.get().memoryUntracked()) {
         listener = AllocationListener.NOOP;
       } else {
@@ -81,7 +79,7 @@ public class ArrowBufferAllocators {
                 MemoryTargets.throwOnOom(
                     MemoryTargets.dynamicOffHeapSizingIfEnabled(
                         MemoryTargets.newConsumer(
-                            tmm, "ArrowContextInstance", Spillers.NOOP, 
Collections.emptyMap()))),
+                            "ArrowContextInstance", Spillers.NOOP, 
Collections.emptyMap()))),
                 TaskResources.getSharedUsage());
       }
     }
@@ -122,7 +120,7 @@ public class ArrowBufferAllocators {
 
     @Override
     public int priority() {
-      return 0; // lowest priority
+      return 10; // low priority: released after higher-priority task resources
     }
 
     @Override
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java
index a1f07c949e..f1aa6149f9 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java
@@ -61,7 +61,7 @@ public class ArrowNativeMemoryPool implements TaskResource {
 
   @Override
   public int priority() {
-    return 0;
+    return 10;
   }
 
   @Override
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java
index ecfa6e9945..070533c5e4 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java
@@ -49,10 +49,10 @@ public final class ReservationListeners {
     final TaskMemoryManager tmm = 
TaskResources.getLocalTaskContext().taskMemoryManager();
     final TreeMemoryTarget consumer =
         MemoryTargets.newConsumer(
-            tmm, name, Spillers.withMinSpillSize(spiller, 
reservationBlockSize), mutableStats);
+            name, Spillers.withMinSpillSize(spiller, reservationBlockSize), 
mutableStats);
     final MemoryTarget overConsumer =
         MemoryTargets.newConsumer(
-            tmm, consumer.name() + ".OverAcquire", Spillers.NOOP, 
Collections.emptyMap());
+            consumer.name() + ".OverAcquire", Spillers.NOOP, 
Collections.emptyMap());
     final MemoryTarget target =
         MemoryTargets.throwOnOom(
             MemoryTargets.overAcquire(
diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
 
b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
index 3559d7a213..159e1bba5e 100644
--- 
a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
+++ 
b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
@@ -112,8 +112,8 @@ object NativeMemoryManager {
     }
     override def priority(): Int = {
       // Memory managers should be released after all runtimes are released.
-      // So lower the priority to 0.
-      0
+      // So set the priority lower than runtime resources.
+      10
     }
     override def resourceName(): String = "nmm"
   }
diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala 
b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
index 219eb216df..e57bec619d 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
@@ -76,7 +76,7 @@ object Runtime {
 
     }
 
-    override def priority(): Int = 20
+    override def priority(): Int = 30
 
     override def resourceName(): String = s"runtime"
   }
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index 2ad47f2890..b5138bd4c6 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -24,7 +24,6 @@ import org.apache.spark.SparkEnv;
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.memory.GlobalOffHeapMemoryTarget;
 import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.util.SparkResourceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,17 +63,14 @@ public final class MemoryTargets {
   }
 
   public static TreeMemoryTarget newConsumer(
-      TaskMemoryManager tmm,
-      String name,
-      Spiller spiller,
-      Map<String, MemoryUsageStatsBuilder> virtualChildren) {
+      String name, Spiller spiller, Map<String, MemoryUsageStatsBuilder> 
virtualChildren) {
     final MemoryMode mode;
     if (GlutenCoreConfig.get().dynamicOffHeapSizingEnabled()) {
       mode = MemoryMode.ON_HEAP;
     } else {
       mode = MemoryMode.OFF_HEAP;
     }
-    final TreeMemoryConsumers.Factory factory = 
TreeMemoryConsumers.factory(tmm, mode);
+    final TreeMemoryConsumers.Factory factory = 
TreeMemoryConsumers.factory(mode);
     if (GlutenCoreConfig.get().memoryIsolation()) {
       return TreeMemoryTargets.newChild(factory.isolatedRoot(), name, spiller, 
virtualChildren);
     }
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
index 0c42ce966e..3782ae475a 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
@@ -21,37 +21,45 @@ import org.apache.gluten.memory.memtarget.Spillers;
 import org.apache.gluten.memory.memtarget.TreeMemoryTarget;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.collections4.map.AbstractReferenceMap;
-import org.apache.commons.collections4.map.ReferenceMap;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.task.TaskResource;
+import org.apache.spark.task.TaskResources;
 import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public final class TreeMemoryConsumers {
-  private static final ReferenceMap<TaskMemoryManager, Factory> FACTORIES =
-      new ReferenceMap<>(
-          AbstractReferenceMap.ReferenceStrength.WEAK, 
AbstractReferenceMap.ReferenceStrength.WEAK);
+import scala.Function0;
 
+public final class TreeMemoryConsumers {
   private TreeMemoryConsumers() {}
 
-  public static Factory factory(TaskMemoryManager tmm, MemoryMode mode) {
-    synchronized (FACTORIES) {
-      final Factory factory = FACTORIES.computeIfAbsent(tmm, m -> new 
Factory(m, mode));
-      final MemoryMode foundMode = factory.sparkConsumer.getMode();
-      Preconditions.checkState(
-          foundMode == mode,
-          "An existing Spark memory consumer already exists but is of the 
different memory "
-              + "mode: %s",
-          foundMode);
-      return factory;
-    }
+  public static Factory factory(MemoryMode mode) {
+    final Factory factory =
+        TaskResources.addResourceIfNotRegistered(
+            Factory.class.getName(),
+            new Function0<Factory>() {
+              @Override
+              public Factory apply() {
+                return new 
Factory(TaskResources.getLocalTaskContext().taskMemoryManager(), mode);
+              }
+            });
+    final MemoryMode foundMode = factory.sparkConsumer.getMode();
+    Preconditions.checkState(
+        foundMode == mode,
+        "An existing Spark memory consumer already exists but is of the 
different memory "
+            + "mode: %s",
+        foundMode);
+    return factory;
   }
 
-  public static class Factory {
+  public static class Factory implements TaskResource {
+    private static final Logger LOG = LoggerFactory.getLogger(Factory.class);
+
     private final TreeMemoryConsumer sparkConsumer;
     private final Map<Long, TreeMemoryTarget> roots = new 
ConcurrentHashMap<>();
 
@@ -90,5 +98,25 @@ public final class TreeMemoryConsumers {
     public TreeMemoryTarget isolatedRoot() {
       return 
ofCapacity(GlutenCoreConfig.get().conservativeTaskOffHeapMemorySize());
     }
+
+    @Override
+    public void release() throws Exception {
+      if (sparkConsumer.usedBytes() != 0) {
+        LOG.warn(
+            "{} still used {} bytes when task is ending," + " this may cause 
memory leak",
+            resourceName(),
+            sparkConsumer.usedBytes());
+      }
+    }
+
+    @Override
+    public int priority() {
+      return 5;
+    }
+
+    @Override
+    public String resourceName() {
+      return Factory.class.getSimpleName();
+    }
   }
 }
diff --git 
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
 
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
index 81a562f45f..e252b7d7c4 100644
--- 
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
+++ 
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
@@ -22,7 +22,6 @@ import org.apache.gluten.memory.memtarget.Spiller;
 import org.apache.gluten.memory.memtarget.Spillers;
 import org.apache.gluten.memory.memtarget.TreeMemoryTarget;
 
-import org.apache.spark.TaskContext;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.task.TaskResources$;
@@ -51,8 +50,7 @@ public class TreeMemoryConsumerTest {
     test(
         () -> {
           final TreeMemoryConsumers.Factory factory =
-              TreeMemoryConsumers.factory(
-                  TaskContext.get().taskMemoryManager(), MemoryMode.OFF_HEAP);
+              TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP);
           final TreeMemoryTarget consumer =
               factory
                   .isolatedRoot()
@@ -73,8 +71,7 @@ public class TreeMemoryConsumerTest {
     test(
         () -> {
           final TreeMemoryConsumers.Factory factory =
-              TreeMemoryConsumers.factory(
-                  TaskContext.get().taskMemoryManager(), MemoryMode.OFF_HEAP);
+              TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP);
           final TreeMemoryTarget consumer =
               factory
                   .legacyRoot()
@@ -95,8 +92,7 @@ public class TreeMemoryConsumerTest {
     test(
         () -> {
           final TreeMemoryTarget legacy =
-              TreeMemoryConsumers.factory(
-                      TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
+              TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
                   .legacyRoot()
                   .newChild(
                       "FOO",
@@ -105,8 +101,7 @@ public class TreeMemoryConsumerTest {
                       Collections.emptyMap());
           Assert.assertEquals(110, legacy.borrow(110));
           final TreeMemoryTarget isolated =
-              TreeMemoryConsumers.factory(
-                      TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
+              TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
                   .isolatedRoot()
                   .newChild(
                       "FOO",
@@ -123,8 +118,7 @@ public class TreeMemoryConsumerTest {
         () -> {
           final Spillers.AppendableSpillerList spillers = 
Spillers.appendable();
           final TreeMemoryTarget legacy =
-              TreeMemoryConsumers.factory(
-                      TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
+              TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
                   .legacyRoot()
                   .newChild(
                       "FOO", TreeMemoryTarget.CAPACITY_UNLIMITED, spillers, 
Collections.emptyMap());
@@ -160,8 +154,7 @@ public class TreeMemoryConsumerTest {
         () -> {
           final Spillers.AppendableSpillerList spillers = 
Spillers.appendable();
           final TreeMemoryTarget legacy =
-              TreeMemoryConsumers.factory(
-                      TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
+              TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
                   .legacyRoot()
                   .newChild(
                       "FOO", TreeMemoryTarget.CAPACITY_UNLIMITED, spillers, 
Collections.emptyMap());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to