This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 0bc4a37 [SPARK-38019][CORE] Make `ExecutorMonitor.timedOutExecutors` deterministic
0bc4a37 is described below
commit 0bc4a370e144008d3c687e687714cb31873792e6
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Jan 25 03:00:34 2022 -0800
[SPARK-38019][CORE] Make `ExecutorMonitor.timedOutExecutors` deterministic
### What changes were proposed in this pull request?
This PR aims to make the `ExecutorMonitor.timedOutExecutors` method deterministic.
### Why are the changes needed?
Because the current `timedOutExecutors` returns its result in a non-deterministic order, Dynamic Allocation kills idle executors in a random order.
https://github.com/apache/spark/blob/18f9e7efac5100744f255b6c8ae267579cd8d9ce/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L58
https://github.com/apache/spark/blob/18f9e7efac5100744f255b6c8ae267579cd8d9ce/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L119
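To illustrate why sorting fixes the order: iterating a hash-based map visits entries in an unspecified order, so a list built by filtering that map inherits the same arbitrary order. The sketch below is a hypothetical, simplified model (the `TimedOutOrderSketch` object, the plain map of executor ID to idle deadline in milliseconds, and the sample values are assumptions, not Spark's actual `ExecutorMonitor` internals) showing how sorting the filtered pairs by executor ID makes the result reproducible.

```scala
object TimedOutOrderSketch {
  /** Returns the (executorId, deadlineMs) pairs whose deadline is at or before `nowMs`,
    * sorted by executor ID. Hypothetical model, not Spark's implementation. */
  def timedOut(deadlines: scala.collection.Map[String, Long], nowMs: Long): Seq[(String, Long)] = {
    // Filtering alone keeps the map's iteration order, which depends on its
    // internal hash layout and is not specified.
    val expired = deadlines.toSeq.filter { case (_, deadline) => deadline <= nowMs }
    // Sorting by the executor ID (first tuple element) makes the order reproducible.
    expired.sortBy(_._1)
  }
}
```

For example, calling `timedOut` on a `mutable.HashMap` where executors "1", "2" and "3" expired at 0 ms and executor "4" expires at 200000 ms returns "1", "2", "3" in that order at `nowMs = 150000`, regardless of how the map stores its entries.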
This random behavior not only confuses users but also makes the K8s decommission tests flaky, as in the following case with Java 17 on an Apple Silicon environment. The K8s test expects executor 1 to be decommissioned, but executor 2 is chosen this time.
```
22/01/25 06:11:16 DEBUG ExecutorMonitor: Executors 1,2 do not have active shuffle data after job 0 finished.
22/01/25 06:11:16 DEBUG ExecutorAllocationManager: max needed for rpId: 0 numpending: 0, tasksperexecutor: 1
22/01/25 06:11:16 DEBUG ExecutorAllocationManager: No change in number of executors
22/01/25 06:11:16 DEBUG ExecutorAllocationManager: Request to remove executorIds: (2,0), (1,0)
22/01/25 06:11:16 DEBUG ExecutorAllocationManager: Not removing idle executor 1 because there are only 1 executor(s) left (minimum number of executor limit 1)
22/01/25 06:11:16 INFO KubernetesClusterSchedulerBackend: Decommission executors: 2
```
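The log suggests why the order matters: the removal request lists the candidates `(2,0), (1,0)`, executor 2 is removed, and executor 1 is kept once the minimum executor limit of 1 is reached, so whichever candidate comes later survives. The sketch below is a hypothetical, simplified model of that step (the `RemovalOrderSketch` object, `chooseToRemove`, and its parameters are illustrative assumptions, not Spark's `ExecutorAllocationManager` code).

```scala
object RemovalOrderSketch {
  /** Walks `candidates` in order and removes each one while more than `minExecutors`
    * executors would remain. Hypothetical model of the behavior seen in the log. */
  def chooseToRemove(candidates: Seq[String], alive: Int, minExecutors: Int): Seq[String] = {
    var remaining = alive
    val removed = Seq.newBuilder[String]
    for (id <- candidates) {
      if (remaining > minExecutors) {
        removed += id
        remaining -= 1
      }
      // Otherwise `id` is kept, mirroring "Not removing idle executor ... (minimum number of executor limit 1)".
    }
    removed.result()
  }
}
```

With the unsorted order `Seq("2", "1")`, two alive executors, and a minimum of 1, this model removes executor 2; with the sorted order `Seq("1", "2")` it removes executor 1, which is what the K8s test expects.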
### Does this PR introduce _any_ user-facing change?
No. The previous result order was random, and the new behavior is deterministic.
### How was this patch tested?
Pass the CIs with the newly added test case.
Closes #35315 from dongjoon-hyun/SPARK-38019.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit e2c4913c2e43481d1a12e5a2f307ed8a8d913311)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/scheduler/dynalloc/ExecutorMonitor.scala | 2 +-
.../spark/scheduler/dynalloc/ExecutorMonitorSuite.scala | 15 +++++++++++++++
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index cecd4b0..acdfaa5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -133,7 +133,7 @@ private[spark] class ExecutorMonitor(
.toSeq
updateNextTimeout(newNextTimeout)
}
- timedOutExecs
+ timedOutExecs.sortBy(_._1)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index 69afdb5..6fb89b8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -233,6 +233,21 @@ class ExecutorMonitorSuite extends SparkFunSuite {
assert(monitor.timedOutExecutors(clock.nanoTime()).toSet === Set("1", "2", "3"))
}
+ test("SPARK-38019: timedOutExecutors should be deterministic") {
+ knownExecs ++= Set("1", "2", "3")
+
+ // start exec 1, 2, 3 at 0s (should idle time out at 60s)
+ monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
+ assert(monitor.isExecutorIdle("1"))
+ monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
+ assert(monitor.isExecutorIdle("2"))
+ monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", execInfo))
+ assert(monitor.isExecutorIdle("3"))
+
+ clock.setTime(TimeUnit.SECONDS.toMillis(150))
+ assert(monitor.timedOutExecutors().map(_._1) === Seq("1", "2", "3"))
+ }
+
test("SPARK-27677: don't track blocks stored on disk when using shuffle service") {
// First make sure that blocks on disk are counted when no shuffle service is available.
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
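One consequence of `sortBy(_._1)` worth keeping in mind: the first tuple element is the executor ID as a `String` (the new test compares it with `Seq("1", "2", "3")`), so the resulting order is deterministic but lexicographic, and an ID such as "10" sorts before "2". The sketch below illustrates this; the `ExecutorIdOrderSketch` object and its `byLengthThenString` alternative are hypothetical illustrations only and are not part of this commit.

```scala
object ExecutorIdOrderSketch {
  // Plain string ordering, the same ordering that sortBy(_._1) applies to String IDs.
  def lexicographic(ids: Seq[String]): Seq[String] = ids.sorted

  // Hypothetical alternative: order by length, then by string. For non-negative
  // integer IDs without leading zeros this matches numeric order.
  def byLengthThenString(ids: Seq[String]): Seq[String] =
    ids.sortBy(id => (id.length, id))
}
```

Either ordering is deterministic, which is what the flaky K8s test needed; the choice only affects which executor is picked first once IDs reach two digits.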