This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c61b2c0965 [VL] Fix object leak on `TreeMemoryConsumers#FACTORIES` (#11500)
c61b2c0965 is described below
commit c61b2c0965901d9e2e0459a7a2b35be557cf1073
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Jan 28 16:21:46 2026 +0000
[VL] Fix object leak on `TreeMemoryConsumers#FACTORIES` (#11500)
Co-authored-by: Copilot <[email protected]>
---
.../spark/memory/GlobalOffHeapMemorySuite.scala | 9 ++-
.../gluten/columnarbatch/IndicatorVectorPool.java | 2 +-
.../memory/arrow/alloc/ArrowBufferAllocators.java | 6 +-
.../memory/arrow/pool/ArrowNativeMemoryPool.java | 2 +-
.../memory/listener/ReservationListeners.java | 4 +-
.../apache/gluten/memory/NativeMemoryManager.scala | 4 +-
.../scala/org/apache/gluten/runtime/Runtime.scala | 2 +-
.../gluten/memory/memtarget/MemoryTargets.java | 8 +--
.../memtarget/spark/TreeMemoryConsumers.java | 64 ++++++++++++++++------
.../memtarget/spark/TreeMemoryConsumerTest.java | 19 ++-----
10 files changed, 67 insertions(+), 53 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
index 95053e3e65..74d3bb4212 100644
---
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
@@ -21,7 +21,6 @@ import org.apache.gluten.memory.memtarget.{Spillers,
TreeMemoryTarget}
import
org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget.OutOfMemoryException
import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers
-import org.apache.spark.TaskContext
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.task.TaskResources
@@ -42,7 +41,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with
BeforeAndAfterAll {
test("Sanity") {
TaskResources.runUnsafe {
val factory =
- TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
+ TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
val consumer =
factory
.legacyRoot()
@@ -65,7 +64,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with
BeforeAndAfterAll {
test("Task OOM by global occupation") {
TaskResources.runUnsafe {
val factory =
- TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
+ TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
val consumer =
factory
.legacyRoot()
@@ -84,7 +83,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with
BeforeAndAfterAll {
test("Release global") {
TaskResources.runUnsafe {
val factory =
- TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
+ TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
val consumer =
factory
.legacyRoot()
@@ -103,7 +102,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with
BeforeAndAfterAll {
test("Release task") {
TaskResources.runUnsafe {
val factory =
- TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
+ TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
val consumer =
factory
.legacyRoot()
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java
index 41c8cbdd43..4cc534e62a 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/columnarbatch/IndicatorVectorPool.java
@@ -56,7 +56,7 @@ public class IndicatorVectorPool implements TaskResource {
@Override
public int priority() {
- return 10;
+ return 20;
}
@Override
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
b/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
index 7d72a263ac..1aa6c4cc2b 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/alloc/ArrowBufferAllocators.java
@@ -25,7 +25,6 @@ import org.apache.arrow.memory.AllocationListener;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.spark.memory.GlobalOffHeapMemory;
-import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.task.TaskResource;
import org.apache.spark.task.TaskResources;
import org.slf4j.Logger;
@@ -72,7 +71,6 @@ public class ArrowBufferAllocators {
private final String name;
{
- final TaskMemoryManager tmm =
TaskResources.getLocalTaskContext().taskMemoryManager();
if (GlutenConfig.get().memoryUntracked()) {
listener = AllocationListener.NOOP;
} else {
@@ -81,7 +79,7 @@ public class ArrowBufferAllocators {
MemoryTargets.throwOnOom(
MemoryTargets.dynamicOffHeapSizingIfEnabled(
MemoryTargets.newConsumer(
- tmm, "ArrowContextInstance", Spillers.NOOP,
Collections.emptyMap()))),
+ "ArrowContextInstance", Spillers.NOOP,
Collections.emptyMap()))),
TaskResources.getSharedUsage());
}
}
@@ -122,7 +120,7 @@ public class ArrowBufferAllocators {
@Override
public int priority() {
- return 0; // lowest priority
+ return 10; // low priority: released after higher-priority task resources
}
@Override
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java
b/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java
index a1f07c949e..f1aa6149f9 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/memory/arrow/pool/ArrowNativeMemoryPool.java
@@ -61,7 +61,7 @@ public class ArrowNativeMemoryPool implements TaskResource {
@Override
public int priority() {
- return 0;
+ return 10;
}
@Override
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java
b/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java
index ecfa6e9945..070533c5e4 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java
@@ -49,10 +49,10 @@ public final class ReservationListeners {
final TaskMemoryManager tmm =
TaskResources.getLocalTaskContext().taskMemoryManager();
final TreeMemoryTarget consumer =
MemoryTargets.newConsumer(
- tmm, name, Spillers.withMinSpillSize(spiller,
reservationBlockSize), mutableStats);
+ name, Spillers.withMinSpillSize(spiller, reservationBlockSize),
mutableStats);
final MemoryTarget overConsumer =
MemoryTargets.newConsumer(
- tmm, consumer.name() + ".OverAcquire", Spillers.NOOP,
Collections.emptyMap());
+ consumer.name() + ".OverAcquire", Spillers.NOOP,
Collections.emptyMap());
final MemoryTarget target =
MemoryTargets.throwOnOom(
MemoryTargets.overAcquire(
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
index 3559d7a213..159e1bba5e 100644
---
a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
+++
b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala
@@ -112,8 +112,8 @@ object NativeMemoryManager {
}
override def priority(): Int = {
// Memory managers should be released after all runtimes are released.
- // So lower the priority to 0.
- 0
+ // So set the priority lower than runtime resources.
+ 10
}
override def resourceName(): String = "nmm"
}
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
index 219eb216df..e57bec619d 100644
--- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
+++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala
@@ -76,7 +76,7 @@ object Runtime {
}
- override def priority(): Int = 20
+ override def priority(): Int = 30
override def resourceName(): String = s"runtime"
}
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index 2ad47f2890..b5138bd4c6 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -24,7 +24,6 @@ import org.apache.spark.SparkEnv;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.memory.GlobalOffHeapMemoryTarget;
import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.SparkResourceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,17 +63,14 @@ public final class MemoryTargets {
}
public static TreeMemoryTarget newConsumer(
- TaskMemoryManager tmm,
- String name,
- Spiller spiller,
- Map<String, MemoryUsageStatsBuilder> virtualChildren) {
+ String name, Spiller spiller, Map<String, MemoryUsageStatsBuilder>
virtualChildren) {
final MemoryMode mode;
if (GlutenCoreConfig.get().dynamicOffHeapSizingEnabled()) {
mode = MemoryMode.ON_HEAP;
} else {
mode = MemoryMode.OFF_HEAP;
}
- final TreeMemoryConsumers.Factory factory =
TreeMemoryConsumers.factory(tmm, mode);
+ final TreeMemoryConsumers.Factory factory =
TreeMemoryConsumers.factory(mode);
if (GlutenCoreConfig.get().memoryIsolation()) {
return TreeMemoryTargets.newChild(factory.isolatedRoot(), name, spiller,
virtualChildren);
}
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
index 0c42ce966e..3782ae475a 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
@@ -21,37 +21,45 @@ import org.apache.gluten.memory.memtarget.Spillers;
import org.apache.gluten.memory.memtarget.TreeMemoryTarget;
import com.google.common.base.Preconditions;
-import org.apache.commons.collections4.map.AbstractReferenceMap;
-import org.apache.commons.collections4.map.ReferenceMap;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.task.TaskResource;
+import org.apache.spark.task.TaskResources;
import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public final class TreeMemoryConsumers {
- private static final ReferenceMap<TaskMemoryManager, Factory> FACTORIES =
- new ReferenceMap<>(
- AbstractReferenceMap.ReferenceStrength.WEAK,
AbstractReferenceMap.ReferenceStrength.WEAK);
+import scala.Function0;
+public final class TreeMemoryConsumers {
private TreeMemoryConsumers() {}
- public static Factory factory(TaskMemoryManager tmm, MemoryMode mode) {
- synchronized (FACTORIES) {
- final Factory factory = FACTORIES.computeIfAbsent(tmm, m -> new
Factory(m, mode));
- final MemoryMode foundMode = factory.sparkConsumer.getMode();
- Preconditions.checkState(
- foundMode == mode,
- "An existing Spark memory consumer already exists but is of the
different memory "
- + "mode: %s",
- foundMode);
- return factory;
- }
+ public static Factory factory(MemoryMode mode) {
+ final Factory factory =
+ TaskResources.addResourceIfNotRegistered(
+ Factory.class.getName(),
+ new Function0<Factory>() {
+ @Override
+ public Factory apply() {
+ return new
Factory(TaskResources.getLocalTaskContext().taskMemoryManager(), mode);
+ }
+ });
+ final MemoryMode foundMode = factory.sparkConsumer.getMode();
+ Preconditions.checkState(
+ foundMode == mode,
+ "An existing Spark memory consumer already exists but is of the
different memory "
+ + "mode: %s",
+ foundMode);
+ return factory;
}
- public static class Factory {
+ public static class Factory implements TaskResource {
+ private static final Logger LOG = LoggerFactory.getLogger(Factory.class);
+
private final TreeMemoryConsumer sparkConsumer;
private final Map<Long, TreeMemoryTarget> roots = new
ConcurrentHashMap<>();
@@ -90,5 +98,25 @@ public final class TreeMemoryConsumers {
public TreeMemoryTarget isolatedRoot() {
return
ofCapacity(GlutenCoreConfig.get().conservativeTaskOffHeapMemorySize());
}
+
+ @Override
+ public void release() throws Exception {
+ if (sparkConsumer.usedBytes() != 0) {
+ LOG.warn(
+ "{} still used {} bytes when task is ending," + " this may cause
memory leak",
+ resourceName(),
+ sparkConsumer.usedBytes());
+ }
+ }
+
+ @Override
+ public int priority() {
+ return 5;
+ }
+
+ @Override
+ public String resourceName() {
+ return Factory.class.getSimpleName();
+ }
}
}
diff --git
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
index 81a562f45f..e252b7d7c4 100644
---
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
+++
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
@@ -22,7 +22,6 @@ import org.apache.gluten.memory.memtarget.Spiller;
import org.apache.gluten.memory.memtarget.Spillers;
import org.apache.gluten.memory.memtarget.TreeMemoryTarget;
-import org.apache.spark.TaskContext;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.task.TaskResources$;
@@ -51,8 +50,7 @@ public class TreeMemoryConsumerTest {
test(
() -> {
final TreeMemoryConsumers.Factory factory =
- TreeMemoryConsumers.factory(
- TaskContext.get().taskMemoryManager(), MemoryMode.OFF_HEAP);
+ TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP);
final TreeMemoryTarget consumer =
factory
.isolatedRoot()
@@ -73,8 +71,7 @@ public class TreeMemoryConsumerTest {
test(
() -> {
final TreeMemoryConsumers.Factory factory =
- TreeMemoryConsumers.factory(
- TaskContext.get().taskMemoryManager(), MemoryMode.OFF_HEAP);
+ TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP);
final TreeMemoryTarget consumer =
factory
.legacyRoot()
@@ -95,8 +92,7 @@ public class TreeMemoryConsumerTest {
test(
() -> {
final TreeMemoryTarget legacy =
- TreeMemoryConsumers.factory(
- TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
+ TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
.legacyRoot()
.newChild(
"FOO",
@@ -105,8 +101,7 @@ public class TreeMemoryConsumerTest {
Collections.emptyMap());
Assert.assertEquals(110, legacy.borrow(110));
final TreeMemoryTarget isolated =
- TreeMemoryConsumers.factory(
- TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
+ TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
.isolatedRoot()
.newChild(
"FOO",
@@ -123,8 +118,7 @@ public class TreeMemoryConsumerTest {
() -> {
final Spillers.AppendableSpillerList spillers =
Spillers.appendable();
final TreeMemoryTarget legacy =
- TreeMemoryConsumers.factory(
- TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
+ TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
.legacyRoot()
.newChild(
"FOO", TreeMemoryTarget.CAPACITY_UNLIMITED, spillers,
Collections.emptyMap());
@@ -160,8 +154,7 @@ public class TreeMemoryConsumerTest {
() -> {
final Spillers.AppendableSpillerList spillers =
Spillers.appendable();
final TreeMemoryTarget legacy =
- TreeMemoryConsumers.factory(
- TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
+ TreeMemoryConsumers.factory(MemoryMode.OFF_HEAP)
.legacyRoot()
.newChild(
"FOO", TreeMemoryTarget.CAPACITY_UNLIMITED, spillers,
Collections.emptyMap());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]