This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 603faba80 [GLUTEN-6960][VL] Limit Velox untracked global memory manager's usage  (#6988)
603faba80 is described below

commit 603faba80aff23bd5de0d80b614e20b59020ec7e
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Aug 26 14:12:32 2024 +0800

    [GLUTEN-6960][VL] Limit Velox untracked global memory manager's usage  (#6988)
---
 cpp/core/config/GlutenConfig.h                     |  2 ++
 cpp/velox/compute/VeloxBackend.cc                  | 14 ++++++++--
 .../memory/memtarget/ThrowOnOomMemoryTarget.java   |  4 +--
 .../scala/org/apache/gluten/GlutenPlugin.scala     | 30 ++++++++++++++--------
 .../org/apache/spark/util/SparkResourceUtil.scala  | 13 ++++++++++
 .../scala/org/apache/gluten/GlutenConfig.scala     | 22 +++++++++++++---
 6 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 31318ff0a..cd5196aa8 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -38,6 +38,8 @@ const std::string kIgnoreMissingFiles = 
"spark.sql.files.ignoreMissingFiles";
 
 const std::string kDefaultSessionTimezone = 
"spark.gluten.sql.session.timeZone.default";
 
+const std::string kSparkOverheadMemory = 
"spark.gluten.memoryOverhead.size.in.bytes";
+
 const std::string kSparkOffHeapMemory = 
"spark.gluten.memory.offHeap.size.in.bytes";
 
 const std::string kSparkTaskOffHeapMemory = 
"spark.gluten.memory.task.offHeap.size.in.bytes";
diff --git a/cpp/velox/compute/VeloxBackend.cc 
b/cpp/velox/compute/VeloxBackend.cc
index 8dc3ade80..dcfadc42c 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -126,8 +126,18 @@ void VeloxBackend::init(const 
std::unordered_map<std::string, std::string>& conf
   initUdf();
   registerSparkTokenizer();
 
-  // initialize the global memory manager for current process
-  facebook::velox::memory::MemoryManager::initialize({});
+  // Initialize the global memory manager for current process.
+  auto sparkOverhead = backendConf_->get<int64_t>(kSparkOverheadMemory);
+  int64_t memoryManagerCapacity;
+  if (sparkOverhead.hasValue()) {
+    // 0.75 * total overhead memory is used for Velox global memory manager.
+    // FIXME: Make this configurable.
+    memoryManagerCapacity = sparkOverhead.value() * 0.75;
+  } else {
+    memoryManagerCapacity = facebook::velox::memory::kMaxMemory;
+  }
+  LOG(INFO) << "Setting global Velox memory manager with capacity: " << 
memoryManagerCapacity;
+  facebook::velox::memory::MemoryManager::initialize({.allocatorCapacity = 
memoryManagerCapacity});
 }
 
 facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() 
const {
diff --git 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
index e6b6ba07e..74e0cbb87 100644
--- 
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
+++ 
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
@@ -89,8 +89,8 @@ public class ThrowOnOomMemoryTarget implements MemoryTarget {
         .append(
             String.format(
                 "\t%s=%s",
-                GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED(),
-                
SQLConf.get().getConfString(GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED())))
+                GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED(),
+                
SQLConf.get().getConfString(GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED())))
         .append(System.lineSeparator())
         .append(
             String.format(
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala 
b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 6e3484dfa..f775d78a1 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -156,13 +156,13 @@ private[gluten] class GlutenDriverPlugin extends 
DriverPlugin with Logging {
     val minOffHeapSize = "1MB"
     if (
       !conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, 
false) &&
-      (!conf.getBoolean(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, false) ||
-        conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, 0) < 
JavaUtils.byteStringAsBytes(
+      (!conf.getBoolean(GlutenConfig.SPARK_OFFHEAP_ENABLED, false) ||
+        conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, 0) < 
JavaUtils.byteStringAsBytes(
           minOffHeapSize))
     ) {
       throw new GlutenException(
-        s"Must set '${GlutenConfig.GLUTEN_OFFHEAP_ENABLED}' to true " +
-          s"and set '${GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY}' to be greater 
than $minOffHeapSize")
+        s"Must set '${GlutenConfig.SPARK_OFFHEAP_ENABLED}' to true " +
+          s"and set '${GlutenConfig.SPARK_OFFHEAP_SIZE_KEY}' to be greater 
than $minOffHeapSize")
     }
 
     // Session's local time zone must be set. If not explicitly set by user, 
its default
@@ -174,13 +174,23 @@ private[gluten] class GlutenDriverPlugin extends 
DriverPlugin with Logging {
     conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, 
taskSlots.toString)
 
     val onHeapSize: Long =
-      if (conf.contains(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)) {
-        conf.getSizeAsBytes(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)
+      if (conf.contains(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)) {
+        conf.getSizeAsBytes(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)
       } else {
         // 1GB default
         1024 * 1024 * 1024
       }
 
+    val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
+    conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, 
overheadSize.toString)
+
+    // FIXME: The following is a workaround. Remove once the causes are fixed.
+    conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, 
Long.MaxValue.toString)
+    logWarning(
+      "Setting overhead memory that Gluten can use to UNLIMITED. This is 
currently a" +
+        " temporary solution to avoid OOM by Velox's global memory pools." +
+        " See GLUTEN-6960 for more information.")
+
     // If dynamic off-heap sizing is enabled, the off-heap size is calculated 
based on the on-heap
     // size. Otherwise, the off-heap size is set to the value specified by the 
user (if any).
     // Note that this means that we will IGNORE the off-heap size specified by 
the user if the
@@ -200,17 +210,17 @@ private[gluten] class GlutenDriverPlugin extends 
DriverPlugin with Logging {
         // The 300MB value, unfortunately, is hard-coded in Spark code.
         ((onHeapSize - (300 * 1024 * 1024)) *
           
conf.getDouble(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, 
0.6d)).toLong
-      } else if (conf.contains(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)) {
+      } else if (conf.contains(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)) {
         // Optimistic off-heap sizes, assuming all storage memory can be 
borrowed into execution
         // memory pool, regardless of Spark option 
spark.memory.storageFraction.
-        conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)
+        conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
       } else {
         // Default Spark Value.
         0L
       }
 
     conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, 
offHeapSize.toString)
-    conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, offHeapSize.toString)
+    conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)
 
     val offHeapPerTask = offHeapSize / taskSlots
     conf.set(GlutenConfig.GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, 
offHeapPerTask.toString)
@@ -218,7 +228,7 @@ private[gluten] class GlutenDriverPlugin extends 
DriverPlugin with Logging {
     // If we are using dynamic off-heap sizing, we should also enable off-heap 
memory
     // officially.
     if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, 
false)) {
-      conf.set(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, "true")
+      conf.set(GlutenConfig.SPARK_OFFHEAP_ENABLED, "true")
 
       // We already sized the off-heap per task in a conservative manner, so 
we can just
       // use it.
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala 
b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
index f8c791fe1..890ea31b6 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
@@ -18,6 +18,8 @@ package org.apache.spark.util
 
 import org.apache.spark.{SparkConf, SparkMasterRegex}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{EXECUTOR_MEMORY, 
EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.internal.SQLConf
 
 object SparkResourceUtil extends Logging {
@@ -80,4 +82,15 @@ object SparkResourceUtil extends Logging {
   def isLocalMaster(conf: SparkConf): Boolean = {
     Utils.isLocalMaster(conf)
   }
+
+  def getMemoryOverheadSize(conf: SparkConf): Long = {
+    val overheadMib = conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse {
+      val executorMemMib = conf.get(EXECUTOR_MEMORY)
+      val factor =
+        conf.getDouble("spark.executor.memoryOverheadFactor", 0.1d)
+      val minMib = conf.getLong("spark.executor.minMemoryOverhead", 384L)
+      (executorMemMib * factor).toLong.max(minMib)
+    }
+    ByteUnit.MiB.toBytes(overheadMib)
+  }
 }
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 9e5161fea..bb0e683c2 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -535,9 +535,11 @@ object GlutenConfig {
   val GLUTEN_CONFIG_PREFIX = "spark.gluten.sql.columnar.backend."
 
   // Private Spark configs.
-  val GLUTEN_ONHEAP_SIZE_KEY = "spark.executor.memory"
-  val GLUTEN_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
-  val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
+  val SPARK_ONHEAP_SIZE_KEY = "spark.executor.memory"
+  val SPARK_OVERHEAD_SIZE_KEY = "spark.executor.memoryOverhead"
+  val SPARK_OVERHEAD_FACTOR_KEY = "spark.executor.memoryOverheadFactor"
+  val SPARK_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
+  val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
   val SPARK_REDACTION_REGEX = "spark.redaction.regex"
 
   // For Soft Affinity Scheduling
@@ -570,6 +572,7 @@ object GlutenConfig {
 
   // Added back to Spark Conf during executor initialization
   val GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY = 
"spark.gluten.numTaskSlotsPerExecutor"
+  val GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY = 
"spark.gluten.memoryOverhead.size.in.bytes"
   val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY = 
"spark.gluten.memory.offHeap.size.in.bytes"
   val GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY = 
"spark.gluten.memory.task.offHeap.size.in.bytes"
   val GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY =
@@ -762,9 +765,10 @@ object GlutenConfig {
       SPARK_SQL_PARQUET_COMPRESSION_CODEC,
       // datasource config end
 
+      GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY,
       GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY,
       GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
-      GLUTEN_OFFHEAP_ENABLED,
+      SPARK_OFFHEAP_ENABLED,
       SESSION_LOCAL_TIMEZONE.key,
       DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key,
       SPARK_REDACTION_REGEX
@@ -1244,6 +1248,16 @@ object GlutenConfig {
       .intConf
       .createWithDefaultString("-1")
 
+  val COLUMNAR_OVERHEAD_SIZE_IN_BYTES =
+    buildConf(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY)
+      .internal()
+      .doc(
+        "Must provide default value since non-execution operations " +
+          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate 
configurations using " +
+          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("0")
+
   val COLUMNAR_OFFHEAP_SIZE_IN_BYTES =
     buildConf(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY)
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to