This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push: new 9c78669 [SPARK-26758][CORE] Idle Executors are not getting killed after spark.dynamiAllocation.executorIdleTimeout value 9c78669 is described below commit 9c7866956cc6b5e8ef3c5f907aa97b557369390b Author: sandeep-katta <sandeep.katta2...@gmail.com> AuthorDate: Mon Feb 4 20:13:22 2019 -0800 [SPARK-26758][CORE] Idle Executors are not getting killed after spark.dynamiAllocation.executorIdleTimeout value ## What changes were proposed in this pull request? **updateAndSyncNumExecutorsTarget** API should be called after **initializing** flag is unset ## How was this patch tested? Added UT and also manually tested After Fix ![afterfix](https://user-images.githubusercontent.com/35216143/51983136-ed4a5000-24bd-11e9-90c8-c4a562c17a4b.png) Closes #23697 from sandeep-katta/executorIssue. Authored-by: sandeep-katta <sandeep.katta2...@gmail.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> (cherry picked from commit 1dd7419702c5bc7e36fee9fa1eec06b66f25806e) Signed-off-by: Sean Owen <sean.o...@databricks.com> --- .../apache/spark/ExecutorAllocationManager.scala | 4 ++-- .../spark/ExecutorAllocationManagerSuite.scala | 26 +++++++++++++++++----- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 189d913..59d8826 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -292,8 +292,6 @@ private[spark] class ExecutorAllocationManager( private def schedule(): Unit = synchronized { val now = clock.getTimeMillis - updateAndSyncNumExecutorsTarget(now) - val executorIdsToBeRemoved = ArrayBuffer[String]() removeTimes.retain { case (executorId, expireTime) => val expired = now >= expireTime @@ -303,6 +301,8 @@ private[spark] class ExecutorAllocationManager( } !expired } + // Update executor target number only after initializing flag is unset + updateAndSyncNumExecutorsTarget(now) if (executorIdsToBeRemoved.nonEmpty) { removeExecutors(executorIdsToBeRemoved) } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 9807d12..784beac 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -901,12 +901,7 @@ class ExecutorAllocationManagerSuite assert(maxNumExecutorsNeeded(manager) === 0) schedule(manager) - // Verify executor is timeout but numExecutorsTarget is not recalculated - assert(numExecutorsTarget(manager) === 3) - - // Schedule again to recalculate the numExecutorsTarget after executor is timeout - schedule(manager) - // Verify that current number of executors should be ramp down when executor is timeout + // Verify executor is timeout,numExecutorsTarget is recalculated assert(numExecutorsTarget(manager) === 2) } @@ -1113,6 +1108,25 @@ class ExecutorAllocationManagerSuite verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false) } + test("SPARK-26758 check executor target number after idle time out ") { + sc = createSparkContext(1, 5, 3) + val manager = sc.executorAllocationManager.get + val clock = new ManualClock(10000L) + manager.setClock(clock) + assert(numExecutorsTarget(manager) === 3) + manager.listener.onExecutorAdded(SparkListenerExecutorAdded( + clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty))) + manager.listener.onExecutorAdded(SparkListenerExecutorAdded( + clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 2, Map.empty))) + manager.listener.onExecutorAdded(SparkListenerExecutorAdded( + clock.getTimeMillis(), "executor-3", new ExecutorInfo("host1", 3, Map.empty))) + // make all the executors as idle, so that it will be killed + clock.advance(executorIdleTimeout * 1000) + schedule(manager) + // once the schedule is run target executor number should be 1 + assert(numExecutorsTarget(manager) === 1) + } + private def createSparkContext( minExecutors: Int = 1, maxExecutors: Int = 5, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org