Repository: spark
Updated Branches:
  refs/heads/master 040e46979 -> ff6e4cbdc


[SPARK-17511] Yarn Dynamic Allocation: Avoid marking released container as 
Failed

## What changes were proposed in this pull request?

Due to race conditions, the `assert(numExecutorsRunning <= 
targetNumExecutors)` can fail, causing an `AssertionError`. This change removes 
the assertion and instead performs the conditional check before launching a new 
container. The failure being fixed looked like this:
```
java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:156)
        at 
org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$runAllocatedContainers$1.org$apache$spark$deploy$yarn$YarnAllocator$$anonfun$$updateInternalState$1(YarnAllocator.scala:489)
        at 
org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$runAllocatedContainers$1$$anon$1.run(YarnAllocator.scala:519)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
```
## How was this patch tested?
This was manually tested using a large ForkAndJoin job with Dynamic Allocation 
enabled, to validate that the previously failing job now succeeds without any 
such exception.

Author: Kishor Patil <kpa...@yahoo-inc.com>

Closes #15069 from kishorvpatil/SPARK-17511.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff6e4cbd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff6e4cbd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff6e4cbd

Branch: refs/heads/master
Commit: ff6e4cbdc80e2ad84c5d70ee07f323fad9374e3e
Parents: 040e469
Author: Kishor Patil <kpa...@yahoo-inc.com>
Authored: Wed Sep 14 14:19:35 2016 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Wed Sep 14 14:19:35 2016 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/YarnAllocator.scala       | 62 +++++++++++---------
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  | 19 ++++++
 2 files changed, 52 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff6e4cbd/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 2f4b498..0b66d1c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -496,7 +496,6 @@ private[yarn] class YarnAllocator(
 
       def updateInternalState(): Unit = synchronized {
         numExecutorsRunning += 1
-        assert(numExecutorsRunning <= targetNumExecutors)
         executorIdToContainer(executorId) = container
         containerIdToExecutorId(container.getId) = executorId
 
@@ -506,36 +505,41 @@ private[yarn] class YarnAllocator(
         allocatedContainerToHostMap.put(containerId, executorHostname)
       }
 
-      if (launchContainers) {
-        launcherPool.execute(new Runnable {
-          override def run(): Unit = {
-            try {
-              new ExecutorRunnable(
-                Some(container),
-                conf,
-                sparkConf,
-                driverUrl,
-                executorId,
-                executorHostname,
-                executorMemory,
-                executorCores,
-                appAttemptId.getApplicationId.toString,
-                securityMgr,
-                localResources
-              ).run()
-              updateInternalState()
-            } catch {
-              case NonFatal(e) =>
-                logError(s"Failed to launch executor $executorId on container 
$containerId", e)
-                // Assigned container should be released immediately to avoid 
unnecessary resource
-                // occupation.
-                amClient.releaseAssignedContainer(containerId)
+      if (numExecutorsRunning < targetNumExecutors) {
+        if (launchContainers) {
+          launcherPool.execute(new Runnable {
+            override def run(): Unit = {
+              try {
+                new ExecutorRunnable(
+                  Some(container),
+                  conf,
+                  sparkConf,
+                  driverUrl,
+                  executorId,
+                  executorHostname,
+                  executorMemory,
+                  executorCores,
+                  appAttemptId.getApplicationId.toString,
+                  securityMgr,
+                  localResources
+                ).run()
+                updateInternalState()
+              } catch {
+                case NonFatal(e) =>
+                  logError(s"Failed to launch executor $executorId on 
container $containerId", e)
+                  // Assigned container should be released immediately to 
avoid unnecessary resource
+                  // occupation.
+                  amClient.releaseAssignedContainer(containerId)
+              }
             }
-          }
-        })
+          })
+        } else {
+          // For test only
+          updateInternalState()
+        }
       } else {
-        // For test only
-        updateInternalState()
+        logInfo(("Skip launching executorRunnable as runnning Excecutors 
count: %d " +
+          "reached target Executors count: %d.").format(numExecutorsRunning, 
targetNumExecutors))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ff6e4cbd/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 696e552..994dc75 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -136,6 +136,25 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     size should be (0)
   }
 
+  test("container should not be created if requested number if met") {
+    // request a single container and receive it
+    val handler = createAllocator(1)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (1)
+
+    val container = createContainer("host1")
+    handler.handleAllocatedContainers(Array(container))
+
+    handler.getNumExecutorsRunning should be (1)
+    handler.allocatedContainerToHostMap.get(container.getId).get should be 
("host1")
+    handler.allocatedHostToContainersMap.get("host1").get should contain 
(container.getId)
+
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container2))
+    handler.getNumExecutorsRunning should be (1)
+  }
+
   test("some containers allocated") {
     // request a few containers and receive some of them
     val handler = createAllocator(4)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to