This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 35f516cf22 [VL] Minor fixes for the memory API + dynamic off-heap sizing code (#10234)
35f516cf22 is described below
commit 35f516cf2298c83be723cdd75fa71489a6317e54
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Jul 22 10:06:22 2025 +0800
[VL] Minor fixes for the memory API + dynamic off-heap sizing code (#10234)
---
.../spark/memory/GlobalOffHeapMemorySuite.scala | 8 ++++----
.../memtarget/DynamicOffHeapSizingMemoryTarget.java | 5 +++--
.../apache/gluten/memory/memtarget/MemoryTargets.java | 9 ++++++++-
.../memory/memtarget/spark/TreeMemoryConsumers.java | 19 ++++++++++++-------
.../memtarget/spark/TreeMemoryConsumerTest.java | 19 +++++++++++++------
5 files changed, 40 insertions(+), 20 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
index d4d36c6560..3a46c1ac1a 100644
---
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
@@ -42,7 +42,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with
BeforeAndAfterAll {
test("Sanity") {
TaskResources.runUnsafe {
val factory =
- TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+ TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
val consumer =
factory
.legacyRoot()
@@ -65,7 +65,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with
BeforeAndAfterAll {
test("Task OOM by global occupation") {
TaskResources.runUnsafe {
val factory =
- TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+ TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
val consumer =
factory
.legacyRoot()
@@ -84,7 +84,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with
BeforeAndAfterAll {
test("Release global") {
TaskResources.runUnsafe {
val factory =
- TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+ TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
val consumer =
factory
.legacyRoot()
@@ -103,7 +103,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with
BeforeAndAfterAll {
test("Release task") {
TaskResources.runUnsafe {
val factory =
- TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+ TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
val consumer =
factory
.legacyRoot()
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
index bc17142dec..b1664223aa 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
@@ -30,6 +30,7 @@ import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Method;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -82,7 +83,7 @@ public class DynamicOffHeapSizingMemoryTarget implements
MemoryTarget, KnownName
arg,
originalMinHeapFreeRatio);
}
- } else if (arg == "-XX:+ExplicitGCInvokesConcurrent") {
+ } else if (Objects.equals(arg, "-XX:+ExplicitGCInvokesConcurrent")) {
// If this is set -XX:+ExplicitGCInvokesConcurrent, System.gc() does
not trigger Full GC,
// so explicit JVM shrinking is not effective.
LOG.error(
@@ -90,7 +91,7 @@ public class DynamicOffHeapSizingMemoryTarget implements
MemoryTarget, KnownName
+ " is set. Please check the JVM arguments: {}. ",
arg);
- } else if (arg == "-XX:+DisableExplicitGC") {
+ } else if (Objects.equals(arg, "-XX:+DisableExplicitGC")) {
// If -XX:+DisableExplicitGC is set, calls to System.gc() are ignored,
// so explicit JVM shrinking will not work as intended.
LOG.error(
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index 5f8013c62e..b1e41bc9eb 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -22,6 +22,7 @@ import
org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers;
import org.apache.spark.SparkEnv;
import org.apache.spark.annotation.Experimental;
+import org.apache.spark.memory.MemoryMode;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.SparkResourceUtil;
import org.slf4j.Logger;
@@ -62,7 +63,13 @@ public final class MemoryTargets {
String name,
Spiller spiller,
Map<String, MemoryUsageStatsBuilder> virtualChildren) {
- final TreeMemoryConsumers.Factory factory =
TreeMemoryConsumers.factory(tmm);
+ final MemoryMode mode;
+ if (GlutenCoreConfig.get().dynamicOffHeapSizingEnabled()) {
+ mode = MemoryMode.ON_HEAP;
+ } else {
+ mode = MemoryMode.OFF_HEAP;
+ }
+ final TreeMemoryConsumers.Factory factory =
TreeMemoryConsumers.factory(tmm, mode);
if (GlutenCoreConfig.get().memoryIsolation()) {
return TreeMemoryTargets.newChild(factory.isolatedRoot(), name, spiller,
virtualChildren);
}
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
index 0f7c877a97..87e8937f53 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
@@ -20,6 +20,7 @@ import org.apache.gluten.config.GlutenCoreConfig;
import org.apache.gluten.memory.memtarget.Spillers;
import org.apache.gluten.memory.memtarget.TreeMemoryTarget;
+import com.google.common.base.Preconditions;
import org.apache.commons.collections.map.ReferenceMap;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.memory.TaskMemoryManager;
@@ -35,9 +36,17 @@ public final class TreeMemoryConsumers {
private TreeMemoryConsumers() {}
@SuppressWarnings("unchecked")
- public static Factory factory(TaskMemoryManager tmm) {
+ public static Factory factory(TaskMemoryManager tmm, MemoryMode mode) {
synchronized (FACTORIES) {
- return (Factory) FACTORIES.computeIfAbsent(tmm, m -> new
Factory((TaskMemoryManager) m));
+ final Factory factory =
+ (Factory) FACTORIES.computeIfAbsent(tmm, m -> new
Factory((TaskMemoryManager) m, mode));
+ final MemoryMode foundMode = factory.sparkConsumer.getMode();
+ Preconditions.checkState(
+ foundMode == mode,
+ "An existing Spark memory consumer already exists but is of the
different memory "
+ + "mode: %s",
+ foundMode);
+ return factory;
}
}
@@ -45,11 +54,7 @@ public final class TreeMemoryConsumers {
private final TreeMemoryConsumer sparkConsumer;
private final Map<Long, TreeMemoryTarget> roots = new
ConcurrentHashMap<>();
- private Factory(TaskMemoryManager tmm) {
- MemoryMode mode =
- GlutenCoreConfig.get().dynamicOffHeapSizingEnabled()
- ? MemoryMode.ON_HEAP
- : MemoryMode.OFF_HEAP;
+ private Factory(TaskMemoryManager tmm, MemoryMode mode) {
this.sparkConsumer = new TreeMemoryConsumer(tmm, mode);
}
diff --git
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
index b59856eef6..81a562f45f 100644
---
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
+++
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
@@ -23,6 +23,7 @@ import org.apache.gluten.memory.memtarget.Spillers;
import org.apache.gluten.memory.memtarget.TreeMemoryTarget;
import org.apache.spark.TaskContext;
+import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.task.TaskResources$;
import org.junit.Assert;
@@ -50,7 +51,8 @@ public class TreeMemoryConsumerTest {
test(
() -> {
final TreeMemoryConsumers.Factory factory =
-
TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager());
+ TreeMemoryConsumers.factory(
+ TaskContext.get().taskMemoryManager(), MemoryMode.OFF_HEAP);
final TreeMemoryTarget consumer =
factory
.isolatedRoot()
@@ -71,7 +73,8 @@ public class TreeMemoryConsumerTest {
test(
() -> {
final TreeMemoryConsumers.Factory factory =
-
TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager());
+ TreeMemoryConsumers.factory(
+ TaskContext.get().taskMemoryManager(), MemoryMode.OFF_HEAP);
final TreeMemoryTarget consumer =
factory
.legacyRoot()
@@ -92,7 +95,8 @@ public class TreeMemoryConsumerTest {
test(
() -> {
final TreeMemoryTarget legacy =
-
TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+ TreeMemoryConsumers.factory(
+ TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
.legacyRoot()
.newChild(
"FOO",
@@ -101,7 +105,8 @@ public class TreeMemoryConsumerTest {
Collections.emptyMap());
Assert.assertEquals(110, legacy.borrow(110));
final TreeMemoryTarget isolated =
-
TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+ TreeMemoryConsumers.factory(
+ TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
.isolatedRoot()
.newChild(
"FOO",
@@ -118,7 +123,8 @@ public class TreeMemoryConsumerTest {
() -> {
final Spillers.AppendableSpillerList spillers =
Spillers.appendable();
final TreeMemoryTarget legacy =
-
TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+ TreeMemoryConsumers.factory(
+ TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
.legacyRoot()
.newChild(
"FOO", TreeMemoryTarget.CAPACITY_UNLIMITED, spillers,
Collections.emptyMap());
@@ -154,7 +160,8 @@ public class TreeMemoryConsumerTest {
() -> {
final Spillers.AppendableSpillerList spillers =
Spillers.appendable();
final TreeMemoryTarget legacy =
-
TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+ TreeMemoryConsumers.factory(
+ TaskContext.get().taskMemoryManager(),
MemoryMode.OFF_HEAP)
.legacyRoot()
.newChild(
"FOO", TreeMemoryTarget.CAPACITY_UNLIMITED, spillers,
Collections.emptyMap());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]