Repository: spark
Updated Branches:
  refs/heads/branch-2.3 a81e20314 -> 0b7b8cced


[SPARK-23637][YARN] Yarn might allocate more resource if a same executor is 
killed multiple times.

## What changes were proposed in this pull request?
`YarnAllocator` uses `numExecutorsRunning` to track the number of running 
executors. `numExecutorsRunning` is used to check whether executors are missing 
and more need to be allocated.

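 In the current code, `numExecutorsRunning` can become negative when the driver 
asks to kill the same idle executor multiple times, which makes the allocator 
request more executors than needed. This patch replaces the counter with a set 
of running executor IDs (`runningExecutors`) and skips containers that have 
already been released, so repeated kills of the same executor have no further 
effect.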

## How was this patch tested?
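Unit tests added to `YarnAllocatorSuite` ("kill same executor multiple times" 
and "process same completed container multiple times").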

Author: jinxing <jinxing6...@126.com>

Closes #20781 from jinxing64/SPARK-23637.

(cherry picked from commit d3bd0435ee4ff3d414f32cce3f58b6b9f67e68bc)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b7b8cce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b7b8cce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b7b8cce

Branch: refs/heads/branch-2.3
Commit: 0b7b8cceda4ce7791d78259451b8c51b49fb2786
Parents: a81e203
Author: jinxing <jinxing6...@126.com>
Authored: Wed Apr 4 15:51:27 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Apr 4 15:51:38 2018 -0700

----------------------------------------------------------------------
 .../spark/deploy/yarn/YarnAllocator.scala       | 36 ++++++++-------
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  | 48 +++++++++++++++++++-
 2 files changed, 66 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0b7b8cce/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 506adb3..b2d960b 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -81,7 +81,8 @@ private[yarn] class YarnAllocator(
   private val releasedContainers = Collections.newSetFromMap[ContainerId](
     new ConcurrentHashMap[ContainerId, java.lang.Boolean])
 
-  private val numExecutorsRunning = new AtomicInteger(0)
+  private val runningExecutors = Collections.newSetFromMap[String](
+    new ConcurrentHashMap[String, java.lang.Boolean]())
 
   private val numExecutorsStarting = new AtomicInteger(0)
 
@@ -166,7 +167,7 @@ private[yarn] class YarnAllocator(
     clock = newClock
   }
 
-  def getNumExecutorsRunning: Int = numExecutorsRunning.get()
+  def getNumExecutorsRunning: Int = runningExecutors.size()
 
   def getNumExecutorsFailed: Int = synchronized {
     val endTime = clock.getTimeMillis()
@@ -242,12 +243,11 @@ private[yarn] class YarnAllocator(
    * Request that the ResourceManager release the container running the 
specified executor.
    */
   def killExecutor(executorId: String): Unit = synchronized {
-    if (executorIdToContainer.contains(executorId)) {
-      val container = executorIdToContainer.get(executorId).get
-      internalReleaseContainer(container)
-      numExecutorsRunning.decrementAndGet()
-    } else {
-      logWarning(s"Attempted to kill unknown executor $executorId!")
+    executorIdToContainer.get(executorId) match {
+      case Some(container) if !releasedContainers.contains(container.getId) =>
+        internalReleaseContainer(container)
+        runningExecutors.remove(executorId)
+      case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
     }
   }
 
@@ -274,7 +274,7 @@ private[yarn] class YarnAllocator(
         "Launching executor count: %d. Cluster resources: %s.")
         .format(
           allocatedContainers.size,
-          numExecutorsRunning.get,
+          runningExecutors.size,
           numExecutorsStarting.get,
           allocateResponse.getAvailableResources))
 
@@ -286,7 +286,7 @@ private[yarn] class YarnAllocator(
       logDebug("Completed %d containers".format(completedContainers.size))
       processCompletedContainers(completedContainers.asScala)
       logDebug("Finished processing %d completed containers. Current running 
executor count: %d."
-        .format(completedContainers.size, numExecutorsRunning.get))
+        .format(completedContainers.size, runningExecutors.size))
     }
   }
 
@@ -300,9 +300,9 @@ private[yarn] class YarnAllocator(
     val pendingAllocate = getPendingAllocate
     val numPendingAllocate = pendingAllocate.size
     val missing = targetNumExecutors - numPendingAllocate -
-      numExecutorsStarting.get - numExecutorsRunning.get
+      numExecutorsStarting.get - runningExecutors.size
     logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
-      s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
+      s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
       s"executorsStarting: ${numExecutorsStarting.get}")
 
     if (missing > 0) {
@@ -502,7 +502,7 @@ private[yarn] class YarnAllocator(
         s"for executor with ID $executorId")
 
       def updateInternalState(): Unit = synchronized {
-        numExecutorsRunning.incrementAndGet()
+        runningExecutors.add(executorId)
         numExecutorsStarting.decrementAndGet()
         executorIdToContainer(executorId) = container
         containerIdToExecutorId(container.getId) = executorId
@@ -513,7 +513,7 @@ private[yarn] class YarnAllocator(
         allocatedContainerToHostMap.put(containerId, executorHostname)
       }
 
-      if (numExecutorsRunning.get < targetNumExecutors) {
+      if (runningExecutors.size() < targetNumExecutors) {
         numExecutorsStarting.incrementAndGet()
         if (launchContainers) {
           launcherPool.execute(new Runnable {
@@ -554,7 +554,7 @@ private[yarn] class YarnAllocator(
       } else {
         logInfo(("Skip launching executorRunnable as running executors count: 
%d " +
           "reached target executors count: %d.").format(
-          numExecutorsRunning.get, targetNumExecutors))
+          runningExecutors.size, targetNumExecutors))
       }
     }
   }
@@ -569,7 +569,11 @@ private[yarn] class YarnAllocator(
       val exitReason = if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of 
allocating.
-        numExecutorsRunning.decrementAndGet()
+        containerIdToExecutorId.get(containerId) match {
+          case Some(executorId) => runningExecutors.remove(executorId)
+          case None => logWarning(s"Cannot find executorId for container: 
${containerId.toString}")
+        }
+
         logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
           containerId,
           onHostStr,

http://git-wip-us.apache.org/repos/asf/spark/blob/0b7b8cce/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index cb1e3c5..525abb6 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -251,11 +251,55 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
       ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, 
"Finished", 0)
     }
     handler.updateResourceRequests()
-    handler.processCompletedContainers(statuses.toSeq)
+    handler.processCompletedContainers(statuses)
     handler.getNumExecutorsRunning should be (0)
     handler.getPendingAllocate.size should be (1)
   }
 
+  test("kill same executor multiple times") {
+    val handler = createAllocator(2)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (2)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2))
+    handler.getNumExecutorsRunning should be (2)
+    handler.getPendingAllocate.size should be (0)
+
+    val executorToKill = handler.executorIdToContainer.keys.head
+    handler.killExecutor(executorToKill)
+    handler.getNumExecutorsRunning should be (1)
+    handler.killExecutor(executorToKill)
+    handler.killExecutor(executorToKill)
+    handler.killExecutor(executorToKill)
+    handler.getNumExecutorsRunning should be (1)
+    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, 
Set.empty)
+    handler.updateResourceRequests()
+    handler.getPendingAllocate.size should be (1)
+  }
+
+  test("process same completed container multiple times") {
+    val handler = createAllocator(2)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (2)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2))
+    handler.getNumExecutorsRunning should be (2)
+    handler.getPendingAllocate.size should be (0)
+
+    val statuses = Seq(container1, container1, container2).map { c =>
+      ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, 
"Finished", 0)
+    }
+    handler.processCompletedContainers(statuses)
+    handler.getNumExecutorsRunning should be (0)
+
+  }
+
   test("lost executor removed from backend") {
     val handler = createAllocator(4)
     handler.updateResourceRequests()
@@ -272,7 +316,7 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
       ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, 
"Failed", -1)
     }
     handler.updateResourceRequests()
-    handler.processCompletedContainers(statuses.toSeq)
+    handler.processCompletedContainers(statuses)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
     handler.getPendingAllocate.size should be (2)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to