This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 083ecfbbe9 [VL] Move pre-configuration code of dynamic off-heap sizing to its own place (#9336)
083ecfbbe9 is described below

commit 083ecfbbe9cbb8d2dffd86c177ac5e1debdac973
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Apr 17 17:10:25 2025 +0100

    [VL] Move pre-configuration code of dynamic off-heap sizing to its own place (#9336)
---
 .../DynamicOffHeapSizingMemoryTarget.java          | 77 +++++++++++++++-------
 .../gluten/memory/memtarget/MemoryTargets.java     |  2 +-
 .../scala/org/apache/gluten/GlutenPlugin.scala     | 75 ++++-----------------
 .../org/apache/spark/memory/SparkMemoryUtil.scala  |  4 +-
 .../org/apache/gluten/config/GlutenConfig.scala    |  9 ++-
 5 files changed, 75 insertions(+), 92 deletions(-)

diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
index 4056df062e..6b981903f4 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
@@ -17,6 +17,8 @@
 package org.apache.gluten.memory.memtarget;
 
 import org.apache.gluten.config.GlutenConfig;
+import org.apache.gluten.memory.SimpleMemoryUsageRecorder;
+import org.apache.gluten.proto.MemoryUsageStats;
 
 import org.apache.spark.annotation.Experimental;
 import org.slf4j.Logger;
@@ -24,19 +26,43 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * The memory target used by dynamic off-heap sizing. Since
+ * https://github.com/apache/incubator-gluten/issues/5439.
+ */
 @Experimental
-public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget {
+public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget, KnownNameAndStats {
+
   private static final Logger LOG = LoggerFactory.getLogger(DynamicOffHeapSizingMemoryTarget.class);
-  private final MemoryTarget delegated;
   // When dynamic off-heap sizing is enabled, the off-heap should be sized for the total usable
   // memory, so we can use it as the max memory we will use.
-  private static final long MAX_MEMORY_IN_BYTES = GlutenConfig.get().offHeapMemorySize();
-  private static final AtomicLong USED_OFFHEAP_BYTES = new AtomicLong();
-
-  public DynamicOffHeapSizingMemoryTarget(MemoryTarget delegated) {
-    this.delegated = delegated;
+  private static final long TOTAL_MEMORY_SHARED;
+
+  static {
+    final long maxOnHeapSize = Runtime.getRuntime().maxMemory();
+    final double fractionForSizing = GlutenConfig.get().dynamicOffHeapSizingMemoryFraction();
+    // Since when dynamic off-heap sizing is enabled, we commingle on-heap
+    // and off-heap memory, we set the off-heap size to the usable on-heap size. We will
+    // size it with a memory fraction, which can be aggressively set, but the default
+    // is using the same way that Spark sizes on-heap memory:
+    //
+    // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction *
+    //    (spark.executor.memory - 300MB).
+    //
+    // We will be careful to use the same configuration settings as Spark to ensure
+    // that we are sizing the off-heap memory in the same way as Spark sizes on-heap memory.
+    // The 300MB value, unfortunately, is hard-coded in Spark code.
+    TOTAL_MEMORY_SHARED = (long) ((maxOnHeapSize - (300 * 1024 * 1024)) * fractionForSizing);
+    LOG.info("DynamicOffHeapSizingMemoryTarget MAX_MEMORY_IN_BYTES: {}", TOTAL_MEMORY_SHARED);
   }
 
+  private static final AtomicLong USED_OFF_HEAP_BYTES = new AtomicLong();
+
+  private final String name = MemoryTargetUtil.toUniqueName("DynamicOffHeapSizing");
+  private final SimpleMemoryUsageRecorder recorder = new SimpleMemoryUsageRecorder();
+
+  public DynamicOffHeapSizingMemoryTarget() {}
+
   @Override
   public long borrow(long size) {
     if (size == 0) {
@@ -47,47 +73,42 @@ public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget {
     // See https://github.com/apache/incubator-gluten/issues/9276.
     long totalHeapMemory = Runtime.getRuntime().totalMemory();
     long freeHeapMemory = Runtime.getRuntime().freeMemory();
-
-    long usedOffHeapBytesNow = USED_OFFHEAP_BYTES.get();
+    long usedOffHeapMemory = USED_OFF_HEAP_BYTES.get();
 
     // Adds the total JVM memory which is the actual memory the JVM occupied from the operating
     // system into the counter.
-    if (size + usedOffHeapBytesNow + totalHeapMemory > MAX_MEMORY_IN_BYTES) {
+    if (size + usedOffHeapMemory + totalHeapMemory > TOTAL_MEMORY_SHARED) {
       LOG.warn(
           String.format(
               "Failing allocation as unified memory is OOM. "
                   + "Used Off-heap: %d, Used On-Heap: %d, "
                   + "Free On-heap: %d, Total On-heap: %d, "
                   + "Max On-heap: %d, Allocation: %d.",
-              usedOffHeapBytesNow,
+              usedOffHeapMemory,
               totalHeapMemory - freeHeapMemory,
               freeHeapMemory,
               totalHeapMemory,
-              MAX_MEMORY_IN_BYTES,
+              TOTAL_MEMORY_SHARED,
               size));
 
       return 0;
     }
 
-    long reserved = delegated.borrow(size);
-
-    USED_OFFHEAP_BYTES.addAndGet(reserved);
-
-    return reserved;
+    USED_OFF_HEAP_BYTES.addAndGet(size);
+    recorder.inc(size);
+    return size;
   }
 
   @Override
   public long repay(long size) {
-    long unreserved = delegated.repay(size);
-
-    USED_OFFHEAP_BYTES.addAndGet(-unreserved);
-
-    return unreserved;
+    USED_OFF_HEAP_BYTES.addAndGet(-size);
+    recorder.inc(-size);
+    return size;
   }
 
   @Override
   public long usedBytes() {
-    return delegated.usedBytes();
+    return recorder.current();
   }
 
   @Override
@@ -95,7 +116,13 @@ public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget {
     return visitor.visit(this);
   }
 
-  public MemoryTarget delegated() {
-    return delegated;
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public MemoryUsageStats stats() {
+    return recorder.toStats();
   }
 }
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index 9630af8acc..ff2a4da03e 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -51,7 +51,7 @@ public final class MemoryTargets {
   @Experimental
   public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget memoryTarget) {
     if (GlutenConfig.get().dynamicOffHeapSizingEnabled()) {
-      return new DynamicOffHeapSizingMemoryTarget(memoryTarget);
+      return new DynamicOffHeapSizingMemoryTarget();
     }
 
     return memoryTarget;
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 8a539d44e4..ff5ae59eef 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -188,74 +188,25 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
     // check memory off-heap enabled and size.
     checkOffHeapSettings(conf)
 
-    // Task slots.
-    val taskSlots = SparkResourceUtil.getTaskSlots(conf)
-    conf.set(NUM_TASK_SLOTS_PER_EXECUTOR.key, taskSlots.toString)
-
-    val onHeapSize: Long = conf.getSizeAsBytes(SPARK_ONHEAP_SIZE_KEY, 1024 * 1024 * 1024)
-
-    // If dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap
-    // size. Otherwise, the off-heap size is set to the value specified by the user (if any).
-    // Note that this means that we will IGNORE the off-heap size specified by the user if the
-    // dynamic off-heap feature is enabled.
-    val offHeapSize: Long =
-      if (
-        conf.getBoolean(
-          DYNAMIC_OFFHEAP_SIZING_ENABLED.key,
-          DYNAMIC_OFFHEAP_SIZING_ENABLED.defaultValue.get)
-      ) {
-        // Since when dynamic off-heap sizing is enabled, we commingle on-heap
-        // and off-heap memory, we set the off-heap size to the usable on-heap size. We will
-        // size it with a memory fraction, which can be aggressively set, but the default
-        // is using the same way that Spark sizes on-heap memory:
-        //
-        // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction *
-        //    (spark.executor.memory - 300MB).
-        //
-        // We will be careful to use the same configuration settings as Spark to ensure
-        // that we are sizing the off-heap memory in the same way as Spark sizes on-heap memory.
-        // The 300MB value, unfortunately, is hard-coded in Spark code.
-        ((onHeapSize - (300 * 1024 * 1024)) *
-          conf.getDouble(DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION.key, 0.6d)).toLong
-      } else {
-        // Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution
-        // memory pool, regardless of Spark option spark.memory.storageFraction.
-        conf.getSizeAsBytes(SPARK_OFFHEAP_SIZE_KEY, 0L)
-      }
+    // Get the off-heap size set by user.
+    val offHeapSize = conf.getSizeAsBytes(SPARK_OFFHEAP_SIZE_KEY)
 
+    // Set off-heap size in bytes.
     conf.set(COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key, offHeapSize.toString)
-    conf.set(SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)
 
+    // Set off-heap size in bytes per task.
+    val taskSlots = SparkResourceUtil.getTaskSlots(conf)
+    conf.set(NUM_TASK_SLOTS_PER_EXECUTOR.key, taskSlots.toString)
     val offHeapPerTask = offHeapSize / taskSlots
     conf.set(COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key, offHeapPerTask.toString)
 
-    // If we are using dynamic off-heap sizing, we should also enable off-heap memory
-    // officially.
-    if (
-      conf.getBoolean(
-        DYNAMIC_OFFHEAP_SIZING_ENABLED.key,
-        DYNAMIC_OFFHEAP_SIZING_ENABLED.defaultValue.get)
-    ) {
-      conf.set(SPARK_OFFHEAP_ENABLED, "true")
-
-      // We already sized the off-heap per task in a conservative manner, so we can just
-      // use it.
-      conf.set(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key, 
offHeapPerTask.toString)
-    } else {
-      // Let's make sure this is set to false explicitly if it is not on as it
-      // is looked up when throwing OOF exceptions.
-      conf.set(
-        DYNAMIC_OFFHEAP_SIZING_ENABLED.key,
-        DYNAMIC_OFFHEAP_SIZING_ENABLED.defaultValueString)
-
-      // Pessimistic off-heap sizes, with the assumption that all non-borrowable storage memory
-      // determined by spark.memory.storageFraction was used.
-      val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d)
-      val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong / taskSlots
-      conf.set(
-        COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key,
-        conservativeOffHeapPerTask.toString)
-    }
+    // Pessimistic off-heap sizes, with the assumption that all non-borrowable storage memory
+    // determined by spark.memory.storageFraction was used.
+    val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d)
+    val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong / taskSlots
+    conf.set(
+      COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES.key,
+      conservativeOffHeapPerTask.toString)
 
     // Disable vanilla columnar readers, to prevent columnar-to-columnar conversions.
     // FIXME: Do we still need this trick since
diff --git a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
index b0322fecd4..ae287cef3f 100644
--- a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
@@ -129,7 +129,9 @@ object SparkMemoryUtil {
 
       override def visit(
           dynamicOffHeapSizingMemoryTarget: DynamicOffHeapSizingMemoryTarget): String = {
-        dynamicOffHeapSizingMemoryTarget.delegated().accept(this)
+        prettyPrintStats(
+          "Dynamic off-heap sizing memory target stats: ",
+          dynamicOffHeapSizingMemoryTarget)
       }
 
       override def visit(retryOnOomMemoryTarget: RetryOnOomMemoryTarget): String = {
diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index f19ef0833a..449db38e24 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -342,6 +342,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def dynamicOffHeapSizingEnabled: Boolean =
     getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
 
+  def dynamicOffHeapSizingMemoryFraction: Double =
+    getConf(DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION)
+
   def enableHiveFileFormatWriter: Boolean = getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
 
   def enableCelebornFallback: Boolean = getConf(CELEBORN_FALLBACK_ENABLED)
@@ -1244,7 +1247,7 @@ object GlutenConfig {
       .createWithDefault(false)
 
   val COLUMNAR_MEMORY_UNTRACKED =
-    buildConf("spark.gluten.memory.untracked")
+    buildStaticConf("spark.gluten.memory.untracked")
       .internal()
       .doc(
         "When enabled, turn all native memory allocations in Gluten into untracked. Spark " +
@@ -1634,7 +1637,7 @@ object GlutenConfig {
       .createWithDefault(true)
 
   val DYNAMIC_OFFHEAP_SIZING_ENABLED =
-    buildConf("spark.gluten.memory.dynamic.offHeap.sizing.enabled")
+    buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.enabled")
       .internal()
       .doc(
         "Experimental: When set to true, the offheap config (spark.memory.offHeap.size) will " +
@@ -1651,7 +1654,7 @@ object GlutenConfig {
       .createWithDefault(false)
 
   val DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION =
-    buildConf("spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction")
+    buildStaticConf("spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction")
       .internal()
       .doc(
         "Experimental: Determines the memory fraction used to determine the total " +


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to