This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 0bc4a37  [SPARK-38019][CORE] Make `ExecutorMonitor.timedOutExecutors` deterministic
0bc4a37 is described below

commit 0bc4a370e144008d3c687e687714cb31873792e6
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Jan 25 03:00:34 2022 -0800

    [SPARK-38019][CORE] Make `ExecutorMonitor.timedOutExecutors` deterministic
    
    ### What changes were proposed in this pull request?
    
    This PR aims to make the `ExecutorMonitor.timedOutExecutors` method deterministic.
    
    ### Why are the changes needed?
    
    Since the current `timedOutExecutors` returns its result in a nondeterministic order, Dynamic Allocation kills idle executors in a random order.
    
    https://github.com/apache/spark/blob/18f9e7efac5100744f255b6c8ae267579cd8d9ce/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L58
    
    https://github.com/apache/spark/blob/18f9e7efac5100744f255b6c8ae267579cd8d9ce/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L119
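    
    Below is a minimal sketch of the idea, assuming Scala 2.13. `IdleInfo` and `TimedOutSketch` are hypothetical stand-ins, not Spark's actual `Tracker` or `ExecutorMonitor`. The sketch also assumes that executors are kept in a `ConcurrentHashMap`. A hash-based map's iteration order is unspecified, so the collected `(executorId, resourceProfileId)` pairs must be sorted before they are returned, which is what the one-line `sortBy(_._1)` change does.
    
    ```scala
    import java.util.concurrent.ConcurrentHashMap
    import scala.jdk.CollectionConverters._
    
    // Hypothetical per-executor idle state (not Spark's Tracker class).
    final case class IdleInfo(timeoutAt: Long, resourceProfileId: Int)
    
    object TimedOutSketch {
      // Returns (executorId, resourceProfileId) for every executor whose idle
      // timeout has passed. Iterating a ConcurrentHashMap yields an unspecified
      // order, so the result is sorted by executor ID before it is returned.
      def timedOutExecutors(
          executors: ConcurrentHashMap[String, IdleInfo],
          now: Long): Seq[(String, Int)] = {
        val timedOut = executors.asScala.toSeq.collect {
          case (id, info) if info.timeoutAt <= now => (id, info.resourceProfileId)
        }
        timedOut.sortBy(_._1)
      }
    }
    ```
    
    Executor IDs are strings, so this ordering is lexicographic rather than numeric. For example, IDs "1", "10" and "2" come out in exactly that order. The order is still deterministic, which is the property the fix needs.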
    
    This random behavior not only confuses users but also makes the K8s decommission tests flaky, as in the following case with Java 17 on Apple Silicon. The K8s test expects executor 1 to be decommissioned, but executor 2 was chosen this time.
    
    ```
    22/01/25 06:11:16 DEBUG ExecutorMonitor: Executors 1,2 do not have active shuffle data after job 0 finished.
    22/01/25 06:11:16 DEBUG ExecutorAllocationManager: max needed for rpId: 0 numpending: 0, tasksperexecutor: 1
    22/01/25 06:11:16 DEBUG ExecutorAllocationManager: No change in number of executors
    22/01/25 06:11:16 DEBUG ExecutorAllocationManager: Request to remove executorIds: (2,0), (1,0)
    22/01/25 06:11:16 DEBUG ExecutorAllocationManager: Not removing idle executor 1 because there are only 1 executor(s) left (minimum number of executor limit 1)
    22/01/25 06:11:16 INFO KubernetesClusterSchedulerBackend: Decommission executors: 2
    ```
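    
    The log shows why the order matters: the removal request is processed in the order that `timedOutExecutors` returned. The following hypothetical sketch illustrates this. `RemovalOrderSketch.chooseToRemove` is an illustration, not Spark's `ExecutorAllocationManager` code. It walks the candidates in the given order and skips any removal that would leave fewer than the minimum number of executors, mirroring the "Not removing idle executor" message above.
    
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    object RemovalOrderSketch {
      // Hypothetical simplification of the minimum-executor check: walk the
      // candidates in order and remove one only if at least minExecutors
      // executors would remain afterwards.
      def chooseToRemove(
          candidates: Seq[String],
          numExisting: Int,
          minExecutors: Int): Seq[String] = {
        val removed = ArrayBuffer.empty[String]
        candidates.foreach { id =>
          val remaining = numExisting - removed.size
          if (remaining - 1 >= minExecutors) removed += id
          // Otherwise the executor is kept ("only <remaining> executor(s) left").
        }
        removed.toSeq
      }
    }
    ```
    
    With two executors and a minimum of 1, whichever executor comes first is removed. The old random order `(2, 1)` removes executor 2, as in the log. The sorted order `(1, 2)` removes executor 1, which is what the K8s test expects.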
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, because the previous behavior returned the list in a random order; the new behavior is deterministic.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    Closes #35315 from dongjoon-hyun/SPARK-38019.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit e2c4913c2e43481d1a12e5a2f307ed8a8d913311)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/scheduler/dynalloc/ExecutorMonitor.scala |  2 +-
 .../spark/scheduler/dynalloc/ExecutorMonitorSuite.scala   | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index cecd4b0..acdfaa5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -133,7 +133,7 @@ private[spark] class ExecutorMonitor(
         .toSeq
       updateNextTimeout(newNextTimeout)
     }
-    timedOutExecs
+    timedOutExecs.sortBy(_._1)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index 69afdb5..6fb89b8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -233,6 +233,21 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors(clock.nanoTime()).toSet === Set("1", "2", "3"))
   }
 
+  test("SPARK-38019: timedOutExecutors should be deterministic") {
+    knownExecs ++= Set("1", "2", "3")
+
+    // start exec 1, 2, 3 at 0s (should idle time out at 60s)
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
+    assert(monitor.isExecutorIdle("1"))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
+    assert(monitor.isExecutorIdle("2"))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", execInfo))
+    assert(monitor.isExecutorIdle("3"))
+
+    clock.setTime(TimeUnit.SECONDS.toMillis(150))
+    assert(monitor.timedOutExecutors().map(_._1) === Seq("1", "2", "3"))
+  }
+
   test("SPARK-27677: don't track blocks stored on disk when using shuffle service") {
     // First make sure that blocks on disk are counted when no shuffle service is available.
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))

