This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 496dbea1d fix: Executor memory overhead overriding (#1462)
496dbea1d is described below

commit 496dbea1d2a02e517e9398b127a8835314a9f31c
Author: Lukas Moravec <[email protected]>
AuthorDate: Wed Mar 5 19:54:00 2025 +0100

    fix: Executor memory overhead overriding (#1462)
---
 .../apache/comet/CometSparkSessionExtensions.scala |  5 +++
 .../src/main/scala/org/apache/spark/Plugins.scala  |  5 +--
 .../scala/org/apache/spark/CometPluginsSuite.scala | 36 ++++++++++++++++++++++
 3 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 9f295bc3f..c82d10da2 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -1409,6 +1409,11 @@ object CometSparkSessionExtensions extends Logging {
     }
   }
 
+  /** Calculates Comet shuffle memory size in MB */
+  def getCometShuffleMemorySizeInMiB(sparkConf: SparkConf, conf: SQLConf = SQLConf.get): Long = {
+    ByteUnit.BYTE.toMiB(getCometShuffleMemorySize(sparkConf, conf))
+  }
+
   def cometUnifiedMemoryManagerEnabled(sparkConf: SparkConf): Boolean = {
     sparkConf.getBoolean("spark.memory.offHeap.enabled", false)
   }
diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala b/spark/src/main/scala/org/apache/spark/Plugins.scala
index 0e5c5d6b0..644ec81b7 100644
--- a/spark/src/main/scala/org/apache/spark/Plugins.scala
+++ b/spark/src/main/scala/org/apache/spark/Plugins.scala
@@ -55,7 +55,8 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
         sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
       } else {
         // By default, executorMemory * spark.executor.memoryOverheadFactor, with minimum of 384MB
-        val executorMemory = sc.getConf.getSizeAsMb(EXECUTOR_MEMORY.key, EXECUTOR_MEMORY_DEFAULT)
+        val executorMemory =
+          sc.getConf.getSizeAsMb(EXECUTOR_MEMORY.key, EXECUTOR_MEMORY_DEFAULT)
         val memoryOverheadFactor = getMemoryOverheadFactor(sc.getConf)
         val memoryOverheadMinMib = getMemoryOverheadMinMib(sc.getConf)
 
@@ -67,7 +68,7 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
           CometSparkSessionExtensions.getCometMemoryOverheadInMiB(sc.getConf)
         } else {
           // comet shuffle unified memory manager is disabled, so we need to add overhead memory
-          CometSparkSessionExtensions.getCometShuffleMemorySize(sc.getConf)
+          CometSparkSessionExtensions.getCometShuffleMemorySizeInMiB(sc.getConf)
         }
       sc.conf.set(EXECUTOR_MEMORY_OVERHEAD.key, s"${execMemOverhead + cometMemOverhead}M")
       val newExecMemOverhead = sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
diff --git a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
index 8b388f2d3..67a06b28b 100644
--- a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala
@@ -143,3 +143,39 @@ class CometPluginsNonOverrideSuite extends CometTestBase {
     assert(execMemOverhead4 == "2G")
   }
 }
+
+class CometPluginsUnifiedModeOverrideSuite extends CometTestBase {
+  override protected def sparkConf: SparkConf = {
+    val conf = new SparkConf()
+    conf.set("spark.driver.memory", "1G")
+    conf.set("spark.executor.memory", "1G")
+    conf.set("spark.executor.memoryOverhead", "1G")
+    conf.set("spark.plugins", "org.apache.spark.CometPlugin")
+    conf.set("spark.comet.enabled", "true")
+    conf.set("spark.memory.offHeap.enabled", "true")
+    conf.set("spark.memory.offHeap.size", "2G")
+    conf.set("spark.comet.exec.shuffle.enabled", "true")
+    conf.set("spark.comet.exec.enabled", "true")
+    conf.set("spark.comet.memory.overhead.factor", "0.5")
+    conf
+  }
+
+  /*
+   * Since using unified memory, but not shuffle unified memory
+   * executor memory should be overridden by adding comet shuffle memory size
+   */
+  test("executor memory overhead is correctly overridden") {
+    val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
+    val execMemOverhead2 = spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")
+    val execMemOverhead3 = spark.sparkContext.getConf.get("spark.executor.memoryOverhead")
+    val execMemOverhead4 = spark.sparkContext.conf.get("spark.executor.memoryOverhead")
+
+    // in unified memory mode, comet memory overhead is spark.memory.offHeap.size (2G) * spark.comet.memory.overhead.factor (0.5) = 1G
+    // so the total executor memory overhead is executor memory overhead (1G) + comet memory overhead (1G) = 2G
+    // and the overhead is overridden in MiB
+    assert(execMemOverhead1 == "2048M")
+    assert(execMemOverhead2 == "2048M")
+    assert(execMemOverhead3 == "2048M")
+    assert(execMemOverhead4 == "2048M")
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to