Repository: spark
Updated Branches:
refs/heads/branch-2.0 6fe5972e6 -> fab77dadf
[SPARK-17511] Yarn Dynamic Allocation: Avoid marking released container as Failed
Due to a race condition, the `assert(numExecutorsRunning <= targetNumExecutors)` check
can fail, causing an `AssertionError`. This change removes the assertion and instead
moves the conditional check to before a new container is launched:
```
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$runAllocatedContainers$1.org$apache$spark$deploy$yarn$YarnAllocator$$anonfun$$updateInternalState$1(YarnAllocator.scala:489)
at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$runAllocatedContainers$1$$anon$1.run(YarnAllocator.scala:519)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
This was manually tested by running a large ForkAndJoin job with Dynamic Allocation
enabled and verifying that the previously failing job succeeds without this exception.
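For context, the sketch below is a minimal, self-contained illustration (not Spark's
actual `YarnAllocator`) of the pattern the patch applies: guard the launch with a check
of the running count against the target, and skip the launch when the target is already
met, rather than asserting on the count after incrementing it. The `SimpleAllocator`
class and `handleAllocatedContainer` method are illustrative names only, and the
threading is simplified.
```
import java.util.concurrent.Executors

// Illustrative stand-in for the allocator; not Spark's YarnAllocator.
class SimpleAllocator(targetNumExecutors: Int) {
  private var numExecutorsRunning = 0
  private val launcherPool = Executors.newCachedThreadPool()

  private def updateInternalState(): Unit = synchronized {
    // Increment only; no assert(numExecutorsRunning <= targetNumExecutors) here,
    // since a container granted before the target was lowered could otherwise trip it.
    numExecutorsRunning += 1
  }

  def handleAllocatedContainer(containerId: String): Unit = synchronized {
    if (numExecutorsRunning < targetNumExecutors) {
      // Launch asynchronously, mirroring the launcherPool usage in the patch.
      launcherPool.execute(new Runnable {
        override def run(): Unit = {
          // ... start the executor process for containerId here ...
          updateInternalState()
        }
      })
    } else {
      // Target already met: skip the launch and just log, as the patch does.
      println(s"Skipping launch for $containerId; target of $targetNumExecutors reached")
    }
  }
}
```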
Author: Kishor Patil <[email protected]>
Closes #15069 from kishorvpatil/SPARK-17511.
(cherry picked from commit ff6e4cbdc80e2ad84c5d70ee07f323fad9374e3e)
Signed-off-by: Tom Graves <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fab77dad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fab77dad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fab77dad
Branch: refs/heads/branch-2.0
Commit: fab77dadf70d011cec8976acfe8c851816f82426
Parents: 6fe5972
Author: Kishor Patil <[email protected]>
Authored: Wed Sep 14 14:19:35 2016 -0500
Committer: Tom Graves <[email protected]>
Committed: Wed Sep 14 14:33:40 2016 -0500
----------------------------------------------------------------------
.../spark/deploy/yarn/YarnAllocator.scala | 68 +++++++++++---------
.../spark/deploy/yarn/YarnAllocatorSuite.scala | 19 ++++++
2 files changed, 55 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fab77dad/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 1b80071..b321901 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -483,7 +483,6 @@ private[yarn] class YarnAllocator(
def updateInternalState(): Unit = synchronized {
numExecutorsRunning += 1
- assert(numExecutorsRunning <= targetNumExecutors)
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId
@@ -493,39 +492,44 @@ private[yarn] class YarnAllocator(
allocatedContainerToHostMap.put(containerId, executorHostname)
}
- if (launchContainers) {
- logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname:
%s".format(
- driverUrl, executorHostname))
-
- launcherPool.execute(new Runnable {
- override def run(): Unit = {
- try {
- new ExecutorRunnable(
- container,
- conf,
- sparkConf,
- driverUrl,
- executorId,
- executorHostname,
- executorMemory,
- executorCores,
- appAttemptId.getApplicationId.toString,
- securityMgr,
- localResources
- ).run()
- updateInternalState()
- } catch {
- case NonFatal(e) =>
- logError(s"Failed to launch executor $executorId on container
$containerId", e)
- // Assigned container should be released immediately to avoid unnecessary resource
- // occupation.
- amClient.releaseAssignedContainer(containerId)
+ if (numExecutorsRunning < targetNumExecutors) {
+ if (launchContainers) {
+ logInfo("Launching ExecutorRunnable. driverUrl: %s,
executorHostname: %s".format(
+ driverUrl, executorHostname))
+
+ launcherPool.execute(new Runnable {
+ override def run(): Unit = {
+ try {
+ new ExecutorRunnable(
+ container,
+ conf,
+ sparkConf,
+ driverUrl,
+ executorId,
+ executorHostname,
+ executorMemory,
+ executorCores,
+ appAttemptId.getApplicationId.toString,
+ securityMgr,
+ localResources
+ ).run()
+ updateInternalState()
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Failed to launch executor $executorId on
container $containerId", e)
+ // Assigned container should be released immediately to avoid unnecessary resource
+ // occupation.
+ amClient.releaseAssignedContainer(containerId)
+ }
}
- }
- })
+ })
+ } else {
+ // For test only
+ updateInternalState()
+ }
} else {
- // For test only
- updateInternalState()
+ logInfo(("Skip launching executorRunnable as runnning Excecutors
count: %d " +
+ "reached target Executors count: %d.").format(numExecutorsRunning,
targetNumExecutors))
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/fab77dad/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 207dbf5..f8351c0 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -136,6 +136,25 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
size should be (0)
}
+ test("container should not be created if requested number if met") {
+ // request a single container and receive it
+ val handler = createAllocator(1)
+ handler.updateResourceRequests()
+ handler.getNumExecutorsRunning should be (0)
+ handler.getPendingAllocate.size should be (1)
+
+ val container = createContainer("host1")
+ handler.handleAllocatedContainers(Array(container))
+
+ handler.getNumExecutorsRunning should be (1)
+ handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+ handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+
+ val container2 = createContainer("host2")
+ handler.handleAllocatedContainers(Array(container2))
+ handler.getNumExecutorsRunning should be (1)
+ }
+
test("some containers allocated") {
// request a few containers and receive some of them
val handler = createAllocator(4)