This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 603faba80 [GLUTEN-6960][VL] Limit Velox untracked global memory
manager's usage (#6988)
603faba80 is described below
commit 603faba80aff23bd5de0d80b614e20b59020ec7e
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Aug 26 14:12:32 2024 +0800
[GLUTEN-6960][VL] Limit Velox untracked global memory manager's usage
(#6988)
---
cpp/core/config/GlutenConfig.h | 2 ++
cpp/velox/compute/VeloxBackend.cc | 14 ++++++++--
.../memory/memtarget/ThrowOnOomMemoryTarget.java | 4 +--
.../scala/org/apache/gluten/GlutenPlugin.scala | 30 ++++++++++++++--------
.../org/apache/spark/util/SparkResourceUtil.scala | 13 ++++++++++
.../scala/org/apache/gluten/GlutenConfig.scala | 22 +++++++++++++---
6 files changed, 67 insertions(+), 18 deletions(-)
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 31318ff0a..cd5196aa8 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -38,6 +38,8 @@ const std::string kIgnoreMissingFiles =
"spark.sql.files.ignoreMissingFiles";
const std::string kDefaultSessionTimezone =
"spark.gluten.sql.session.timeZone.default";
+const std::string kSparkOverheadMemory =
"spark.gluten.memoryOverhead.size.in.bytes";
+
const std::string kSparkOffHeapMemory =
"spark.gluten.memory.offHeap.size.in.bytes";
const std::string kSparkTaskOffHeapMemory =
"spark.gluten.memory.task.offHeap.size.in.bytes";
diff --git a/cpp/velox/compute/VeloxBackend.cc
b/cpp/velox/compute/VeloxBackend.cc
index 8dc3ade80..dcfadc42c 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -126,8 +126,18 @@ void VeloxBackend::init(const
std::unordered_map<std::string, std::string>& conf
initUdf();
registerSparkTokenizer();
- // initialize the global memory manager for current process
- facebook::velox::memory::MemoryManager::initialize({});
+ // Initialize the global memory manager for current process.
+ auto sparkOverhead = backendConf_->get<int64_t>(kSparkOverheadMemory);
+ int64_t memoryManagerCapacity;
+ if (sparkOverhead.hasValue()) {
+ // 0.75 * total overhead memory is used for Velox global memory manager.
+ // FIXME: Make this configurable.
+ memoryManagerCapacity = sparkOverhead.value() * 0.75;
+ } else {
+ memoryManagerCapacity = facebook::velox::memory::kMaxMemory;
+ }
+ LOG(INFO) << "Setting global Velox memory manager with capacity: " <<
memoryManagerCapacity;
+ facebook::velox::memory::MemoryManager::initialize({.allocatorCapacity =
memoryManagerCapacity});
}
facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache()
const {
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
index e6b6ba07e..74e0cbb87 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
@@ -89,8 +89,8 @@ public class ThrowOnOomMemoryTarget implements MemoryTarget {
.append(
String.format(
"\t%s=%s",
- GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED(),
-
SQLConf.get().getConfString(GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED())))
+ GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED(),
+
SQLConf.get().getConfString(GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED())))
.append(System.lineSeparator())
.append(
String.format(
diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index 6e3484dfa..f775d78a1 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -156,13 +156,13 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
val minOffHeapSize = "1MB"
if (
!conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED,
false) &&
- (!conf.getBoolean(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, false) ||
- conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, 0) <
JavaUtils.byteStringAsBytes(
+ (!conf.getBoolean(GlutenConfig.SPARK_OFFHEAP_ENABLED, false) ||
+ conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, 0) <
JavaUtils.byteStringAsBytes(
minOffHeapSize))
) {
throw new GlutenException(
- s"Must set '${GlutenConfig.GLUTEN_OFFHEAP_ENABLED}' to true " +
- s"and set '${GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY}' to be greater
than $minOffHeapSize")
+ s"Must set '${GlutenConfig.SPARK_OFFHEAP_ENABLED}' to true " +
+ s"and set '${GlutenConfig.SPARK_OFFHEAP_SIZE_KEY}' to be greater
than $minOffHeapSize")
}
// Session's local time zone must be set. If not explicitly set by user,
its default
@@ -174,13 +174,23 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY,
taskSlots.toString)
val onHeapSize: Long =
- if (conf.contains(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)) {
- conf.getSizeAsBytes(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)
+ if (conf.contains(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)) {
+ conf.getSizeAsBytes(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)
} else {
// 1GB default
1024 * 1024 * 1024
}
+ val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
+ conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY,
overheadSize.toString)
+
+ // FIXME: The following is a workaround. Remove once the causes are fixed.
+ conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY,
Long.MaxValue.toString)
+ logWarning(
+ "Setting overhead memory that Gluten can use to UNLIMITED. This is
currently a" +
+ " temporary solution to avoid OOM by Velox's global memory pools." +
+ " See GLUTEN-6960 for more information.")
+
// If dynamic off-heap sizing is enabled, the off-heap size is calculated
based on the on-heap
// size. Otherwise, the off-heap size is set to the value specified by the
user (if any).
// Note that this means that we will IGNORE the off-heap size specified by
the user if the
@@ -200,17 +210,17 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
// The 300MB value, unfortunately, is hard-coded in Spark code.
((onHeapSize - (300 * 1024 * 1024)) *
conf.getDouble(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION,
0.6d)).toLong
- } else if (conf.contains(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)) {
+ } else if (conf.contains(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)) {
// Optimistic off-heap sizes, assuming all storage memory can be
borrowed into execution
// memory pool, regardless of Spark option
spark.memory.storageFraction.
- conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)
+ conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
} else {
// Default Spark Value.
0L
}
conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY,
offHeapSize.toString)
- conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, offHeapSize.toString)
+ conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)
val offHeapPerTask = offHeapSize / taskSlots
conf.set(GlutenConfig.GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
offHeapPerTask.toString)
@@ -218,7 +228,7 @@ private[gluten] class GlutenDriverPlugin extends
DriverPlugin with Logging {
// If we are using dynamic off-heap sizing, we should also enable off-heap
memory
// officially.
if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED,
false)) {
- conf.set(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, "true")
+ conf.set(GlutenConfig.SPARK_OFFHEAP_ENABLED, "true")
// We already sized the off-heap per task in a conservative manner, so
we can just
// use it.
diff --git
a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
index f8c791fe1..890ea31b6 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
@@ -18,6 +18,8 @@ package org.apache.spark.util
import org.apache.spark.{SparkConf, SparkMasterRegex}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{EXECUTOR_MEMORY,
EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf
object SparkResourceUtil extends Logging {
@@ -80,4 +82,15 @@ object SparkResourceUtil extends Logging {
def isLocalMaster(conf: SparkConf): Boolean = {
Utils.isLocalMaster(conf)
}
+
+ def getMemoryOverheadSize(conf: SparkConf): Long = {
+ val overheadMib = conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse {
+ val executorMemMib = conf.get(EXECUTOR_MEMORY)
+ val factor =
+ conf.getDouble("spark.executor.memoryOverheadFactor", 0.1d)
+ val minMib = conf.getLong("spark.executor.minMemoryOverhead", 384L)
+ (executorMemMib * factor).toLong.max(minMib)
+ }
+ ByteUnit.MiB.toBytes(overheadMib)
+ }
}
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 9e5161fea..bb0e683c2 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -535,9 +535,11 @@ object GlutenConfig {
val GLUTEN_CONFIG_PREFIX = "spark.gluten.sql.columnar.backend."
// Private Spark configs.
- val GLUTEN_ONHEAP_SIZE_KEY = "spark.executor.memory"
- val GLUTEN_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
- val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
+ val SPARK_ONHEAP_SIZE_KEY = "spark.executor.memory"
+ val SPARK_OVERHEAD_SIZE_KEY = "spark.executor.memoryOverhead"
+ val SPARK_OVERHEAD_FACTOR_KEY = "spark.executor.memoryOverheadFactor"
+ val SPARK_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
+ val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
val SPARK_REDACTION_REGEX = "spark.redaction.regex"
// For Soft Affinity Scheduling
@@ -570,6 +572,7 @@ object GlutenConfig {
// Added back to Spark Conf during executor initialization
val GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY =
"spark.gluten.numTaskSlotsPerExecutor"
+ val GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY =
"spark.gluten.memoryOverhead.size.in.bytes"
val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY =
"spark.gluten.memory.offHeap.size.in.bytes"
val GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY =
"spark.gluten.memory.task.offHeap.size.in.bytes"
val GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY =
@@ -762,9 +765,10 @@ object GlutenConfig {
SPARK_SQL_PARQUET_COMPRESSION_CODEC,
// datasource config end
+ GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY,
GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY,
GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
- GLUTEN_OFFHEAP_ENABLED,
+ SPARK_OFFHEAP_ENABLED,
SESSION_LOCAL_TIMEZONE.key,
DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key,
SPARK_REDACTION_REGEX
@@ -1244,6 +1248,16 @@ object GlutenConfig {
.intConf
.createWithDefaultString("-1")
+ val COLUMNAR_OVERHEAD_SIZE_IN_BYTES =
+ buildConf(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY)
+ .internal()
+ .doc(
+ "Must provide default value since non-execution operations " +
+ "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate
configurations using " +
+ "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("0")
+
val COLUMNAR_OFFHEAP_SIZE_IN_BYTES =
buildConf(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY)
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]