This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4eb0edf5382 [SPARK-40596][CORE] Populate ExecutorDecommission with messages in ExecutorDecommissionInfo
4eb0edf5382 is described below
commit 4eb0edf538266a8f7085fe57255a6870b2c13769
Author: Bo Zhang <[email protected]>
AuthorDate: Tue Oct 11 09:50:06 2022 +0800
[SPARK-40596][CORE] Populate ExecutorDecommission with messages in ExecutorDecommissionInfo
### What changes were proposed in this pull request?
This change populates `ExecutorDecommission` with messages in
`ExecutorDecommissionInfo`.
### Why are the changes needed?
Currently the message in `ExecutorDecommission` is a fixed value ("Executor
decommission."), so it is the same for all cases, e.g. spot instance
interruptions and auto-scaling down. With this change we can better
differentiate those cases.
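For illustration, a minimal sketch of how the message now flows into the loss reason. Note that `ExecutorDecommission` and `ExecutorDecommissionInfo` are `private[spark]`, so this only compiles inside Spark's own source tree, and the interruption message here is made up:

```scala
import org.apache.spark.scheduler.{ExecutorDecommission, ExecutorDecommissionInfo}

// Sketch only: the cluster-manager-supplied message is carried into the
// executor loss reason instead of the fixed "Executor decommission." string.
val decomInfo = ExecutorDecommissionInfo("spot instance interruption", None)
val reason = ExecutorDecommission(decomInfo.workerHost, decomInfo.message)

// The reason message is now the shared prefix plus the per-case detail.
assert(reason.message == "Executor decommission: spot instance interruption")
```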
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test.
Closes #38030 from bozhang2820/spark-40596.
Authored-by: Bo Zhang <[email protected]>
Signed-off-by: Yi Wu <[email protected]>
---
.../scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +-
.../org/apache/spark/scheduler/ExecutorLossReason.scala | 11 +++++++++--
.../scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +-
.../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 13 +++++++------
.../apache/spark/scheduler/dynalloc/ExecutorMonitor.scala | 2 +-
.../storage/BlockManagerDecommissionIntegrationSuite.scala | 14 +++++++++++---
6 files changed, 30 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c5529851382..7ad53b8f9f8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2949,7 +2949,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case ExecutorLost(execId, reason) =>
val workerHost = reason match {
case ExecutorProcessLost(_, workerHost, _) => workerHost
- case ExecutorDecommission(workerHost) => workerHost
+ case ExecutorDecommission(workerHost, _) => workerHost
case _ => None
}
dagScheduler.handleExecutorLost(execId, workerHost)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index f333c01bb89..fb6a62551fa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -77,6 +77,13 @@ case class ExecutorProcessLost(
* If you update this code make sure to re-run the K8s integration tests.
*
* @param workerHost it is defined when the worker is decommissioned too
+ * @param reason detailed decommission message
*/
-private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
- extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(
+ workerHost: Option[String] = None,
+ reason: String = "")
+ extends ExecutorLossReason(ExecutorDecommission.msgPrefix + reason)
+
+private[spark] object ExecutorDecommission {
+ val msgPrefix = "Executor decommission: "
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 1d157f51fe6..943d1e53df4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1071,7 +1071,7 @@ private[spark] class TaskSetManager(
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
val exitCausedByApp: Boolean = reason match {
case ExecutorExited(_, false, _) => false
- case ExecutorKilled | ExecutorDecommission(_) => false
+ case ExecutorKilled | ExecutorDecommission(_, _) => false
case ExecutorProcessLost(_, _, false) => false
// If the task is launching, this indicates that Driver has sent LaunchTask to Executor,
// but Executor has not sent StatusUpdate(TaskState.RUNNING) to Driver. Hence, we assume
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e37abd76296..225dd1d75bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -99,8 +99,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors that have been lost, but for which we don't yet know the real exit reason.
protected val executorsPendingLossReason = new HashSet[String]
- // Executors which are being decommissioned. Maps from executorId to workerHost.
- protected val executorsPendingDecommission = new HashMap[String, Option[String]]
+ // Executors which are being decommissioned. Maps from executorId to ExecutorDecommissionInfo.
+ protected val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
// A map of ResourceProfile id to map of hostname with its possible task number running on it
@GuardedBy("CoarseGrainedSchedulerBackend.this")
@@ -447,11 +447,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap -= executorId
executorsPendingLossReason -= executorId
val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
- val workerHostOpt = executorsPendingDecommission.remove(executorId)
+ val decommissionInfoOpt = executorsPendingDecommission.remove(executorId)
if (killedByDriver) {
ExecutorKilled
- } else if (workerHostOpt.isDefined) {
- ExecutorDecommission(workerHostOpt.get)
+ } else if (decommissionInfoOpt.isDefined) {
+ val decommissionInfo = decommissionInfoOpt.get
+ ExecutorDecommission(decommissionInfo.workerHost, decommissionInfo.message)
} else {
reason
}
@@ -535,7 +536,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Only bother decommissioning executors which are alive.
if (isExecutorActive(executorId)) {
scheduler.executorDecommission(executorId, decomInfo)
- executorsPendingDecommission(executorId) = decomInfo.workerHost
+ executorsPendingDecommission(executorId) = decomInfo
Some(executorId)
} else {
None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 12942896369..fc9248de7ee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -356,7 +356,7 @@ private[spark] class ExecutorMonitor(
if (removed != null) {
decrementExecResourceProfileCount(removed.resourceProfileId)
if (event.reason == ExecutorLossMessage.decommissionFinished ||
- event.reason == ExecutorDecommission().message) {
+ (event.reason != null && event.reason.startsWith(ExecutorDecommission.msgPrefix))) {
metrics.gracefullyDecommissioned.inc()
} else if (removed.decommissioning) {
metrics.decommissionUnfinished.inc()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index e004c334dee..d9d2e6102f1 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -183,9 +183,14 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
}
- sc.addSparkListener(new SparkListener {
+ val listener = new SparkListener {
+ var removeReasonValidated = false
+
override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
executorRemovedSem.release()
+ if (execRemoved.reason == ExecutorDecommission.msgPrefix + "test msg 0") {
+ removeReasonValidated = true
+ }
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
@@ -211,7 +216,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
}
}
}
- })
+ }
+ sc.addSparkListener(listener)
// Cache the RDD lazily
if (persist) {
@@ -247,7 +253,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
// Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors
sched.decommissionExecutor(
execToDecommission,
- ExecutorDecommissionInfo("", None),
+ ExecutorDecommissionInfo("test msg 0", None),
adjustTargetNumExecutors = true)
val decomTime = new SystemClock().getTimeMillis()
@@ -343,5 +349,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
// should have same value like before
assert(testRdd.count() === numParts)
assert(accum.value === numParts)
+ import scala.language.reflectiveCalls
+ assert(listener.removeReasonValidated)
}
}
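A usage-oriented note, not part of this patch: because the removal reason string now carries the decommission message, a `SparkListener` outside Spark's internals can distinguish decommission-driven removals and their causes. A rough sketch under that assumption; the listener class name is made up, and the literal "Executor decommission: " prefix mirrors the internal `ExecutorDecommission.msgPrefix` constant, which is `private[spark]`:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved}

// Illustrative only: classify executor removals by the reason string populated
// by this change. The prefix is spelled out because msgPrefix is private[spark].
class DecommissionReasonListener extends SparkListener {
  private val prefix = "Executor decommission: "

  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
    Option(event.reason).filter(_.startsWith(prefix)).foreach { reason =>
      // The suffix is whatever message the cluster manager supplied in
      // ExecutorDecommissionInfo, e.g. a spot-interruption or scale-down notice.
      val detail = reason.stripPrefix(prefix)
      println(s"Executor ${event.executorId} decommissioned: $detail")
    }
  }
}
```

Such a listener would be registered like any other, e.g. `sc.addSparkListener(new DecommissionReasonListener())`.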
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]