This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 969d672  [SPARK-37688][CORE] ExecutorMonitor should ignore 
SparkListenerBlockUpdated event if executor was not active
969d672 is described below

commit 969d672ba79891fb00edd84124b866e1a097c1bd
Author: hujiahua <hujia...@youzan.com>
AuthorDate: Mon Feb 28 21:41:58 2022 -0600

    [SPARK-37688][CORE] ExecutorMonitor should ignore SparkListenerBlockUpdated 
event if executor was not active
    
    ### What changes were proposed in this pull request?
    
    `ExecutorMonitor` should ignore `SparkListenerBlockUpdated` event if 
executor was not active
    
    ### Why are the changes needed?
    
    If not ignored, `ExecutorMonitor` will create a new executor tracker with 
UNKNOWN_RESOURCE_PROFILE_ID for the dead executor. And 
`ExecutorAllocationManager` will not remove executors with 
UNKNOWN_RESOURCE_PROFILE_ID, which causes an executor slot to be occupied by 
the dead executor, so a new one cannot be created.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Add a new test.
    
    Closes #34956 from sleep1661/SPARK-37688.
    
    Authored-by: hujiahua <hujia...@youzan.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../spark/scheduler/dynalloc/ExecutorMonitor.scala    |  4 ++++
 .../scheduler/dynalloc/ExecutorMonitorSuite.scala     | 19 +++++++++++++++++++
 2 files changed, 23 insertions(+)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala 
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index def63b9..defef5b 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -379,6 +379,10 @@ private[spark] class ExecutorMonitor(
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
+    if 
(!client.isExecutorActive(event.blockUpdatedInfo.blockManagerId.executorId)) {
+      return
+    }
+
     val exec = 
ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
       UNKNOWN_RESOURCE_PROFILE_ID)
 
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index 6fb89b8..336198b 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -169,6 +169,8 @@ class ExecutorMonitorSuite extends SparkFunSuite {
   }
 
   test("keeps track of stored blocks for each rdd and split") {
+    knownExecs ++= Set("1", "2")
+
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", execInfo))
 
     monitor.onBlockUpdated(rddUpdate(1, 0, "1"))
@@ -249,6 +251,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
   }
 
   test("SPARK-27677: don't track blocks stored on disk when using shuffle 
service") {
+    knownExecs += "1"
     // First make sure that blocks on disk are counted when no shuffle service 
is available.
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", execInfo))
     monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = 
StorageLevel.DISK_ONLY))
@@ -458,6 +461,22 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
   }
 
+  test("SPARK-37688: ignore SparkListenerBlockUpdated event if executor was 
not active") {
+    conf
+      .set(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT, Long.MaxValue)
+      .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
+      .set(SHUFFLE_SERVICE_ENABLED, false)
+    monitor = new ExecutorMonitor(conf, client, null, clock, 
allocationManagerSource())
+
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", execInfo))
+    
monitor.onExecutorRemoved(SparkListenerExecutorRemoved(clock.getTimeMillis(), 
"1",
+      "heartbeats timeout"))
+    monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = 
StorageLevel.MEMORY_AND_DISK))
+
+    assert(monitor.executorCount == 0 )
+  }
+
+
   private def idleDeadline: Long = clock.nanoTime() + idleTimeoutNs + 1
   private def storageDeadline: Long = clock.nanoTime() + storageTimeoutNs + 1
   private def shuffleDeadline: Long = clock.nanoTime() + shuffleTimeoutNs + 1

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to