Repository: spark
Updated Branches:
  refs/heads/master 75cf369c7 -> 54032682b


[SPARK-24182][YARN] Improve error message when client AM fails.

Instead of always throwing a generic exception when the AM fails,
print a generic error and throw the exception with the YARN
diagnostics containing the reason for the failure.

There was an issue with YARN sometimes providing a generic diagnostic
message, even though the AM provides a failure reason when
unregistering. That was happening because the AM was registering
too late, and if errors happened before the registration, YARN would
just create a generic "ExitCodeException" which wasn't very helpful.

Since most errors in this path are a result of not being able to
connect to the driver, this change modifies the AM registration
a bit so that the AM is registered before the connection to the
driver is established. That way, errors are properly propagated
through YARN back to the driver.

As part of that, I also removed the code that retried connections
to the driver from the client AM. At that point, the driver should
already be up and waiting for connections, so it's unlikely that
retrying would help - and in case it does, that means a flaky
network, which would mean problems would probably show up again.
The effect of that is that connection-related errors are reported
back to the driver much faster now (through the YARN report).

One thing to note is that there seems to be a race on the YARN
side that causes a report to be sent to the client without the
corresponding diagnostics string from the AM; the diagnostics are
available later from the RM web page. For that reason, the generic
error messages are kept in the Spark scheduler code, to help
guide users to a way of debugging their failure.

Also of note is that if YARN's max attempts configuration is lower
than Spark's, Spark will not unregister the AM with a proper
diagnostics message. Unfortunately there seems to be no way to
unregister the AM and still allow further re-attempts to happen.

Testing:
- existing unit tests
- some of our integration tests
- hardcoded an invalid driver address in the code and verified
  the error in the shell. e.g.

```
scala> 18/05/04 15:09:34 ERROR cluster.YarnClientSchedulerBackend: YARN 
application has exited unexpectedly with state FAILED! Check the YARN 
application logs for more details.
18/05/04 15:09:34 ERROR cluster.YarnClientSchedulerBackend: Diagnostics 
message: Uncaught exception: org.apache.spark.SparkException: Exception thrown 
in awaitResult:
  <AM stack trace>
Caused by: java.io.IOException: Failed to connect to localhost/127.0.0.1:1234
  <More stack trace>
```

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #21243 from vanzin/SPARK-24182.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54032682
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54032682
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54032682

Branch: refs/heads/master
Commit: 54032682b910dc5089af27d2c7b6efe55700f034
Parents: 75cf369
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Fri May 11 17:40:35 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Fri May 11 17:40:35 2018 +0800

----------------------------------------------------------------------
 docs/running-on-yarn.md                         |   5 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 103 +++++++------------
 .../org/apache/spark/deploy/yarn/Client.scala   |  43 +++++---
 .../apache/spark/deploy/yarn/YarnRMClient.scala |  29 ++++--
 .../cluster/YarnClientSchedulerBackend.scala    |  35 +++++--
 5 files changed, 112 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/54032682/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index ceda8a3..c9e68c3 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -133,9 +133,8 @@ To use a custom metrics.properties for the application 
master and executors, upd
   <td><code>spark.yarn.am.waitTime</code></td>
   <td><code>100s</code></td>
   <td>
-    In <code>cluster</code> mode, time for the YARN Application Master to wait 
for the
-    SparkContext to be initialized. In <code>client</code> mode, time for the 
YARN Application Master to wait
-    for the driver to connect to it.
+    Only used in <code>cluster</code> mode. Time for the YARN Application 
Master to wait for the
+    SparkContext to be initialized.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/54032682/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 595077e..3d6ee50 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -346,7 +346,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
     synchronized {
       if (!finished) {
         val inShutdown = ShutdownHookManager.inShutdown()
-        if (registered) {
+        if (registered || !isClusterMode) {
           exitCode = code
           finalStatus = status
         } else {
@@ -389,37 +389,40 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
   }
 
   private def registerAM(
+      host: String,
+      port: Int,
       _sparkConf: SparkConf,
-      _rpcEnv: RpcEnv,
-      driverRef: RpcEndpointRef,
-      uiAddress: Option[String]) = {
+      uiAddress: Option[String]): Unit = {
     val appId = client.getAttemptId().getApplicationId().toString()
     val attemptId = client.getAttemptId().getAttemptId().toString()
     val historyAddress = ApplicationMaster
       .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)
 
-    val driverUrl = RpcEndpointAddress(
-      _sparkConf.get("spark.driver.host"),
-      _sparkConf.get("spark.driver.port").toInt,
+    client.register(host, port, yarnConf, _sparkConf, uiAddress, 
historyAddress)
+    registered = true
+  }
+
+  private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: 
SparkConf): Unit = {
+    val appId = client.getAttemptId().getApplicationId().toString()
+    val driverUrl = RpcEndpointAddress(driverRef.address.host, 
driverRef.address.port,
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
 
     // Before we initialize the allocator, let's log the information about how 
executors will
     // be run up front, to avoid printing this out for every single executor 
being launched.
     // Use placeholders for information that changes such as executor IDs.
     logInfo {
-      val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
-      val executorCores = sparkConf.get(EXECUTOR_CORES)
-      val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, 
driverUrl, "<executorId>",
+      val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
+      val executorCores = _sparkConf.get(EXECUTOR_CORES)
+      val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, 
driverUrl, "<executorId>",
         "<hostname>", executorMemory, executorCores, appId, securityMgr, 
localResources)
       dummyRunner.launchContextDebugInfo()
     }
 
-    allocator = client.register(driverUrl,
-      driverRef,
+    allocator = client.createAllocator(
       yarnConf,
       _sparkConf,
-      uiAddress,
-      historyAddress,
+      driverUrl,
+      driverRef,
       securityMgr,
       localResources)
 
@@ -434,15 +437,6 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
     reporterThread = launchReporterThread()
   }
 
-  /**
-   * @return An [[RpcEndpoint]] that communicates with the driver's scheduler 
backend.
-   */
-  private def createSchedulerRef(host: String, port: String): RpcEndpointRef = 
{
-    rpcEnv.setupEndpointRef(
-      RpcAddress(host, port.toInt),
-      YarnSchedulerBackend.ENDPOINT_NAME)
-  }
-
   private def runDriver(): Unit = {
     addAmIpFilter(None)
     userClassThread = startUserApplication()
@@ -456,11 +450,16 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
         Duration(totalWaitTime, TimeUnit.MILLISECONDS))
       if (sc != null) {
         rpcEnv = sc.env.rpcEnv
-        val driverRef = createSchedulerRef(
-          sc.getConf.get("spark.driver.host"),
-          sc.getConf.get("spark.driver.port"))
-        registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl))
-        registered = true
+
+        val userConf = sc.getConf
+        val host = userConf.get("spark.driver.host")
+        val port = userConf.get("spark.driver.port").toInt
+        registerAM(host, port, userConf, sc.ui.map(_.webUrl))
+
+        val driverRef = rpcEnv.setupEndpointRef(
+          RpcAddress(host, port),
+          YarnSchedulerBackend.ENDPOINT_NAME)
+        createAllocator(driverRef, userConf)
       } else {
         // Sanity check; should never happen in normal operation, since sc 
should only be null
         // if the user app did not create a SparkContext.
@@ -486,10 +485,18 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
     val amCores = sparkConf.get(AM_CORES)
     rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, 
securityMgr,
       amCores, true)
-    val driverRef = waitForSparkDriver()
+
+    // The client-mode AM doesn't listen for incoming connections, so report 
an invalid port.
+    registerAM(hostname, -1, sparkConf, 
sparkConf.getOption("spark.driver.appUIAddress"))
+
+    // The driver should be up and listening, so unlike cluster mode, just try 
to connect to it
+    // with no waiting or retrying.
+    val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
+    val driverRef = rpcEnv.setupEndpointRef(
+      RpcAddress(driverHost, driverPort),
+      YarnSchedulerBackend.ENDPOINT_NAME)
     addAmIpFilter(Some(driverRef))
-    registerAM(sparkConf, rpcEnv, driverRef, 
sparkConf.getOption("spark.driver.appUIAddress"))
-    registered = true
+    createAllocator(driverRef, sparkConf)
 
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()
@@ -600,40 +607,6 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
     }
   }
 
-  private def waitForSparkDriver(): RpcEndpointRef = {
-    logInfo("Waiting for Spark driver to be reachable.")
-    var driverUp = false
-    val hostport = args.userArgs(0)
-    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-
-    // Spark driver should already be up since it launched us, but we don't 
want to
-    // wait forever, so wait 100 seconds max to match the cluster mode setting.
-    val totalWaitTimeMs = sparkConf.get(AM_MAX_WAIT_TIME)
-    val deadline = System.currentTimeMillis + totalWaitTimeMs
-
-    while (!driverUp && !finished && System.currentTimeMillis < deadline) {
-      try {
-        val socket = new Socket(driverHost, driverPort)
-        socket.close()
-        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
-        driverUp = true
-      } catch {
-        case e: Exception =>
-          logError("Failed to connect to driver at %s:%s, retrying ...".
-            format(driverHost, driverPort))
-          Thread.sleep(100L)
-      }
-    }
-
-    if (!driverUp) {
-      throw new SparkException("Failed to connect to driver!")
-    }
-
-    sparkConf.set("spark.driver.host", driverHost)
-    sparkConf.set("spark.driver.port", driverPort.toString)
-    createSchedulerRef(driverHost, driverPort.toString)
-  }
-
   /** Add the Yarn IP filter that is required for properly securing the UI. */
   private def addAmIpFilter(driver: Option[RpcEndpointRef]) = {
     val proxyBase = 
System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)

http://git-wip-us.apache.org/repos/asf/spark/blob/54032682/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 134b3e5..7225ff0 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1019,8 +1019,7 @@ private[spark] class Client(
       appId: ApplicationId,
       returnOnRunning: Boolean = false,
       logApplicationReport: Boolean = true,
-      interval: Long = sparkConf.get(REPORT_INTERVAL)):
-      (YarnApplicationState, FinalApplicationStatus) = {
+      interval: Long = sparkConf.get(REPORT_INTERVAL)): YarnAppReport = {
     var lastState: YarnApplicationState = null
     while (true) {
       Thread.sleep(interval)
@@ -1031,11 +1030,13 @@ private[spark] class Client(
           case e: ApplicationNotFoundException =>
             logError(s"Application $appId not found.")
             cleanupStagingDir(appId)
-            return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
+            return YarnAppReport(YarnApplicationState.KILLED, 
FinalApplicationStatus.KILLED, None)
           case NonFatal(e) =>
-            logError(s"Failed to contact YARN for application $appId.", e)
+            val msg = s"Failed to contact YARN for application $appId."
+            logError(msg, e)
             // Don't necessarily clean up staging dir because status is unknown
-            return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED)
+            return YarnAppReport(YarnApplicationState.FAILED, 
FinalApplicationStatus.FAILED,
+              Some(msg))
         }
       val state = report.getYarnApplicationState
 
@@ -1073,14 +1074,14 @@ private[spark] class Client(
       }
 
       if (state == YarnApplicationState.FINISHED ||
-        state == YarnApplicationState.FAILED ||
-        state == YarnApplicationState.KILLED) {
+          state == YarnApplicationState.FAILED ||
+          state == YarnApplicationState.KILLED) {
         cleanupStagingDir(appId)
-        return (state, report.getFinalApplicationStatus)
+        return createAppReport(report)
       }
 
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
-        return (state, report.getFinalApplicationStatus)
+        return createAppReport(report)
       }
 
       lastState = state
@@ -1129,16 +1130,17 @@ private[spark] class Client(
         throw new SparkException(s"Application $appId finished with status: 
$state")
       }
     } else {
-      val (yarnApplicationState, finalApplicationStatus) = 
monitorApplication(appId)
-      if (yarnApplicationState == YarnApplicationState.FAILED ||
-        finalApplicationStatus == FinalApplicationStatus.FAILED) {
+      val YarnAppReport(appState, finalState, diags) = 
monitorApplication(appId)
+      if (appState == YarnApplicationState.FAILED || finalState == 
FinalApplicationStatus.FAILED) {
+        diags.foreach { err =>
+          logError(s"Application diagnostics message: $err")
+        }
         throw new SparkException(s"Application $appId finished with failed 
status")
       }
-      if (yarnApplicationState == YarnApplicationState.KILLED ||
-        finalApplicationStatus == FinalApplicationStatus.KILLED) {
+      if (appState == YarnApplicationState.KILLED || finalState == 
FinalApplicationStatus.KILLED) {
         throw new SparkException(s"Application $appId is killed")
       }
-      if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+      if (finalState == FinalApplicationStatus.UNDEFINED) {
         throw new SparkException(s"The final status of application $appId is 
undefined")
       }
     }
@@ -1477,6 +1479,12 @@ private object Client extends Logging {
     uri.startsWith(s"$LOCAL_SCHEME:")
   }
 
+  def createAppReport(report: ApplicationReport): YarnAppReport = {
+    val diags = report.getDiagnostics()
+    val diagsOpt = if (diags != null && diags.nonEmpty) Some(diags) else None
+    YarnAppReport(report.getYarnApplicationState(), 
report.getFinalApplicationStatus(), diagsOpt)
+  }
+
 }
 
 private[spark] class YarnClusterApplication extends SparkApplication {
@@ -1491,3 +1499,8 @@ private[spark] class YarnClusterApplication extends 
SparkApplication {
   }
 
 }
+
+private[spark] case class YarnAppReport(
+    appState: YarnApplicationState,
+    finalState: FinalApplicationStatus,
+    diagnostics: Option[String])

http://git-wip-us.apache.org/repos/asf/spark/blob/54032682/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 17234b1..b59dcf1 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -42,23 +42,20 @@ private[spark] class YarnRMClient extends Logging {
   /**
    * Registers the application master with the RM.
    *
+   * @param driverHost Host name where driver is running.
+   * @param driverPort Port where driver is listening.
    * @param conf The Yarn configuration.
    * @param sparkConf The Spark configuration.
    * @param uiAddress Address of the SparkUI.
    * @param uiHistoryAddress Address of the application on the History Server.
-   * @param securityMgr The security manager.
-   * @param localResources Map with information about files distributed via 
YARN's cache.
    */
   def register(
-      driverUrl: String,
-      driverRef: RpcEndpointRef,
+      driverHost: String,
+      driverPort: Int,
       conf: YarnConfiguration,
       sparkConf: SparkConf,
       uiAddress: Option[String],
-      uiHistoryAddress: String,
-      securityMgr: SecurityManager,
-      localResources: Map[String, LocalResource]
-    ): YarnAllocator = {
+      uiHistoryAddress: String): Unit = {
     amClient = AMRMClient.createAMRMClient()
     amClient.init(conf)
     amClient.start()
@@ -70,10 +67,19 @@ private[spark] class YarnRMClient extends Logging {
 
     logInfo("Registering the ApplicationMaster")
     synchronized {
-      amClient.registerApplicationMaster(driverRef.address.host, 
driverRef.address.port,
-        trackingUrl)
+      amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
       registered = true
     }
+  }
+
+  def createAllocator(
+      conf: YarnConfiguration,
+      sparkConf: SparkConf,
+      driverUrl: String,
+      driverRef: RpcEndpointRef,
+      securityMgr: SecurityManager,
+      localResources: Map[String, LocalResource]): YarnAllocator = {
+    require(registered, "Must register AM before creating allocator.")
     new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, 
getAttemptId(), securityMgr,
       localResources, new SparkRackResolver())
   }
@@ -88,6 +94,9 @@ private[spark] class YarnRMClient extends Logging {
     if (registered) {
       amClient.unregisterApplicationMaster(status, diagnostics, 
uiHistoryAddress)
     }
+    if (amClient != null) {
+      amClient.stop()
+    }
   }
 
   /** Returns the attempt ID. */

http://git-wip-us.apache.org/repos/asf/spark/blob/54032682/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 06e54a2..f1a8df0 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hadoop.yarn.api.records.YarnApplicationState
 
 import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkAppHandle
@@ -75,13 +75,23 @@ private[spark] class YarnClientSchedulerBackend(
     val monitorInterval = conf.get(CLIENT_LAUNCH_MONITOR_INTERVAL)
 
     assert(client != null && appId.isDefined, "Application has not been 
submitted yet!")
-    val (state, _) = client.monitorApplication(appId.get, returnOnRunning = 
true,
-      interval = monitorInterval) // blocking
+    val YarnAppReport(state, _, diags) = client.monitorApplication(appId.get,
+      returnOnRunning = true, interval = monitorInterval)
     if (state == YarnApplicationState.FINISHED ||
-      state == YarnApplicationState.FAILED ||
-      state == YarnApplicationState.KILLED) {
-      throw new SparkException("Yarn application has already ended! " +
-        "It might have been killed or unable to launch application master.")
+        state == YarnApplicationState.FAILED ||
+        state == YarnApplicationState.KILLED) {
+      val genericMessage = "The YARN application has already ended! " +
+        "It might have been killed or the Application Master may have failed 
to start. " +
+        "Check the YARN application logs for more details."
+      val exceptionMsg = diags match {
+        case Some(msg) =>
+          logError(genericMessage)
+          msg
+
+        case None =>
+          genericMessage
+      }
+      throw new SparkException(exceptionMsg)
     }
     if (state == YarnApplicationState.RUNNING) {
       logInfo(s"Application ${appId.get} has started running.")
@@ -100,8 +110,13 @@ private[spark] class YarnClientSchedulerBackend(
 
     override def run() {
       try {
-        val (state, _) = client.monitorApplication(appId.get, 
logApplicationReport = false)
-        logError(s"Yarn application has already exited with state $state!")
+        val YarnAppReport(_, state, diags) =
+          client.monitorApplication(appId.get, logApplicationReport = true)
+        logError(s"YARN application has exited unexpectedly with state $state! 
" +
+          "Check the YARN application logs for more details.")
+        diags.foreach { err =>
+          logError(s"Diagnostics message: $err")
+        }
         allowInterrupt = false
         sc.stop()
       } catch {
@@ -124,7 +139,7 @@ private[spark] class YarnClientSchedulerBackend(
   private def asyncMonitorApplication(): MonitorThread = {
     assert(client != null && appId.isDefined, "Application has not been 
submitted yet!")
     val t = new MonitorThread
-    t.setName("Yarn application state monitor")
+    t.setName("YARN application state monitor")
     t.setDaemon(true)
     t
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to