http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala deleted file mode 100644 index 0378ef4..0000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ /dev/null @@ -1,791 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.io.{File, IOException} -import java.lang.reflect.InvocationTargetException -import java.net.{Socket, URI, URL} -import java.util.concurrent.{TimeoutException, TimeUnit} - -import scala.collection.mutable.HashMap -import scala.concurrent.Promise -import scala.concurrent.duration.Duration -import scala.util.control.NonFatal - -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.{ConverterUtils, Records} - -import org.apache.spark._ -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.history.HistoryServer -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager} -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ -import org.apache.spark.rpc._ -import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend} -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util._ - -/** - * Common application master functionality for Spark on Yarn. - */ -private[spark] class ApplicationMaster( - args: ApplicationMasterArguments, - client: YarnRMClient) - extends Logging { - - // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be - // optimal as more containers are available. Might need to handle this better. - - private val sparkConf = new SparkConf() - private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf) - .asInstanceOf[YarnConfiguration] - private val isClusterMode = args.userClass != null - - // Default to twice the number of executors (twice the maximum number of executors if dynamic - // allocation is enabled), with a minimum of 3. 
- - private val maxNumExecutorFailures = { - val effectiveNumExecutors = - if (Utils.isDynamicAllocationEnabled(sparkConf)) { - sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS) - } else { - sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0) - } - // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled. We need - // avoid the integer overflow here. - val defaultMaxNumExecutorFailures = math.max(3, - if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * effectiveNumExecutors)) - - sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures) - } - - @volatile private var exitCode = 0 - @volatile private var unregistered = false - @volatile private var finished = false - @volatile private var finalStatus = getDefaultFinalStatus - @volatile private var finalMsg: String = "" - @volatile private var userClassThread: Thread = _ - - @volatile private var reporterThread: Thread = _ - @volatile private var allocator: YarnAllocator = _ - - // Lock for controlling the allocator (heartbeat) thread. - private val allocatorLock = new Object() - - // Steady state heartbeat interval. We want to be reasonably responsive without causing too many - // requests to RM. - private val heartbeatInterval = { - // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. - val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - math.max(0, math.min(expiryInterval / 2, sparkConf.get(RM_HEARTBEAT_INTERVAL))) - } - - // Initial wait interval before allocator poll, to allow for quicker ramp up when executors are - // being requested. - private val initialAllocationInterval = math.min(heartbeatInterval, - sparkConf.get(INITIAL_HEARTBEAT_INTERVAL)) - - // Next wait interval before allocator poll. - private var nextAllocationInterval = initialAllocationInterval - - private var rpcEnv: RpcEnv = null - private var amEndpoint: RpcEndpointRef = _ - - // In cluster mode, used to tell the AM when the user's SparkContext has been initialized. - private val sparkContextPromise = Promise[SparkContext]() - - private var credentialRenewer: AMCredentialRenewer = _ - - // Load the list of localized files set by the client. This is used when launching executors, - // and is loaded here so that these configs don't pollute the Web UI's environment page in - // cluster mode. 
- private val localResources = { - logInfo("Preparing Local resources") - val resources = HashMap[String, LocalResource]() - - def setupDistributedCache( - file: String, - rtype: LocalResourceType, - timestamp: String, - size: String, - vis: String): Unit = { - val uri = new URI(file) - val amJarRsrc = Records.newRecord(classOf[LocalResource]) - amJarRsrc.setType(rtype) - amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis)) - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri)) - amJarRsrc.setTimestamp(timestamp.toLong) - amJarRsrc.setSize(size.toLong) - - val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName()) - resources(fileName) = amJarRsrc - } - - val distFiles = sparkConf.get(CACHED_FILES) - val fileSizes = sparkConf.get(CACHED_FILES_SIZES) - val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS) - val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES) - val resTypes = sparkConf.get(CACHED_FILES_TYPES) - - for (i <- 0 to distFiles.size - 1) { - val resType = LocalResourceType.valueOf(resTypes(i)) - setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString, - visibilities(i)) - } - - // Distribute the conf archive to executors. - sparkConf.get(CACHED_CONF_ARCHIVE).foreach { path => - val uri = new URI(path) - val fs = FileSystem.get(uri, yarnConf) - val status = fs.getFileStatus(new Path(uri)) - // SPARK-16080: Make sure to use the correct name for the destination when distributing the - // conf archive to executors. - val destUri = new URI(uri.getScheme(), uri.getRawSchemeSpecificPart(), - Client.LOCALIZED_CONF_DIR) - setupDistributedCache(destUri.toString(), LocalResourceType.ARCHIVE, - status.getModificationTime().toString, status.getLen.toString, - LocalResourceVisibility.PRIVATE.name()) - } - - // Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy). - CACHE_CONFIGS.foreach { e => - sparkConf.remove(e) - sys.props.remove(e.key) - } - - resources.toMap - } - - def getAttemptId(): ApplicationAttemptId = { - client.getAttemptId() - } - - final def run(): Int = { - try { - val appAttemptId = client.getAttemptId() - - var attemptID: Option[String] = None - - if (isClusterMode) { - // Set the web ui port to be ephemeral for yarn so we don't conflict with - // other spark processes running on the same box - System.setProperty("spark.ui.port", "0") - - // Set the master and deploy mode property to match the requested mode. - System.setProperty("spark.master", "yarn") - System.setProperty("spark.submit.deployMode", "cluster") - - // Set this internal configuration if it is running on cluster mode, this - // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode. - System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) - - attemptID = Option(appAttemptId.getAttemptId.toString) - } - - new CallerContext( - "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT), - Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext() - - logInfo("ApplicationAttemptId: " + appAttemptId) - - val fs = FileSystem.get(yarnConf) - - // This shutdown hook should run *after* the SparkContext is shut down. 
- val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1 - ShutdownHookManager.addShutdownHook(priority) { () => - val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf) - val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts - - if (!finished) { - // The default state of ApplicationMaster is failed if it is invoked by shut down hook. - // This behavior is different compared to 1.x version. - // If user application is exited ahead of time by calling System.exit(N), here mark - // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call - // System.exit(0) to terminate the application. - finish(finalStatus, - ApplicationMaster.EXIT_EARLY, - "Shutdown hook called before final status was reported.") - } - - if (!unregistered) { - // we only want to unregister if we don't want the RM to retry - if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) { - unregister(finalStatus, finalMsg) - cleanupStagingDir(fs) - } - } - } - - // Call this to force generation of secret so it gets populated into the - // Hadoop UGI. This has to happen before the startUserApplication which does a - // doAs in order for the credentials to be passed on to the executor containers. - val securityMgr = new SecurityManager(sparkConf) - - // If the credentials file config is present, we must periodically renew tokens. So create - // a new AMDelegationTokenRenewer - if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) { - // If a principal and keytab have been set, use that to create new credentials for executors - // periodically - credentialRenewer = - new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer() - credentialRenewer.scheduleLoginFromKeytab() - } - - if (isClusterMode) { - runDriver(securityMgr) - } else { - runExecutorLauncher(securityMgr) - } - } catch { - case e: Exception => - // catch everything else if not specifically handled - logError("Uncaught exception: ", e) - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, - "Uncaught exception: " + e) - } - exitCode - } - - /** - * Set the default final application status for client mode to UNDEFINED to handle - * if YARN HA restarts the application so that it properly retries. Set the final - * status to SUCCEEDED in cluster mode to handle if the user calls System.exit - * from the application code. - */ - final def getDefaultFinalStatus(): FinalApplicationStatus = { - if (isClusterMode) { - FinalApplicationStatus.FAILED - } else { - FinalApplicationStatus.UNDEFINED - } - } - - /** - * unregister is used to completely unregister the application from the ResourceManager. - * This means the ResourceManager will not retry the application attempt on your behalf if - * a failure occurred. 
- */ - final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = { - synchronized { - if (!unregistered) { - logInfo(s"Unregistering ApplicationMaster with $status" + - Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) - unregistered = true - client.unregister(status, Option(diagnostics).getOrElse("")) - } - } - } - - final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit = { - synchronized { - if (!finished) { - val inShutdown = ShutdownHookManager.inShutdown() - logInfo(s"Final app status: $status, exitCode: $code" + - Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) - exitCode = code - finalStatus = status - finalMsg = msg - finished = true - if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { - logDebug("shutting down reporter thread") - reporterThread.interrupt() - } - if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) { - logDebug("shutting down user thread") - userClassThread.interrupt() - } - if (!inShutdown && credentialRenewer != null) { - credentialRenewer.stop() - credentialRenewer = null - } - } - } - } - - private def sparkContextInitialized(sc: SparkContext) = { - sparkContextPromise.success(sc) - } - - private def registerAM( - _sparkConf: SparkConf, - _rpcEnv: RpcEnv, - driverRef: RpcEndpointRef, - uiAddress: String, - securityMgr: SecurityManager) = { - val appId = client.getAttemptId().getApplicationId().toString() - val attemptId = client.getAttemptId().getAttemptId().toString() - val historyAddress = - _sparkConf.get(HISTORY_SERVER_ADDRESS) - .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) } - .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" } - .getOrElse("") - - val driverUrl = RpcEndpointAddress( - _sparkConf.get("spark.driver.host"), - _sparkConf.get("spark.driver.port").toInt, - CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString - - // Before we initialize the allocator, let's log the information about how executors will - // be run up front, to avoid printing this out for every single executor being launched. - // Use placeholders for information that changes such as executor IDs. - logInfo { - val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt - val executorCores = sparkConf.get(EXECUTOR_CORES) - val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>", - "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources) - dummyRunner.launchContextDebugInfo() - } - - allocator = client.register(driverUrl, - driverRef, - yarnConf, - _sparkConf, - uiAddress, - historyAddress, - securityMgr, - localResources) - - allocator.allocateResources() - reporterThread = launchReporterThread() - } - - /** - * Create an [[RpcEndpoint]] that communicates with the driver. - * - * In cluster mode, the AM and the driver belong to same process - * so the AMEndpoint need not monitor lifecycle of the driver. - * - * @return A reference to the driver's RPC endpoint. 
- */ - private def runAMEndpoint( - host: String, - port: String, - isClusterMode: Boolean): RpcEndpointRef = { - val driverEndpoint = rpcEnv.setupEndpointRef( - RpcAddress(host, port.toInt), - YarnSchedulerBackend.ENDPOINT_NAME) - amEndpoint = - rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode)) - driverEndpoint - } - - private def runDriver(securityMgr: SecurityManager): Unit = { - addAmIpFilter() - userClassThread = startUserApplication() - - // This a bit hacky, but we need to wait until the spark.driver.port property has - // been set by the Thread executing the user class. - logInfo("Waiting for spark context initialization...") - val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME) - try { - val sc = ThreadUtils.awaitResult(sparkContextPromise.future, - Duration(totalWaitTime, TimeUnit.MILLISECONDS)) - if (sc != null) { - rpcEnv = sc.env.rpcEnv - val driverRef = runAMEndpoint( - sc.getConf.get("spark.driver.host"), - sc.getConf.get("spark.driver.port"), - isClusterMode = true) - registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl).getOrElse(""), - securityMgr) - } else { - // Sanity check; should never happen in normal operation, since sc should only be null - // if the user app did not create a SparkContext. - if (!finished) { - throw new IllegalStateException("SparkContext is null but app is still running!") - } - } - userClassThread.join() - } catch { - case e: SparkException if e.getCause().isInstanceOf[TimeoutException] => - logError( - s"SparkContext did not initialize after waiting for $totalWaitTime ms. " + - "Please check earlier log output for errors. Failing the application.") - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_SC_NOT_INITED, - "Timed out waiting for SparkContext.") - } - } - - private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { - val port = sparkConf.get(AM_PORT) - rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr, - clientMode = true) - val driverRef = waitForSparkDriver() - addAmIpFilter() - registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), - securityMgr) - - // In client mode the actor will stop the reporter thread. - reporterThread.join() - } - - private def launchReporterThread(): Thread = { - // The number of failures in a row until Reporter thread give up - val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES) - - val t = new Thread { - override def run() { - var failureCount = 0 - while (!finished) { - try { - if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, - s"Max number of executor failures ($maxNumExecutorFailures) reached") - } else { - logDebug("Sending progress") - allocator.allocateResources() - } - failureCount = 0 - } catch { - case i: InterruptedException => - case e: Throwable => - failureCount += 1 - // this exception was introduced in hadoop 2.4 and this code would not compile - // with earlier versions if we refer it directly. 
- if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" == - e.getClass().getName()) { - logError("Exception from Reporter thread.", e) - finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, - e.getMessage) - } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) { - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + - s"$failureCount time(s) from Reporter thread.") - } else { - logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e) - } - } - try { - val numPendingAllocate = allocator.getPendingAllocate.size - var sleepStart = 0L - var sleepInterval = 200L // ms - allocatorLock.synchronized { - sleepInterval = - if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) { - val currentAllocationInterval = - math.min(heartbeatInterval, nextAllocationInterval) - nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow - currentAllocationInterval - } else { - nextAllocationInterval = initialAllocationInterval - heartbeatInterval - } - sleepStart = System.currentTimeMillis() - allocatorLock.wait(sleepInterval) - } - val sleepDuration = System.currentTimeMillis() - sleepStart - if (sleepDuration < sleepInterval) { - // log when sleep is interrupted - logDebug(s"Number of pending allocations is $numPendingAllocate. " + - s"Slept for $sleepDuration/$sleepInterval ms.") - // if sleep was less than the minimum interval, sleep for the rest of it - val toSleep = math.max(0, initialAllocationInterval - sleepDuration) - if (toSleep > 0) { - logDebug(s"Going back to sleep for $toSleep ms") - // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up - // by the methods that signal allocatorLock because this is just finishing the min - // sleep interval, which should happen even if this is signalled again. - Thread.sleep(toSleep) - } - } else { - logDebug(s"Number of pending allocations is $numPendingAllocate. " + - s"Slept for $sleepDuration/$sleepInterval.") - } - } catch { - case e: InterruptedException => - } - } - } - } - // setting to daemon status, though this is usually not a good idea. - t.setDaemon(true) - t.setName("Reporter") - t.start() - logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " + - s"initial allocation : $initialAllocationInterval) intervals") - t - } - - /** - * Clean up the staging directory. - */ - private def cleanupStagingDir(fs: FileSystem) { - var stagingDirPath: Path = null - try { - val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES) - if (!preserveFiles) { - stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) - if (stagingDirPath == null) { - logError("Staging directory is null") - return - } - logInfo("Deleting staging directory " + stagingDirPath) - fs.delete(stagingDirPath, true) - } - } catch { - case ioe: IOException => - logError("Failed to cleanup staging dir " + stagingDirPath, ioe) - } - } - - private def waitForSparkDriver(): RpcEndpointRef = { - logInfo("Waiting for Spark driver to be reachable.") - var driverUp = false - val hostport = args.userArgs(0) - val (driverHost, driverPort) = Utils.parseHostPort(hostport) - - // Spark driver should already be up since it launched us, but we don't want to - // wait forever, so wait 100 seconds max to match the cluster mode setting. 
- val totalWaitTimeMs = sparkConf.get(AM_MAX_WAIT_TIME) - val deadline = System.currentTimeMillis + totalWaitTimeMs - - while (!driverUp && !finished && System.currentTimeMillis < deadline) { - try { - val socket = new Socket(driverHost, driverPort) - socket.close() - logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) - driverUp = true - } catch { - case e: Exception => - logError("Failed to connect to driver at %s:%s, retrying ...". - format(driverHost, driverPort)) - Thread.sleep(100L) - } - } - - if (!driverUp) { - throw new SparkException("Failed to connect to driver!") - } - - sparkConf.set("spark.driver.host", driverHost) - sparkConf.set("spark.driver.port", driverPort.toString) - - runAMEndpoint(driverHost, driverPort.toString, isClusterMode = false) - } - - /** Add the Yarn IP filter that is required for properly securing the UI. */ - private def addAmIpFilter() = { - val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) - val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" - val params = client.getAmIpFilterParams(yarnConf, proxyBase) - if (isClusterMode) { - System.setProperty("spark.ui.filters", amFilter) - params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) } - } else { - amEndpoint.send(AddWebUIFilter(amFilter, params.toMap, proxyBase)) - } - } - - /** - * Start the user class, which contains the spark driver, in a separate Thread. - * If the main routine exits cleanly or exits with System.exit(N) for any N - * we assume it was successful, for all other cases we assume failure. - * - * Returns the user thread that was started. - */ - private def startUserApplication(): Thread = { - logInfo("Starting the user application in a separate Thread") - - val classpath = Client.getUserClasspath(sparkConf) - val urls = classpath.map { entry => - new URL("file:" + new File(entry.getPath()).getAbsolutePath()) - } - val userClassLoader = - if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) { - new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader) - } else { - new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader) - } - - var userArgs = args.userArgs - if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) { - // When running pyspark, the app is run using PythonRunner. The second argument is the list - // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty. 
- userArgs = Seq(args.primaryPyFile, "") ++ userArgs - } - if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) { - // TODO(davies): add R dependencies here - } - val mainMethod = userClassLoader.loadClass(args.userClass) - .getMethod("main", classOf[Array[String]]) - - val userThread = new Thread { - override def run() { - try { - mainMethod.invoke(null, userArgs.toArray) - finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) - logDebug("Done running users class") - } catch { - case e: InvocationTargetException => - e.getCause match { - case _: InterruptedException => - // Reporter thread can interrupt to stop user class - case SparkUserAppException(exitCode) => - val msg = s"User application exited with status $exitCode" - logError(msg) - finish(FinalApplicationStatus.FAILED, exitCode, msg) - case cause: Throwable => - logError("User class threw exception: " + cause, cause) - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_EXCEPTION_USER_CLASS, - "User class threw exception: " + cause) - } - sparkContextPromise.tryFailure(e.getCause()) - } finally { - // Notify the thread waiting for the SparkContext, in case the application did not - // instantiate one. This will do nothing when the user code instantiates a SparkContext - // (with the correct master), or when the user code throws an exception (due to the - // tryFailure above). - sparkContextPromise.trySuccess(null) - } - } - } - userThread.setContextClassLoader(userClassLoader) - userThread.setName("Driver") - userThread.start() - userThread - } - - private def resetAllocatorInterval(): Unit = allocatorLock.synchronized { - nextAllocationInterval = initialAllocationInterval - allocatorLock.notifyAll() - } - - /** - * An [[RpcEndpoint]] that communicates with the driver's scheduler backend. - */ - private class AMEndpoint( - override val rpcEnv: RpcEnv, driver: RpcEndpointRef, isClusterMode: Boolean) - extends RpcEndpoint with Logging { - - override def onStart(): Unit = { - driver.send(RegisterClusterManager(self)) - } - - override def receive: PartialFunction[Any, Unit] = { - case x: AddWebUIFilter => - logInfo(s"Add WebUI Filter. 
$x") - driver.send(x) - } - - override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) => - Option(allocator) match { - case Some(a) => - if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal, - localityAwareTasks, hostToLocalTaskCount)) { - resetAllocatorInterval() - } - context.reply(true) - - case None => - logWarning("Container allocator is not ready to request executors yet.") - context.reply(false) - } - - case KillExecutors(executorIds) => - logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.") - Option(allocator) match { - case Some(a) => executorIds.foreach(a.killExecutor) - case None => logWarning("Container allocator is not ready to kill executors yet.") - } - context.reply(true) - - case GetExecutorLossReason(eid) => - Option(allocator) match { - case Some(a) => - a.enqueueGetLossReasonRequest(eid, context) - resetAllocatorInterval() - case None => - logWarning("Container allocator is not ready to find executor loss reasons yet.") - } - } - - override def onDisconnected(remoteAddress: RpcAddress): Unit = { - // In cluster mode, do not rely on the disassociated event to exit - // This avoids potentially reporting incorrect exit codes if the driver fails - if (!isClusterMode) { - logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress") - finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) - } - } - } - -} - -object ApplicationMaster extends Logging { - - // exit codes for different causes, no reason behind the values - private val EXIT_SUCCESS = 0 - private val EXIT_UNCAUGHT_EXCEPTION = 10 - private val EXIT_MAX_EXECUTOR_FAILURES = 11 - private val EXIT_REPORTER_FAILURE = 12 - private val EXIT_SC_NOT_INITED = 13 - private val EXIT_SECURITY = 14 - private val EXIT_EXCEPTION_USER_CLASS = 15 - private val EXIT_EARLY = 16 - - private var master: ApplicationMaster = _ - - def main(args: Array[String]): Unit = { - SignalUtils.registerLogger(log) - val amArgs = new ApplicationMasterArguments(args) - - // Load the properties file with the Spark configuration and set entries as system properties, - // so that user code run inside the AM also has access to them. - // Note: we must do this before SparkHadoopUtil instantiated - if (amArgs.propertiesFile != null) { - Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) => - sys.props(k) = v - } - } - SparkHadoopUtil.get.runAsSparkUser { () => - master = new ApplicationMaster(amArgs, new YarnRMClient) - System.exit(master.run()) - } - } - - private[spark] def sparkContextInitialized(sc: SparkContext): Unit = { - master.sparkContextInitialized(sc) - } - - private[spark] def getAttemptId(): ApplicationAttemptId = { - master.getAttemptId - } - -} - -/** - * This object does not provide any special functionality. It exists so that it's easy to tell - * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps. - */ -object ExecutorLauncher { - - def main(args: Array[String]): Unit = { - ApplicationMaster.main(args) - } - -}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
deleted file mode 100644
index 5cdec87..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.util.{IntParam, MemoryParam}
-
-class ApplicationMasterArguments(val args: Array[String]) {
-  var userJar: String = null
-  var userClass: String = null
-  var primaryPyFile: String = null
-  var primaryRFile: String = null
-  var userArgs: Seq[String] = Nil
-  var propertiesFile: String = null
-
-  parseArgs(args.toList)
-
-  private def parseArgs(inputArgs: List[String]): Unit = {
-    val userArgsBuffer = new ArrayBuffer[String]()
-
-    var args = inputArgs
-
-    while (!args.isEmpty) {
-      // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0,
-      // the properties with executor in their names are preferred.
-      args match {
-        case ("--jar") :: value :: tail =>
-          userJar = value
-          args = tail
-
-        case ("--class") :: value :: tail =>
-          userClass = value
-          args = tail
-
-        case ("--primary-py-file") :: value :: tail =>
-          primaryPyFile = value
-          args = tail
-
-        case ("--primary-r-file") :: value :: tail =>
-          primaryRFile = value
-          args = tail
-
-        case ("--arg") :: value :: tail =>
-          userArgsBuffer += value
-          args = tail
-
-        case ("--properties-file") :: value :: tail =>
-          propertiesFile = value
-          args = tail
-
-        case _ =>
-          printUsageAndExit(1, args)
-      }
-    }
-
-    if (primaryPyFile != null && primaryRFile != null) {
-      // scalastyle:off println
-      System.err.println("Cannot have primary-py-file and primary-r-file at the same time")
-      // scalastyle:on println
-      System.exit(-1)
-    }
-
-    userArgs = userArgsBuffer.toList
-  }
-
-  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
-    // scalastyle:off println
-    if (unknownParam != null) {
-      System.err.println("Unknown/unsupported param " + unknownParam)
-    }
-    System.err.println("""
-      |Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
-      |Options:
-      |  --jar JAR_PATH       Path to your application's JAR file
-      |  --class CLASS_NAME   Name of your application's main class
-      |  --primary-py-file    A main Python file
-      |  --primary-r-file     A main R file
-      |  --arg ARG            Argument to be passed to your application's main class.
-      |                       Multiple invocations are possible, each will be passed in order.
-      |  --properties-file FILE Path to a custom Spark properties file.
-      """.stripMargin)
-    // scalastyle:on println
-    System.exit(exitCode)
-  }
-}
-
-object ApplicationMasterArguments {
-  val DEFAULT_NUMBER_EXECUTORS = 2
-}
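
The argument parser in the deleted ApplicationMasterArguments.scala consumes "--flag value" pairs by pattern matching on a List and rebinding the tail until the list is empty. A stripped-down standalone sketch of the same idiom, reduced to two flags for illustration (this is a hypothetical example, not the Spark class itself):

// Standalone sketch of the List pattern-match parsing loop above, reduced to
// two flags (illustrative only, not the Spark class).
object ArgParseSketch {
  def main(argv: Array[String]): Unit = {
    var userClass: String = null
    val userArgs = scala.collection.mutable.ArrayBuffer[String]()

    var args = argv.toList
    while (args.nonEmpty) {
      args match {
        case "--class" :: value :: tail =>
          userClass = value
          args = tail
        case "--arg" :: value :: tail =>
          userArgs += value   // --arg may repeat; order is preserved
          args = tail
        case unknown =>
          sys.error("Unknown/unsupported param " + unknown.mkString(" "))
      }
    }
    println(s"class=$userClass args=${userArgs.mkString(", ")}")
  }
}

Invoked with --class MyApp --arg a --arg b, the sketch prints class=MyApp args=a, b; repeated --arg occurrences accumulate in order, which is why the usage text above notes that multiple invocations are passed in order.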