This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 496dbea1d fix: Executor memory overhead overriding (#1462)
496dbea1d is described below
commit 496dbea1d2a02e517e9398b127a8835314a9f31c
Author: Lukas Moravec <[email protected]>
AuthorDate: Wed Mar 5 19:54:00 2025 +0100
fix: Executor memory overhead overriding (#1462)
---
.../apache/comet/CometSparkSessionExtensions.scala | 5 +++
.../src/main/scala/org/apache/spark/Plugins.scala | 5 +--
.../scala/org/apache/spark/CometPluginsSuite.scala | 36 ++++++++++++++++++++++
3 files changed, 44 insertions(+), 2 deletions(-)
diff --git
a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 9f295bc3f..c82d10da2 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -1409,6 +1409,11 @@ object CometSparkSessionExtensions extends Logging {
}
}
+ /** Calculates Comet shuffle memory size in MiB */
+ def getCometShuffleMemorySizeInMiB(sparkConf: SparkConf, conf: SQLConf =
SQLConf.get): Long = {
+ ByteUnit.BYTE.toMiB(getCometShuffleMemorySize(sparkConf, conf))
+ }
+
def cometUnifiedMemoryManagerEnabled(sparkConf: SparkConf): Boolean = {
sparkConf.getBoolean("spark.memory.offHeap.enabled", false)
}
diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala
b/spark/src/main/scala/org/apache/spark/Plugins.scala
index 0e5c5d6b0..644ec81b7 100644
--- a/spark/src/main/scala/org/apache/spark/Plugins.scala
+++ b/spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -55,7 +55,8 @@ class CometDriverPlugin extends DriverPlugin with Logging
with ShimCometDriverPl
sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
} else {
// By default, executorMemory * spark.executor.memoryOverheadFactor, with minimum of 384MB
- val executorMemory = sc.getConf.getSizeAsMb(EXECUTOR_MEMORY.key,
EXECUTOR_MEMORY_DEFAULT)
+ val executorMemory =
+ sc.getConf.getSizeAsMb(EXECUTOR_MEMORY.key, EXECUTOR_MEMORY_DEFAULT)
val memoryOverheadFactor = getMemoryOverheadFactor(sc.getConf)
val memoryOverheadMinMib = getMemoryOverheadMinMib(sc.getConf)
@@ -67,7 +68,7 @@ class CometDriverPlugin extends DriverPlugin with Logging
with ShimCometDriverPl
CometSparkSessionExtensions.getCometMemoryOverheadInMiB(sc.getConf)
} else {
// comet shuffle unified memory manager is disabled, so we need to add overhead memory
- CometSparkSessionExtensions.getCometShuffleMemorySize(sc.getConf)
+
CometSparkSessionExtensions.getCometShuffleMemorySizeInMiB(sc.getConf)
}
sc.conf.set(EXECUTOR_MEMORY_OVERHEAD.key, s"${execMemOverhead +
cometMemOverhead}M")
val newExecMemOverhead =
sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
diff --git a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
index 8b388f2d3..67a06b28b 100644
--- a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
@@ -143,3 +143,39 @@ class CometPluginsNonOverrideSuite extends CometTestBase {
assert(execMemOverhead4 == "2G")
}
}
+
+class CometPluginsUnifiedModeOverrideSuite extends CometTestBase {
+ override protected def sparkConf: SparkConf = {
+ val conf = new SparkConf()
+ conf.set("spark.driver.memory", "1G")
+ conf.set("spark.executor.memory", "1G")
+ conf.set("spark.executor.memoryOverhead", "1G")
+ conf.set("spark.plugins", "org.apache.spark.CometPlugin")
+ conf.set("spark.comet.enabled", "true")
+ conf.set("spark.memory.offHeap.enabled", "true")
+ conf.set("spark.memory.offHeap.size", "2G")
+ conf.set("spark.comet.exec.shuffle.enabled", "true")
+ conf.set("spark.comet.exec.enabled", "true")
+ conf.set("spark.comet.memory.overhead.factor", "0.5")
+ conf
+ }
+
+ /*
+ * Unified memory is in use, but shuffle unified memory is not, so the
+ * executor memory overhead should be overridden by adding the Comet shuffle memory size.
+ */
+ test("executor memory overhead is correctly overridden") {
+ val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
+ val execMemOverhead2 =
spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")
+ val execMemOverhead3 =
spark.sparkContext.getConf.get("spark.executor.memoryOverhead")
+ val execMemOverhead4 =
spark.sparkContext.conf.get("spark.executor.memoryOverhead")
+
+ // in unified memory mode, comet memory overhead is spark.memory.offHeap.size (2G) * spark.comet.memory.overhead.factor (0.5) = 1G
+ // so the total executor memory overhead is executor memory overhead (1G) + comet memory overhead (1G) = 2G
+ // and the overhead is overridden in MiB
+ assert(execMemOverhead1 == "2048M")
+ assert(execMemOverhead2 == "2048M")
+ assert(execMemOverhead3 == "2048M")
+ assert(execMemOverhead4 == "2048M")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]