This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 36e435d5d [VL] Set Spark memory overhead automatically according to
off-heap size when it's not explicitly configured (#7045)
36e435d5d is described below
commit 36e435d5d92f8155fa7701a15cf19b9c9363966d
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Aug 28 12:51:07 2024 +0800
[VL] Set Spark memory overhead automatically according to off-heap size
when it's not explicitly configured (#7045)
---
.../backendsapi/velox/VeloxListenerApi.scala | 25 ++++++++++++++++------
.../apache/gluten/test/VeloxBackendTestBase.java | 1 +
.../scala/org/apache/gluten/GlutenPlugin.scala | 5 +----
.../org/apache/spark/util/SparkResourceUtil.scala | 12 +++++++++--
4 files changed, 31 insertions(+), 12 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 56b439a37..c7bfa9ab5 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -27,6 +27,7 @@ import org.apache.gluten.vectorized.{JniLibLoader,
JniWorkspace}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.ByteUnit
import
org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects,
VeloxParquetWriterInjects, VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
@@ -42,12 +43,24 @@ class VeloxListenerApi extends ListenerApi with Logging {
override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
val conf = pc.conf()
- // FIXME: The following is a workaround. Remove once the causes are fixed.
- conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY,
Long.MaxValue.toString)
- logWarning(
- "Setting overhead memory that Gluten can use to UNLIMITED. This is
currently a" +
- " temporary solution to avoid OOM by Velox's global memory pools." +
- " See GLUTEN-6960 for more information.")
+ // Overhead memory limits.
+ val offHeapSize = conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
+ val desiredOverheadSize = (0.1 *
offHeapSize).toLong.max(ByteUnit.MiB.toBytes(384))
+ if (!SparkResourceUtil.isMemoryOverheadSet(conf)) {
+ // If memory overhead is not set by user, automatically set it according
to off-heap settings.
+ logInfo(
+ s"Memory overhead is not set. Setting it to $desiredOverheadSize
automatically." +
+ " Gluten doesn't follow Spark's calculation of the default value of this
option because the" +
+ " actual required memory overhead will depend more on off-heap usage than
on on-heap usage.")
+ conf.set(GlutenConfig.SPARK_OVERHEAD_SIZE_KEY,
desiredOverheadSize.toString)
+ }
+ val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
+ if (overheadSize < desiredOverheadSize) {
+ logWarning(
+ s"Memory overhead is set to $overheadSize which is smaller than the
recommended size" +
+ s" $desiredOverheadSize. This may cause OOM.")
+ }
+ conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY,
overheadSize.toString)
// Sql table cache serializer.
if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key,
defaultValue = false)) {
diff --git
a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
index 2f0087608..1cbaefba8 100644
---
a/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
+++
b/backends-velox/src/test/java/org/apache/gluten/test/VeloxBackendTestBase.java
@@ -54,6 +54,7 @@ public abstract class VeloxBackendTestBase {
public SparkConf conf() {
final SparkConf conf = new SparkConf();
conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(),
"0");
+ conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
return conf;
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 47551764e..d3179f6bb 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -159,7 +159,7 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
// value (detected for the platform) is used, consistent with spark.
conf.set(GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY,
SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString)
- // task slots
+ // Task slots.
val taskSlots = SparkResourceUtil.getTaskSlots(conf)
conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY,
taskSlots.toString)
@@ -171,9 +171,6 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
1024 * 1024 * 1024
}
- val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
- conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY,
overheadSize.toString)
-
// If dynamic off-heap sizing is enabled, the off-heap size is calculated
based on the on-heap
// size. Otherwise, the off-heap size is set to the value specified by the
user (if any).
// Note that this means that we will IGNORE the off-heap size specified by
the user if the
diff --git
a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
index 890ea31b6..6be1791fa 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
@@ -23,6 +23,8 @@ import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf
object SparkResourceUtil extends Logging {
+ private val MEMORY_OVERHEAD_FACTOR = "spark.executor.memoryOverheadFactor"
+ private val MIN_MEMORY_OVERHEAD = "spark.executor.minMemoryOverhead"
/** Get the total cores of the Spark application */
def getTotalCores(sqlConf: SQLConf): Int = {
@@ -83,12 +85,18 @@ object SparkResourceUtil extends Logging {
Utils.isLocalMaster(conf)
}
+ // Returns whether the user has manually set memory overhead.
+ def isMemoryOverheadSet(conf: SparkConf): Boolean = {
+ Seq(EXECUTOR_MEMORY_OVERHEAD.key, MEMORY_OVERHEAD_FACTOR,
MIN_MEMORY_OVERHEAD).exists(
+ conf.contains)
+ }
+
def getMemoryOverheadSize(conf: SparkConf): Long = {
val overheadMib = conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse {
val executorMemMib = conf.get(EXECUTOR_MEMORY)
val factor =
- conf.getDouble("spark.executor.memoryOverheadFactor", 0.1d)
- val minMib = conf.getLong("spark.executor.minMemoryOverhead", 384L)
+ conf.getDouble(MEMORY_OVERHEAD_FACTOR, 0.1d)
+ val minMib = conf.getLong(MIN_MEMORY_OVERHEAD, 384L)
(executorMemMib * factor).toLong.max(minMib)
}
ByteUnit.MiB.toBytes(overheadMib)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]