This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 969d672 [SPARK-37688][CORE] ExecutorMonitor should ignore SparkListenerBlockUpdated event if executor was not active 969d672 is described below commit 969d672ba79891fb00edd84124b866e1a097c1bd Author: hujiahua <hujia...@youzan.com> AuthorDate: Mon Feb 28 21:41:58 2022 -0600 [SPARK-37688][CORE] ExecutorMonitor should ignore SparkListenerBlockUpdated event if executor was not active ### What changes were proposed in this pull request? `ExecutorMonitor` should ignore `SparkListenerBlockUpdated` event if executor was not active ### Why are the changes needed? If not ignored, `ExecutorMonitor` will create a new executor tracker with UNKNOWN_RESOURCE_PROFILE_ID for the dead executor. And `ExecutorAllocationManager` will not remove an executor with UNKNOWN_RESOURCE_PROFILE_ID, which causes an executor slot to remain occupied by the dead executor, so a new one cannot be created. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add a new test. Closes #34956 from sleep1661/SPARK-37688. 
Authored-by: hujiahua <hujia...@youzan.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../spark/scheduler/dynalloc/ExecutorMonitor.scala | 4 ++++ .../scheduler/dynalloc/ExecutorMonitorSuite.scala | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index def63b9..defef5b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -379,6 +379,10 @@ private[spark] class ExecutorMonitor( } override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { + if (!client.isExecutorActive(event.blockUpdatedInfo.blockManagerId.executorId)) { + return + } + val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId, UNKNOWN_RESOURCE_PROFILE_ID) diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala index 6fb89b8..336198b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala @@ -169,6 +169,8 @@ class ExecutorMonitorSuite extends SparkFunSuite { } test("keeps track of stored blocks for each rdd and split") { + knownExecs ++= Set("1", "2") + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) monitor.onBlockUpdated(rddUpdate(1, 0, "1")) @@ -249,6 +251,7 @@ class ExecutorMonitorSuite extends SparkFunSuite { } test("SPARK-27677: don't track blocks stored on disk when using shuffle service") { + knownExecs += "1" // First make sure that blocks on disk are counted when no shuffle service is available. 
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.DISK_ONLY)) @@ -458,6 +461,22 @@ class ExecutorMonitorSuite extends SparkFunSuite { assert(monitor.timedOutExecutors(idleDeadline).isEmpty) } + test("SPARK-37688: ignore SparkListenerBlockUpdated event if executor was not active") { + conf + .set(DYN_ALLOCATION_SHUFFLE_TRACKING_TIMEOUT, Long.MaxValue) + .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true) + .set(SHUFFLE_SERVICE_ENABLED, false) + monitor = new ExecutorMonitor(conf, client, null, clock, allocationManagerSource()) + + monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo)) + monitor.onExecutorRemoved(SparkListenerExecutorRemoved(clock.getTimeMillis(), "1", + "heartbeats timeout")) + monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = StorageLevel.MEMORY_AND_DISK)) + + assert(monitor.executorCount == 0 ) + } + + private def idleDeadline: Long = clock.nanoTime() + idleTimeoutNs + 1 private def storageDeadline: Long = clock.nanoTime() + storageTimeoutNs + 1 private def shuffleDeadline: Long = clock.nanoTime() + shuffleTimeoutNs + 1 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org