This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 36e435d5d [VL] Set Spark memory overhead automatically according to off-heap size when it's not explicitly configured (#7045)
36e435d5d is described below

commit 36e435d5d92f8155fa7701a15cf19b9c9363966d
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Aug 28 12:51:07 2024 +0800

    [VL] Set Spark memory overhead automatically according to off-heap size when it's not explicitly configured (#7045)
---
 .../backendsapi/velox/VeloxListenerApi.scala       | 25 ++++++++++++++++------
 .../apache/gluten/test/VeloxBackendTestBase.java   |  1 +
 .../scala/org/apache/gluten/GlutenPlugin.scala     |  5 +----
 .../org/apache/spark/util/SparkResourceUtil.scala  | 12 +++++++++--
 4 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 56b439a37..c7bfa9ab5 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -27,6 +27,7 @@ import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.api.plugin.PluginContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.ByteUnit
 import 
org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, 
VeloxParquetWriterInjects, VeloxRowSplitter}
 import org.apache.spark.sql.expression.UDFResolver
 import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
@@ -42,12 +43,24 @@ class VeloxListenerApi extends ListenerApi with Logging {
   override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
     val conf = pc.conf()
 
-    // FIXME: The following is a workaround. Remove once the causes are fixed.
-    conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, 
Long.MaxValue.toString)
-    logWarning(
-      "Setting overhead memory that Gluten can use to UNLIMITED. This is 
currently a" +
-        " temporary solution to avoid OOM by Velox's global memory pools." +
-        " See GLUTEN-6960 for more information.")
+    // Overhead memory limits.
+    val offHeapSize = conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
+    val desiredOverheadSize = (0.1 * 
offHeapSize).toLong.max(ByteUnit.MiB.toBytes(384))
+    if (!SparkResourceUtil.isMemoryOverheadSet(conf)) {
+      // If memory overhead is not set by user, automatically set it according 
to off-heap settings.
+      logInfo(
+        s"Memory overhead is not set. Setting it to $desiredOverheadSize 
automatically." +
+          " Gluten doesn't follow Spark's calculation on default value of this 
option because the" +
+          " actual required memory overhead will depend on off-heap usage than 
on on-heap usage.")
+      conf.set(GlutenConfig.SPARK_OVERHEAD_SIZE_KEY, 
desiredOverheadSize.toString)
+    }
+    val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
+    if (overheadSize < desiredOverheadSize) {
+      logWarning(
+        s"Memory overhead is set to $overheadSize which is smaller than the 
recommended size" +
+          s" $desiredOverheadSize. This may cause OOM.")
+    }
+    conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, 
overheadSize.toString)
 
     // Sql table cache serializer.
     if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, 
defaultValue = false)) {
diff --git a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
index 2f0087608..1cbaefba8 100644
--- a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
+++ b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
@@ -54,6 +54,7 @@ public abstract class VeloxBackendTestBase {
       public SparkConf conf() {
         final SparkConf conf = new SparkConf();
         conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), 
"0");
+        conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
         return conf;
       }
 
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 47551764e..d3179f6bb 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -159,7 +159,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
     // value (detected for the platform) is used, consistent with spark.
     conf.set(GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY, 
SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString)
 
-    // task slots
+    // Task slots.
     val taskSlots = SparkResourceUtil.getTaskSlots(conf)
     conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, 
taskSlots.toString)
 
@@ -171,9 +171,6 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
         1024 * 1024 * 1024
       }
 
-    val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
-    conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, 
overheadSize.toString)
-
     // If dynamic off-heap sizing is enabled, the off-heap size is calculated 
based on the on-heap
     // size. Otherwise, the off-heap size is set to the value specified by the 
user (if any).
     // Note that this means that we will IGNORE the off-heap size specified by 
the user if the
diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
index 890ea31b6..6be1791fa 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
@@ -23,6 +23,8 @@ import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.internal.SQLConf
 
 object SparkResourceUtil extends Logging {
+  private val MEMORY_OVERHEAD_FACTOR = "spark.executor.memoryOverheadFactor"
+  private val MIN_MEMORY_OVERHEAD = "spark.executor.minMemoryOverhead"
 
   /** Get the total cores of the Spark application */
   def getTotalCores(sqlConf: SQLConf): Int = {
@@ -83,12 +85,18 @@ object SparkResourceUtil extends Logging {
     Utils.isLocalMaster(conf)
   }
 
+  // Returns whether user manually sets memory overhead.
+  def isMemoryOverheadSet(conf: SparkConf): Boolean = {
+    Seq(EXECUTOR_MEMORY_OVERHEAD.key, MEMORY_OVERHEAD_FACTOR, 
MIN_MEMORY_OVERHEAD).exists(
+      conf.contains)
+  }
+
   def getMemoryOverheadSize(conf: SparkConf): Long = {
     val overheadMib = conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse {
       val executorMemMib = conf.get(EXECUTOR_MEMORY)
       val factor =
-        conf.getDouble("spark.executor.memoryOverheadFactor", 0.1d)
-      val minMib = conf.getLong("spark.executor.minMemoryOverhead", 384L)
+        conf.getDouble(MEMORY_OVERHEAD_FACTOR, 0.1d)
+      val minMib = conf.getLong(MIN_MEMORY_OVERHEAD, 384L)
       (executorMemMib * factor).toLong.max(minMib)
     }
     ByteUnit.MiB.toBytes(overheadMib)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to