Repository: spark Updated Branches: refs/heads/branch-2.0 6fe5972e6 -> fab77dadf
[SPARK-17511] Yarn Dynamic Allocation: Avoid marking released container as Failed
Due to race conditions, the `assert(numExecutorsRunning <= targetNumExecutors)` can fail, causing an `AssertionError`. This change removes the assertion and instead moves the conditional check so that it runs before a new container is launched. Example failure: ``` java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:156) at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$runAllocatedContainers$1.org$apache$spark$deploy$yarn$YarnAllocator$$anonfun$$updateInternalState$1(YarnAllocator.scala:489) at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$runAllocatedContainers$1$$anon$1.run(YarnAllocator.scala:519) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` This was tested manually by running a large ForkAndJoin job with Dynamic Allocation enabled. The test verified that the previously failing job now succeeds without any such exception. Author: Kishor Patil <kpa...@yahoo-inc.com> Closes #15069 from kishorvpatil/SPARK-17511. 
(cherry picked from commit ff6e4cbdc80e2ad84c5d70ee07f323fad9374e3e) Signed-off-by: Tom Graves <tgra...@yahoo-inc.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fab77dad Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fab77dad Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fab77dad Branch: refs/heads/branch-2.0 Commit: fab77dadf70d011cec8976acfe8c851816f82426 Parents: 6fe5972 Author: Kishor Patil <kpa...@yahoo-inc.com> Authored: Wed Sep 14 14:19:35 2016 -0500 Committer: Tom Graves <tgra...@yahoo-inc.com> Committed: Wed Sep 14 14:33:40 2016 -0500 ---------------------------------------------------------------------- .../spark/deploy/yarn/YarnAllocator.scala | 68 +++++++++++--------- .../spark/deploy/yarn/YarnAllocatorSuite.scala | 19 ++++++ 2 files changed, 55 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/fab77dad/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 1b80071..b321901 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -483,7 +483,6 @@ private[yarn] class YarnAllocator( def updateInternalState(): Unit = synchronized { numExecutorsRunning += 1 - assert(numExecutorsRunning <= targetNumExecutors) executorIdToContainer(executorId) = container containerIdToExecutorId(container.getId) = executorId @@ -493,39 +492,44 @@ private[yarn] class YarnAllocator( allocatedContainerToHostMap.put(containerId, executorHostname) } - if (launchContainers) { - logInfo("Launching ExecutorRunnable. 
driverUrl: %s, executorHostname: %s".format( - driverUrl, executorHostname)) - - launcherPool.execute(new Runnable { - override def run(): Unit = { - try { - new ExecutorRunnable( - container, - conf, - sparkConf, - driverUrl, - executorId, - executorHostname, - executorMemory, - executorCores, - appAttemptId.getApplicationId.toString, - securityMgr, - localResources - ).run() - updateInternalState() - } catch { - case NonFatal(e) => - logError(s"Failed to launch executor $executorId on container $containerId", e) - // Assigned container should be released immediately to avoid unnecessary resource - // occupation. - amClient.releaseAssignedContainer(containerId) + if (numExecutorsRunning < targetNumExecutors) { + if (launchContainers) { + logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format( + driverUrl, executorHostname)) + + launcherPool.execute(new Runnable { + override def run(): Unit = { + try { + new ExecutorRunnable( + container, + conf, + sparkConf, + driverUrl, + executorId, + executorHostname, + executorMemory, + executorCores, + appAttemptId.getApplicationId.toString, + securityMgr, + localResources + ).run() + updateInternalState() + } catch { + case NonFatal(e) => + logError(s"Failed to launch executor $executorId on container $containerId", e) + // Assigned container should be released immediately to avoid unnecessary resource + // occupation. 
+ amClient.releaseAssignedContainer(containerId) + } } - } - }) + }) + } else { + // For test only + updateInternalState() + } } else { - // For test only - updateInternalState() + logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " + + "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors)) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/fab77dad/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 207dbf5..f8351c0 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -136,6 +136,25 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter size should be (0) } + test("container should not be created if requested number if met") { + // request a single container and receive it + val handler = createAllocator(1) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getPendingAllocate.size should be (1) + + val container = createContainer("host1") + handler.handleAllocatedContainers(Array(container)) + + handler.getNumExecutorsRunning should be (1) + handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") + handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId) + + val container2 = createContainer("host2") + handler.handleAllocatedContainers(Array(container2)) + handler.getNumExecutorsRunning should be (1) + } + test("some containers allocated") { // request a few containers and receive some of them val handler = createAllocator(4) --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org