This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new faa2b0472693 [SPARK-50421][CORE] Fix incorrect executor memory-related configs when multiple resource profiles are used
faa2b0472693 is described below
commit faa2b04726935ab6b347497eedd219e4007df559
Author: Terry Wang <[email protected]>
AuthorDate: Thu Dec 5 08:25:54 2024 -0800
[SPARK-50421][CORE] Fix incorrect executor memory-related configs when multiple resource profiles are used
### What changes were proposed in this pull request?
Reset the executor's memory-related configs in its env when the resource profile is not the default resource profile.
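For context, here is a minimal sketch (not part of this patch) of the scenario the fix targets: a stage that attaches a non-default resource profile with its own executor memory sizes via the stage-level scheduling API. The concrete sizes, core count, and the `sc`/RDD below are illustrative only.

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Request executor resources that differ from the application defaults.
val execReqs = new ExecutorResourceRequests()
  .memory("4g")           // should end up in spark.executor.memory on the executor
  .offHeapMemory("2g")    // should end up in spark.memory.offHeap.size on the executor
  .memoryOverhead("1g")
  .cores(4)
val taskReqs = new TaskResourceRequests().cpus(1)
val profile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()

// Executors launched for this stage's profile should honor the sizes above,
// which is what this patch makes the executor backend pick up.
val rdd = sc.parallelize(1 to 100, numSlices = 4).withResources(profile)
rdd.map(_ * 2).collect()
```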
### Why are the changes needed?
When multiple resource profiles exist in the same Spark application, the executor's memory-related configs are currently not overridden by the resource profile's memory sizes, which causes `maxOffHeap` in `UnifiedMemoryManager` to be incorrect.
See https://issues.apache.org/jira/browse/SPARK-50421 for more details.
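As a rough illustration of why that matters (a simplified sketch, not the actual `UnifiedMemoryManager` code): the off-heap ceiling on an executor is derived from the executor-side conf, so if that conf still carries the application-level value instead of the profile's off-heap request, the limit is wrong for executors of that profile.

```scala
import org.apache.spark.SparkConf

// Simplified: the off-heap ceiling depends on what the executor's conf says.
// Before this fix, executors launched for a non-default profile still saw the
// application-level spark.memory.offHeap.size here.
def maxOffHeapBytes(conf: SparkConf): Long =
  if (conf.getBoolean("spark.memory.offHeap.enabled", defaultValue = false)) {
    conf.getSizeAsBytes("spark.memory.offHeap.size", "0")
  } else {
    0L
  }
```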
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested with our internal Spark version and jobs.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48963 from zjuwangg/m_fixConfig.
Authored-by: Terry Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scala/org/apache/spark/internal/LogKey.scala | 2 ++
.../executor/CoarseGrainedExecutorBackend.scala | 23 ++++++++++++++++++++++
2 files changed, 25 insertions(+)
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index c365797cec69..4fe6d96f9aab 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -241,6 +241,8 @@ private[spark] object LogKeys {
case object EXECUTOR_ID extends LogKey
case object EXECUTOR_IDS extends LogKey
case object EXECUTOR_LAUNCH_COMMANDS extends LogKey
+ case object EXECUTOR_MEMORY_OFFHEAP extends LogKey
+ case object EXECUTOR_MEMORY_OVERHEAD_SIZE extends LogKey
case object EXECUTOR_MEMORY_SIZE extends LogKey
case object EXECUTOR_RESOURCES extends LogKey
case object EXECUTOR_SHUFFLE_INFO extends LogKey
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e880cf8da9ec..a73380cab690 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -479,6 +479,29 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
driverConf.set(EXECUTOR_ID, arguments.executorId)
cfg.logLevel.foreach(logLevel => Utils.setLogLevelIfNeeded(logLevel))
+      // Set executor memory related config here according to resource profile
+      if (cfg.resourceProfile.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) {
+        cfg.resourceProfile
+          .executorResources
+          .foreach {
+            case (ResourceProfile.OFFHEAP_MEM, request) =>
+              driverConf.set(MEMORY_OFFHEAP_SIZE.key, request.amount.toString + "m")
+              logInfo(log"Set executor off-heap memory to " +
+                log"${MDC(LogKeys.EXECUTOR_MEMORY_OFFHEAP, request)}")
+            case (ResourceProfile.MEMORY, request) =>
+              driverConf.set(EXECUTOR_MEMORY.key, request.amount.toString + "m")
+              logInfo(log"Set executor memory to ${MDC(LogKeys.EXECUTOR_MEMORY_SIZE, request)}")
+            case (ResourceProfile.OVERHEAD_MEM, request) =>
+              // Maybe don't need to set this since it's nearly used by tasks.
+              driverConf.set(EXECUTOR_MEMORY_OVERHEAD.key, request.amount.toString + "m")
+              logInfo(log"Set executor memory_overhead to " +
+                log"${MDC(LogKeys.EXECUTOR_MEMORY_OVERHEAD_SIZE, request)}")
+            case (ResourceProfile.CORES, request) =>
+              driverConf.set(EXECUTOR_CORES.key, request.amount.toString)
+              logInfo(log"Set executor cores to ${MDC(LogKeys.NUM_EXECUTOR_CORES, request)}")
+            case _ =>
+          }
+      }
      val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
        arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
// Set the application attemptId in the BlockStoreClient if available.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]