This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 35f516cf22 [VL] Minor fixes for the memory API + dynamic off-heap 
sizing code (#10234)
35f516cf22 is described below

commit 35f516cf2298c83be723cdd75fa71489a6317e54
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Jul 22 10:06:22 2025 +0800

    [VL] Minor fixes for the memory API + dynamic off-heap sizing code (#10234)
---
 .../spark/memory/GlobalOffHeapMemorySuite.scala       |  8 ++++----
 .../memtarget/DynamicOffHeapSizingMemoryTarget.java   |  5 +++--
 .../apache/gluten/memory/memtarget/MemoryTargets.java |  9 ++++++++-
 .../memory/memtarget/spark/TreeMemoryConsumers.java   | 19 ++++++++++++-------
 .../memtarget/spark/TreeMemoryConsumerTest.java       | 19 +++++++++++++------
 5 files changed, 40 insertions(+), 20 deletions(-)

diff --git 
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
index d4d36c6560..3a46c1ac1a 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/memory/GlobalOffHeapMemorySuite.scala
@@ -42,7 +42,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
   test("Sanity") {
     TaskResources.runUnsafe {
       val factory =
-        TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+        TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
       val consumer =
         factory
           .legacyRoot()
@@ -65,7 +65,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
   test("Task OOM by global occupation") {
     TaskResources.runUnsafe {
       val factory =
-        TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+        TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
       val consumer =
         factory
           .legacyRoot()
@@ -84,7 +84,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
   test("Release global") {
     TaskResources.runUnsafe {
       val factory =
-        TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+        TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
       val consumer =
         factory
           .legacyRoot()
@@ -103,7 +103,7 @@ class GlobalOffHeapMemorySuite extends AnyFunSuite with 
BeforeAndAfterAll {
   test("Release task") {
     TaskResources.runUnsafe {
       val factory =
-        TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+        TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
       val consumer =
         factory
           .legacyRoot()
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
index bc17142dec..b1664223aa 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
@@ -30,6 +30,7 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
 import java.lang.reflect.Method;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -82,7 +83,7 @@ public class DynamicOffHeapSizingMemoryTarget implements 
MemoryTarget, KnownName
               arg,
               originalMinHeapFreeRatio);
         }
-      } else if (arg == "-XX:+ExplicitGCInvokesConcurrent") {
+      } else if (Objects.equals(arg, "-XX:+ExplicitGCInvokesConcurrent")) {
         // If this is set -XX:+ExplicitGCInvokesConcurrent, System.gc() does 
not trigger Full GC,
         // so explicit JVM shrinking is not effective.
         LOG.error(
@@ -90,7 +91,7 @@ public class DynamicOffHeapSizingMemoryTarget implements 
MemoryTarget, KnownName
                 + " is set. Please check the JVM arguments: {}. ",
             arg);
 
-      } else if (arg == "-XX:+DisableExplicitGC") {
+      } else if (Objects.equals(arg, "-XX:+DisableExplicitGC")) {
         // If -XX:+DisableExplicitGC is set, calls to System.gc() are ignored,
         // so explicit JVM shrinking will not work as intended.
         LOG.error(
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index 5f8013c62e..b1e41bc9eb 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -22,6 +22,7 @@ import 
org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers;
 
 import org.apache.spark.SparkEnv;
 import org.apache.spark.annotation.Experimental;
+import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.util.SparkResourceUtil;
 import org.slf4j.Logger;
@@ -62,7 +63,13 @@ public final class MemoryTargets {
       String name,
       Spiller spiller,
       Map<String, MemoryUsageStatsBuilder> virtualChildren) {
-    final TreeMemoryConsumers.Factory factory = 
TreeMemoryConsumers.factory(tmm);
+    final MemoryMode mode;
+    if (GlutenCoreConfig.get().dynamicOffHeapSizingEnabled()) {
+      mode = MemoryMode.ON_HEAP;
+    } else {
+      mode = MemoryMode.OFF_HEAP;
+    }
+    final TreeMemoryConsumers.Factory factory = 
TreeMemoryConsumers.factory(tmm, mode);
     if (GlutenCoreConfig.get().memoryIsolation()) {
       return TreeMemoryTargets.newChild(factory.isolatedRoot(), name, spiller, 
virtualChildren);
     }
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
index 0f7c877a97..87e8937f53 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumers.java
@@ -20,6 +20,7 @@ import org.apache.gluten.config.GlutenCoreConfig;
 import org.apache.gluten.memory.memtarget.Spillers;
 import org.apache.gluten.memory.memtarget.TreeMemoryTarget;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.collections.map.ReferenceMap;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.memory.TaskMemoryManager;
@@ -35,9 +36,17 @@ public final class TreeMemoryConsumers {
   private TreeMemoryConsumers() {}
 
   @SuppressWarnings("unchecked")
-  public static Factory factory(TaskMemoryManager tmm) {
+  public static Factory factory(TaskMemoryManager tmm, MemoryMode mode) {
     synchronized (FACTORIES) {
-      return (Factory) FACTORIES.computeIfAbsent(tmm, m -> new 
Factory((TaskMemoryManager) m));
+      final Factory factory =
+          (Factory) FACTORIES.computeIfAbsent(tmm, m -> new 
Factory((TaskMemoryManager) m, mode));
+      final MemoryMode foundMode = factory.sparkConsumer.getMode();
+      Preconditions.checkState(
+          foundMode == mode,
+          "An existing Spark memory consumer already exists but is of the 
different memory "
+              + "mode: %s",
+          foundMode);
+      return factory;
     }
   }
 
@@ -45,11 +54,7 @@ public final class TreeMemoryConsumers {
     private final TreeMemoryConsumer sparkConsumer;
     private final Map<Long, TreeMemoryTarget> roots = new 
ConcurrentHashMap<>();
 
-    private Factory(TaskMemoryManager tmm) {
-      MemoryMode mode =
-          GlutenCoreConfig.get().dynamicOffHeapSizingEnabled()
-              ? MemoryMode.ON_HEAP
-              : MemoryMode.OFF_HEAP;
+    private Factory(TaskMemoryManager tmm, MemoryMode mode) {
       this.sparkConsumer = new TreeMemoryConsumer(tmm, mode);
     }
 
diff --git 
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
 
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
index b59856eef6..81a562f45f 100644
--- 
a/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
+++ 
b/gluten-core/src/test/java/org/apache/gluten/memory/memtarget/spark/TreeMemoryConsumerTest.java
@@ -23,6 +23,7 @@ import org.apache.gluten.memory.memtarget.Spillers;
 import org.apache.gluten.memory.memtarget.TreeMemoryTarget;
 
 import org.apache.spark.TaskContext;
+import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.task.TaskResources$;
 import org.junit.Assert;
@@ -50,7 +51,8 @@ public class TreeMemoryConsumerTest {
     test(
         () -> {
           final TreeMemoryConsumers.Factory factory =
-              
TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager());
+              TreeMemoryConsumers.factory(
+                  TaskContext.get().taskMemoryManager(), MemoryMode.OFF_HEAP);
           final TreeMemoryTarget consumer =
               factory
                   .isolatedRoot()
@@ -71,7 +73,8 @@ public class TreeMemoryConsumerTest {
     test(
         () -> {
           final TreeMemoryConsumers.Factory factory =
-              
TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager());
+              TreeMemoryConsumers.factory(
+                  TaskContext.get().taskMemoryManager(), MemoryMode.OFF_HEAP);
           final TreeMemoryTarget consumer =
               factory
                   .legacyRoot()
@@ -92,7 +95,8 @@ public class TreeMemoryConsumerTest {
     test(
         () -> {
           final TreeMemoryTarget legacy =
-              
TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+              TreeMemoryConsumers.factory(
+                      TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
                   .legacyRoot()
                   .newChild(
                       "FOO",
@@ -101,7 +105,8 @@ public class TreeMemoryConsumerTest {
                       Collections.emptyMap());
           Assert.assertEquals(110, legacy.borrow(110));
           final TreeMemoryTarget isolated =
-              
TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+              TreeMemoryConsumers.factory(
+                      TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
                   .isolatedRoot()
                   .newChild(
                       "FOO",
@@ -118,7 +123,8 @@ public class TreeMemoryConsumerTest {
         () -> {
           final Spillers.AppendableSpillerList spillers = 
Spillers.appendable();
           final TreeMemoryTarget legacy =
-              
TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+              TreeMemoryConsumers.factory(
+                      TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
                   .legacyRoot()
                   .newChild(
                       "FOO", TreeMemoryTarget.CAPACITY_UNLIMITED, spillers, 
Collections.emptyMap());
@@ -154,7 +160,8 @@ public class TreeMemoryConsumerTest {
         () -> {
           final Spillers.AppendableSpillerList spillers = 
Spillers.appendable();
           final TreeMemoryTarget legacy =
-              
TreeMemoryConsumers.factory(TaskContext.get().taskMemoryManager())
+              TreeMemoryConsumers.factory(
+                      TaskContext.get().taskMemoryManager(), 
MemoryMode.OFF_HEAP)
                   .legacyRoot()
                   .newChild(
                       "FOO", TreeMemoryTarget.CAPACITY_UNLIMITED, spillers, 
Collections.emptyMap());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to