zhouyuan commented on code in PR #9585:
URL: https://github.com/apache/incubator-gluten/pull/9585#discussion_r2111304542


##########
gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java:
##########
@@ -38,26 +45,87 @@ public class DynamicOffHeapSizingMemoryTarget implements 
MemoryTarget, KnownName
   // memory, so we can use it as the max memory we will use.
   private static final long TOTAL_MEMORY_SHARED;
 
+  private static final double ASYNC_GC_MAX_TOTAL_MEMORY_USAGE_RATIO = 0.85;
+  private static final double ASYNC_GC_MAX_ON_HEAP_MEMORY_RATIO = 0.65;
+  private static final double GC_MAX_HEAP_FREE_RATIO = 0.05;
+  private static final int MAX_GC_RETRY_TIMES = 3;
+
+  private static final AtomicBoolean ASYNC_GC_SUSPEND = new 
AtomicBoolean(false);
+  private static final Object JVM_SHRINK_SYNC_OBJECT = new Object();
+
+  private static final int ORIGINAL_MAX_HEAP_FREE_RATIO;
+  private static final int ORIGINAL_MIN_HEAP_FREE_RATIO;
+
   static {
-    final long maxOnHeapSize = Runtime.getRuntime().maxMemory();
-    final double fractionForSizing = 
GlutenConfig.get().dynamicOffHeapSizingMemoryFraction();
-    // Since when dynamic off-heap sizing is enabled, we commingle on-heap
-    // and off-heap memory, we set the off-heap size to the usable on-heap 
size. We will
-    // size it with a memory fraction, which can be aggressively set, but the 
default
-    // is using the same way that Spark sizes on-heap memory:
-    //
-    // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction *
-    //    (spark.executor.memory - 300MB).
-    //
-    // We will be careful to use the same configuration settings as Spark to 
ensure
-    // that we are sizing the off-heap memory in the same way as Spark sizes 
on-heap memory.
-    // The 300MB value, unfortunately, is hard-coded in Spark code.
-    TOTAL_MEMORY_SHARED = (long) ((maxOnHeapSize - (300 * 1024 * 1024)) * 
fractionForSizing);
-    LOG.info("DynamicOffHeapSizingMemoryTarget MAX_MEMORY_IN_BYTES: {}", 
TOTAL_MEMORY_SHARED);
+    RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();
+    List<String> jvmArgs = runtimeMxBean.getInputArguments();
+    int originalMaxHeapFreeRatio = 70;
+    int originalMinHeapFreeRatio = 40;
+    for (String arg : jvmArgs) {
+      if (arg.startsWith("-XX:MaxHeapFreeRatio=")) {
+        String valuePart = arg.substring(arg.indexOf('=') + 1);
+        try {
+          originalMaxHeapFreeRatio = Integer.parseInt(valuePart);
+        } catch (NumberFormatException e) {
+          LOG.warn(
+              "Failed to parse MaxHeapFreeRatio from JVM argument: {}. Using 
default value: {}.",
+              arg,
+              originalMaxHeapFreeRatio);
+        }
+      } else if (arg.startsWith("-XX:MinHeapFreeRatio=")) {
+        String valuePart = arg.substring(arg.indexOf('=') + 1);
+        try {
+          originalMinHeapFreeRatio = Integer.parseInt(valuePart);
+        } catch (NumberFormatException e) {
+          LOG.warn(
+              "Failed to parse MinHeapFreeRatio from JVM argument: {}. Using 
default value: {}.",
+              arg,
+              originalMinHeapFreeRatio);
+        }
+      } else if (arg == "-XX:+ExplicitGCInvokesConcurrent") {
+        // If this is set -XX:+ExplicitGCInvokesConcurrent, System.gc() does 
not trigger Full GC,
+        // so explicit JVM shrinking is not effective.
+        LOG.error(
+            "Explicit JVM shrinking is not effective because 
-XX:+ExplicitGCInvokesConcurrent"
+                + " is set. Please check the JVM arguments: {}. ",
+            arg);
+
+      } else if (arg == "-XX:+DisableExplicitGC") {
+        // If -XX:+DisableExplicitGC is set, calls to System.gc() are ignored,
+        // so explicit JVM shrinking will not work as intended.
+        LOG.error(
+            "Explicit JVM shrinking is disabled because -XX:+DisableExplicitGC 
is set. "
+                + "System.gc() calls will be ignored and JVM shrinking will 
not work. "
+                + "Please check the JVM arguments: {}. ",
+            arg);
+      }
+    }
+    ORIGINAL_MIN_HEAP_FREE_RATIO = originalMinHeapFreeRatio;
+    ORIGINAL_MAX_HEAP_FREE_RATIO = originalMaxHeapFreeRatio;
+
+    if (!isJava9OrLater()) {

Review Comment:
   Should we consider refusing to start up and aborting here?



##########
gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java:
##########
@@ -125,4 +225,159 @@ public String name() {
   public MemoryUsageStats stats() {
     return recorder.toStats();
   }
+
+  public static boolean isJava9OrLater() {
+    String spec = System.getProperty("java.specification.version", "1.8");
+    // "1.8" → 8, "9" → 9, "11" → 11, etc.
+    if (spec.startsWith("1.")) {
+      spec = spec.substring(2);
+    }
+    try {
+      return Integer.parseInt(spec) >= 9;
+    } catch (NumberFormatException e) {
+      return false;
+    }
+  }
+
+  public static boolean canShrinkJVMMemory(long totalMemory, long freeMemory) {

Review Comment:
   Does this feature also rely on a specific GC policy, like `G1`?



##########
backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala:
##########
@@ -16,45 +16,58 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.benchmarks.RandomParquetDataGenerator
-import org.apache.gluten.tags.SkipTest
+import org.apache.gluten.memory.memtarget.DynamicOffHeapSizingMemoryTarget
 
 import org.apache.spark.SparkConf
 
-@SkipTest
 class DynamicOffHeapSizingSuite extends VeloxWholeStageTransformerSuite {
   override protected val resourcePath: String = "/tpch-data-parquet"
   override protected val fileFormat: String = "parquet"
 
-  private val dataGenerator = 
RandomParquetDataGenerator(System.currentTimeMillis())
-  private val outputPath = getClass.getResource("/").getPath + 
"dynamicoffheapsizing_output.parquet"
-  private val AGG_SQL =
-    """select f_1, count(DISTINCT f_1)
-      |from tbl group
-      |group by 1""".stripMargin
-
   override def beforeAll(): Unit = {
     super.beforeAll()
+    createTPCHNotNullTables()
   }
+
   override protected def sparkConf: SparkConf = {
     super.sparkConf
       .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
-      .set("spark.executor.memory", "6GB")
-      .set("spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction", "0.8")
+      .set("spark.executor.memory", "2GB")
+      .set("spark.memory.offHeap.enabled", "false")
+      .set("spark.memory.offHeap.size", "0")
+      .set("spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction", 
"0.95")
       .set("spark.gluten.memory.dynamic.offHeap.sizing.enabled", "true")
   }
 
-  def getRootCause(e: Throwable): Throwable = {
-    if (e.getCause == null) {
-      return e
-    }
-    getRootCause(e.getCause)
-  }
-
   test("Dynamic off-heap sizing") {
-    System.gc()
-    dataGenerator.generateRandomData(spark, Some(outputPath))
-    
spark.read.format("parquet").load(outputPath).createOrReplaceTempView("tbl")
-    spark.sql(AGG_SQL)
+    if (DynamicOffHeapSizingMemoryTarget.isJava9OrLater()) {
+      val query =

Review Comment:
   The CI for unit tests is running with JDK 17, so this should be covered.



##########
gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java:
##########
@@ -125,4 +225,159 @@ public String name() {
   public MemoryUsageStats stats() {
     return recorder.toStats();
   }
+
+  public static boolean isJava9OrLater() {
+    String spec = System.getProperty("java.specification.version", "1.8");
+    // "1.8" → 8, "9" → 9, "11" → 11, etc.
+    if (spec.startsWith("1.")) {
+      spec = spec.substring(2);
+    }
+    try {
+      return Integer.parseInt(spec) >= 9;
+    } catch (NumberFormatException e) {
+      return false;
+    }
+  }
+
+  public static boolean canShrinkJVMMemory(long totalMemory, long freeMemory) {
+    // Check if the JVM memory can be shrunk by a full GC.
+    return freeMemory > totalMemory * GC_MAX_HEAP_FREE_RATIO;
+  }
+
+  public static long getTotalExplicitGCCount() {
+    return TOTAL_EXPLICIT_GC_COUNT.get();
+  }
+
+  private static boolean exceedsMaxMemoryUsage(
+      long totalOnHeapMemory, long totalOffHeapMemory, long requestedSize, 
double ratio) {
+    return requestedSize + totalOffHeapMemory + totalOnHeapMemory >= 
TOTAL_MEMORY_SHARED * ratio;
+  }
+
+  private static boolean shouldTriggerAsyncOnHeapMemoryShrink(
+      long totalOnHeapMemory, long freeOnHeapMemory, long totalOffHeapMemory, 
long requestedSize) {
+    // If most of the memory has already been used, there's a high chance that 
memory will be fully
+    // consumed. We proactively detect this situation to trigger JVM memory 
shrinking using the
+    // following conditions.
+
+    boolean exceedsMaxMemoryUsageRatio =
+        exceedsMaxMemoryUsage(
+            totalOnHeapMemory,
+            totalOffHeapMemory,
+            requestedSize,
+            ASYNC_GC_MAX_TOTAL_MEMORY_USAGE_RATIO);
+    return exceedsMaxMemoryUsageRatio
+        && canShrinkJVMMemory(totalOnHeapMemory, freeOnHeapMemory)
+        // Limit GC frequency to prevent performance impact from excessive 
garbage collection.
+        && totalOnHeapMemory > TOTAL_MEMORY_SHARED * 
ASYNC_GC_MAX_ON_HEAP_MEMORY_RATIO
+        && (!ASYNC_GC_SUSPEND.get()
+            || freeOnHeapMemory > totalOnHeapMemory * 
(ORIGINAL_MIN_HEAP_FREE_RATIO / 100.0));
+  }
+
+  private static long shrinkOnHeapMemoryInternal(
+      long totalMemory, long freeMemory, boolean isAsyncGc) {
+    long totalOffHeapMemory = USED_OFF_HEAP_BYTES.get();
+    LOG.warn(
+        String.format(
+            "Starting %sfull gc to shrink JVM memory: "
+                + "Total On-heap: %d, Free On-heap: %d, "
+                + "Total Off-heap: %d, Used On-Heap: %d, Executor memory: %d.",
+            isAsyncGc ? "async " : "",
+            totalMemory,
+            freeMemory,
+            totalOffHeapMemory,
+            (totalMemory - freeMemory),
+            TOTAL_MEMORY_SHARED));
+    // Explicitly calling System.gc() to trigger a full garbage collection.
+    // This is necessary in this context to attempt to shrink JVM memory usage
+    // when off-heap memory allocation is constrained. Use of System.gc() is
+    // generally discouraged due to its unpredictable performance impact, but
+    // here it is used as a last resort to prevent memory allocation failures.
+    System.gc();
+    long newTotalMemory = Runtime.getRuntime().totalMemory();
+    long newFreeMemory = Runtime.getRuntime().freeMemory();
+    int gcRetryTimes = 0;
+    while (!isAsyncGc
+        && gcRetryTimes < MAX_GC_RETRY_TIMES
+        && newTotalMemory >= totalMemory
+        && canShrinkJVMMemory(newTotalMemory, newFreeMemory)) {
+      // System.gc() is just a suggestion; the JVM may ignore it or perform 
only a partial GC.
+      // Here, the total memory is not reduced but the free memory ratio is 
bigger than the
+      // GC_MAX_HEAP_FREE_RATIO. So we need to call System.gc() again to try 
to reduce the total
+      // memory.
+      // This is a workaround for the JVM's behavior of not reducing the total 
memory after GC.
+      System.gc();
+      newTotalMemory = Runtime.getRuntime().totalMemory();
+      newFreeMemory = Runtime.getRuntime().freeMemory();
+      gcRetryTimes++;
+    }
+    // If the memory usage is still high after GC, we need to suspend the 
async GC for a while.
+    if (isAsyncGc) {
+      ASYNC_GC_SUSPEND.set(
+          totalMemory - newTotalMemory < totalMemory * 
(ORIGINAL_MIN_HEAP_FREE_RATIO / 100.0));
+    }
+
+    TOTAL_EXPLICIT_GC_COUNT.getAndAdd(1);
+    LOG.warn(
+        String.format(
+            "Finished %sfull gc to shrink JVM memory: "
+                + "Total On-heap: %d, Free On-heap: %d, "
+                + "Total Off-heap: %d, Used On-Heap: %d, Executor memory: %d, "
+                + "[GC Retry times: %d].",
+            isAsyncGc ? "async " : "",
+            newTotalMemory,
+            newFreeMemory,
+            totalOffHeapMemory,
+            (newTotalMemory - newFreeMemory),
+            TOTAL_MEMORY_SHARED,
+            gcRetryTimes));
+    return newTotalMemory;
+  }
+
+  public static long shrinkOnHeapMemory(long totalMemory, long freeMemory, 
boolean isAsyncGc) {
+    boolean updateMaxHeapFreeRatio = false;
+    Object hotSpotBean = null;
+    String maxHeapFreeRatioName = "MaxHeapFreeRatio";

Review Comment:
   Although the feature is called dynamic off-heap sizing, it's using dynamic 
on-heap sizing underneath.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to