This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ec5e342 [SPARK-27094][YARN] Work around RackResolver swallowing
thread interrupt.
ec5e342 is described below
commit ec5e34205a8b0e2f6bc4287b86e7eac269452ffb
Author: Marcelo Vanzin <[email protected]>
AuthorDate: Wed Mar 20 11:48:06 2019 -0700
[SPARK-27094][YARN] Work around RackResolver swallowing thread interrupt.
To avoid the case where the YARN libraries would swallow the thread interrupt and
prevent YarnAllocator from shutting down, call the offending code in a
separate thread, so that the parent thread can respond appropriately to
the shutdown.
As a safeguard, also explicitly stop the executor launch thread pool when
shutting down the application, to prevent new executors from coming up
after the application has started its shutdown.
Tested with unit tests and some internal tests on a real cluster.
Closes #24017 from vanzin/SPARK-27094.
Authored-by: Marcelo Vanzin <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
---
.../spark/deploy/yarn/ApplicationMaster.scala | 154 +++++++++++----------
.../apache/spark/deploy/yarn/YarnAllocator.scala | 45 +++++-
2 files changed, 120 insertions(+), 79 deletions(-)
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9ed3b78..743c2e0 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -550,88 +550,94 @@ private[spark] class ApplicationMaster(
reporterThread.join()
}
- private def launchReporterThread(): Thread = {
- // The number of failures in a row until Reporter thread give up
+ private def allocationThreadImpl(): Unit = {
+ // The number of failures in a row until the allocation thread gives up.
val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
-
- val t = new Thread {
- override def run() {
- var failureCount = 0
- while (!finished) {
- try {
- if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
- finish(FinalApplicationStatus.FAILED,
- ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
- s"Max number of executor failures ($maxNumExecutorFailures)
reached")
- } else if (allocator.isAllNodeBlacklisted) {
- finish(FinalApplicationStatus.FAILED,
- ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
- "Due to executor failures all available nodes are blacklisted")
- } else {
- logDebug("Sending progress")
- allocator.allocateResources()
- }
- failureCount = 0
- } catch {
- case i: InterruptedException => // do nothing
- case e: ApplicationAttemptNotFoundException =>
- failureCount += 1
- logError("Exception from Reporter thread.", e)
- finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_REPORTER_FAILURE,
- e.getMessage)
- case e: Throwable =>
- failureCount += 1
- if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
- finish(FinalApplicationStatus.FAILED,
- ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was
thrown " +
- s"$failureCount time(s) from Reporter thread.")
- } else {
- logWarning(s"Reporter thread fails $failureCount time(s) in a
row.", e)
- }
+ var failureCount = 0
+ while (!finished) {
+ try {
+ if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+ s"Max number of executor failures ($maxNumExecutorFailures)
reached")
+ } else if (allocator.isAllNodeBlacklisted) {
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+ "Due to executor failures all available nodes are blacklisted")
+ } else {
+ logDebug("Sending progress")
+ allocator.allocateResources()
+ }
+ failureCount = 0
+ } catch {
+ case i: InterruptedException => // do nothing
+ case e: ApplicationAttemptNotFoundException =>
+ failureCount += 1
+ logError("Exception from Reporter thread.", e)
+ finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_REPORTER_FAILURE,
+ e.getMessage)
+ case e: Throwable =>
+ failureCount += 1
+ if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+ finish(FinalApplicationStatus.FAILED,
+ ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown "
+
+ s"$failureCount time(s) from Reporter thread.")
+ } else {
+ logWarning(s"Reporter thread fails $failureCount time(s) in a
row.", e)
}
- try {
- val numPendingAllocate = allocator.getPendingAllocate.size
- var sleepStartNs = 0L
- var sleepInterval = 200L // ms
- allocatorLock.synchronized {
- sleepInterval =
- if (numPendingAllocate > 0 ||
allocator.getNumPendingLossReasonRequests > 0) {
- val currentAllocationInterval =
- math.min(heartbeatInterval, nextAllocationInterval)
- nextAllocationInterval = currentAllocationInterval * 2 //
avoid overflow
- currentAllocationInterval
- } else {
- nextAllocationInterval = initialAllocationInterval
- heartbeatInterval
- }
- sleepStartNs = System.nanoTime()
- allocatorLock.wait(sleepInterval)
- }
- val sleepDuration = System.nanoTime() - sleepStartNs
- if (sleepDuration < TimeUnit.MILLISECONDS.toNanos(sleepInterval)) {
- // log when sleep is interrupted
- logDebug(s"Number of pending allocations is $numPendingAllocate.
" +
- s"Slept for $sleepDuration/$sleepInterval ms.")
- // if sleep was less than the minimum interval, sleep for the
rest of it
- val toSleep = math.max(0, initialAllocationInterval -
sleepDuration)
- if (toSleep > 0) {
- logDebug(s"Going back to sleep for $toSleep ms")
- // use Thread.sleep instead of allocatorLock.wait. there is no
need to be woken up
- // by the methods that signal allocatorLock because this is
just finishing the min
- // sleep interval, which should happen even if this is
signalled again.
- Thread.sleep(toSleep)
- }
+ }
+ try {
+ val numPendingAllocate = allocator.getPendingAllocate.size
+ var sleepStartNs = 0L
+ var sleepInterval = 200L // ms
+ allocatorLock.synchronized {
+ sleepInterval =
+ if (numPendingAllocate > 0 ||
allocator.getNumPendingLossReasonRequests > 0) {
+ val currentAllocationInterval =
+ math.min(heartbeatInterval, nextAllocationInterval)
+ nextAllocationInterval = currentAllocationInterval * 2 // avoid
overflow
+ currentAllocationInterval
} else {
- logDebug(s"Number of pending allocations is $numPendingAllocate.
" +
- s"Slept for $sleepDuration/$sleepInterval.")
+ nextAllocationInterval = initialAllocationInterval
+ heartbeatInterval
}
- } catch {
- case e: InterruptedException =>
+ sleepStartNs = System.nanoTime()
+ allocatorLock.wait(sleepInterval)
+ }
+ val sleepDuration = System.nanoTime() - sleepStartNs
+ if (sleepDuration < TimeUnit.MILLISECONDS.toNanos(sleepInterval)) {
+ // log when sleep is interrupted
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Slept for $sleepDuration/$sleepInterval ms.")
+ // if sleep was less than the minimum interval, sleep for the rest
of it
+ val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
+ if (toSleep > 0) {
+ logDebug(s"Going back to sleep for $toSleep ms")
+ // use Thread.sleep instead of allocatorLock.wait. there is no
need to be woken up
+ // by the methods that signal allocatorLock because this is just
finishing the min
+ // sleep interval, which should happen even if this is signalled
again.
+ Thread.sleep(toSleep)
}
+ } else {
+ logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+ s"Slept for $sleepDuration/$sleepInterval.")
+ }
+ } catch {
+ case e: InterruptedException =>
+ }
+ }
+ }
+
+ private def launchReporterThread(): Thread = {
+ val t = new Thread {
+ override def run(): Unit = {
+ try {
+ allocationThreadImpl()
+ } finally {
+ allocator.stop()
}
}
}
- // setting to daemon status, though this is usually not a good idea.
t.setDaemon(true)
t.setName("Reporter")
t.start()
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 8c6eff99..f9bdddc 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -381,6 +381,13 @@ private[yarn] class YarnAllocator(
}
}
+ def stop(): Unit = {
+ // Forcefully shut down the launcher pool, in case this is being called in
the middle of
+ // container allocation. This will prevent queued executors from being
started - and
+ // potentially interrupt active ExecutorRunnable instances too.
+ launcherPool.shutdownNow()
+ }
+
private def hostStr(request: ContainerRequest): String = {
Option(request.getNodes) match {
case Some(nodes) => nodes.asScala.mkString(",")
@@ -417,12 +424,40 @@ private[yarn] class YarnAllocator(
containersToUse, remainingAfterHostMatches)
}
- // Match remaining by rack
+ // Match remaining by rack. Because YARN's RackResolver swallows thread
interrupts
+ // (see SPARK-27094), which can cause this code to miss interrupts from
the AM, use
+ // a separate thread to perform the operation.
val remainingAfterRackMatches = new ArrayBuffer[Container]
- for (allocatedContainer <- remainingAfterHostMatches) {
- val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
- matchContainerToRequest(allocatedContainer, rack, containersToUse,
- remainingAfterRackMatches)
+ if (remainingAfterHostMatches.nonEmpty) {
+ var exception: Option[Throwable] = None
+ val thread = new Thread("spark-rack-resolver") {
+ override def run(): Unit = {
+ try {
+ for (allocatedContainer <- remainingAfterHostMatches) {
+ val rack = resolver.resolve(conf,
allocatedContainer.getNodeId.getHost)
+ matchContainerToRequest(allocatedContainer, rack,
containersToUse,
+ remainingAfterRackMatches)
+ }
+ } catch {
+ case e: Throwable =>
+ exception = Some(e)
+ }
+ }
+ }
+ thread.setDaemon(true)
+ thread.start()
+
+ try {
+ thread.join()
+ } catch {
+ case e: InterruptedException =>
+ thread.interrupt()
+ throw e
+ }
+
+ if (exception.isDefined) {
+ throw exception.get
+ }
}
// Assign remaining that are neither node-local nor rack-local
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]