http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala deleted file mode 100644 index f95d723..0000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ /dev/null @@ -1,823 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.yarn - -import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} - -import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap, ListBuffer, Map} -import scala.util.{Try, Success, Failure} - -import com.google.common.base.Objects -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs._ -import org.apache.hadoop.fs.permission.FsPermission -import org.apache.hadoop.mapred.Master -import org.apache.hadoop.mapreduce.MRJobConfig -import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.hadoop.util.StringUtils -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.protocolrecords._ -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.Records - -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} -import org.apache.spark.util.Utils - -/** - * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. - * The Client submits an application to the YARN ResourceManager. - */ -private[spark] trait ClientBase extends Logging { - import ClientBase._ - - protected val args: ClientArguments - protected val hadoopConf: Configuration - protected val sparkConf: SparkConf - protected val yarnConf: YarnConfiguration - protected val credentials = UserGroupInformation.getCurrentUser.getCredentials - protected val amMemoryOverhead = args.amMemoryOverhead // MB - protected val executorMemoryOverhead = args.executorMemoryOverhead // MB - private val distCacheMgr = new ClientDistributedCacheManager() - private val isLaunchingDriver = args.userClass != null - - /** - * Fail fast if we have requested more resources per container than is available in the cluster. 
- */ - protected def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = { - val maxMem = newAppResponse.getMaximumResourceCapability().getMemory() - logInfo("Verifying our application has not requested more than the maximum " + - s"memory capability of the cluster ($maxMem MB per container)") - val executorMem = args.executorMemory + executorMemoryOverhead - if (executorMem > maxMem) { - throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" + - s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") - } - val amMem = args.amMemory + amMemoryOverhead - if (amMem > maxMem) { - throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" + - s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") - } - logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format( - amMem, - amMemoryOverhead)) - - // We could add checks to make sure the entire cluster has enough resources but that involves - // getting all the node reports and computing ourselves. - } - - /** - * Copy the given file to a remote file system (e.g. HDFS) if needed. - * The file is only copied if the source and destination file systems are different. This is used - * for preparing resources for launching the ApplicationMaster container. Exposed for testing. 
- */ - def copyFileToRemote( - destDir: Path, - srcPath: Path, - replication: Short, - setPerms: Boolean = false): Path = { - val destFs = destDir.getFileSystem(hadoopConf) - val srcFs = srcPath.getFileSystem(hadoopConf) - var destPath = srcPath - if (!compareFs(srcFs, destFs)) { - destPath = new Path(destDir, srcPath.getName()) - logInfo(s"Uploading resource $srcPath -> $destPath") - FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf) - destFs.setReplication(destPath, replication) - if (setPerms) { - destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION)) - } - } else { - logInfo(s"Source and destination file systems are the same. Not copying $srcPath") - } - // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific - // version shows the specific version in the distributed cache configuration - val qualifiedDestPath = destFs.makeQualified(destPath) - val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf) - fc.resolvePath(qualifiedDestPath) - } - - /** - * Given a local URI, resolve it and return a qualified local path that corresponds to the URI. - * This is used for preparing local resources to be included in the container launch context. - */ - private def getQualifiedLocalPath(localURI: URI): Path = { - val qualifiedURI = - if (localURI.getScheme == null) { - // If not specified, assume this is in the local filesystem to keep the behavior - // consistent with that of Hadoop - new URI(FileSystem.getLocal(hadoopConf).makeQualified(new Path(localURI)).toString) - } else { - localURI - } - new Path(qualifiedURI) - } - - /** - * Upload any resources to the distributed cache if needed. If a resource is intended to be - * consumed locally, set up the appropriate config for downstream code to handle it properly. - * This is used for setting up a container launch context for our ApplicationMaster. - * Exposed for testing. 
- */ - def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = { - logInfo("Preparing resources for our AM container") - // Upload Spark and the application JAR to the remote file system if necessary, - // and add them as local resources to the application master. - val fs = FileSystem.get(hadoopConf) - val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val nns = getNameNodesToAccess(sparkConf) + dst - obtainTokensForNamenodes(nns, hadoopConf, credentials) - - val replication = sparkConf.getInt("spark.yarn.submit.file.replication", - fs.getDefaultReplication(dst)).toShort - val localResources = HashMap[String, LocalResource]() - FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION)) - - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - - val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF")) - if (oldLog4jConf.isDefined) { - logWarning( - "SPARK_LOG4J_CONF detected in the system environment. This variable has been " + - "deprecated. Please refer to the \"Launching Spark on YARN\" documentation " + - "for alternatives.") - } - - /** - * Copy the given main resource to the distributed cache if the scheme is not "local". - * Otherwise, set the corresponding key in our SparkConf to handle it downstream. 
- * Each resource is represented by a 4-tuple of: - * (1) destination resource name, - * (2) local path to the resource, - * (3) Spark property key to set if the scheme is not local, and - * (4) whether to set permissions for this resource - */ - List( - (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false), - (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true), - ("log4j.properties", oldLog4jConf.orNull, null, false) - ).foreach { case (destName, _localPath, confKey, setPermissions) => - val localPath: String = if (_localPath != null) _localPath.trim() else "" - if (!localPath.isEmpty()) { - val localURI = new URI(localPath) - if (localURI.getScheme != LOCAL_SCHEME) { - val src = getQualifiedLocalPath(localURI) - val destPath = copyFileToRemote(dst, src, replication, setPermissions) - val destFs = FileSystem.get(destPath.toUri(), hadoopConf) - distCacheMgr.addResource(destFs, hadoopConf, destPath, - localResources, LocalResourceType.FILE, destName, statCache) - } else if (confKey != null) { - // If the resource is intended for local use only, handle this downstream - // by setting the appropriate property - sparkConf.set(confKey, localPath) - } - } - } - - /** - * Do the same for any additional resources passed in through ClientArguments. 
- * Each resource category is represented by a 3-tuple of: - * (1) comma separated list of resources in this category, - * (2) resource type, and - * (3) whether to add these resources to the classpath - */ - val cachedSecondaryJarLinks = ListBuffer.empty[String] - List( - (args.addJars, LocalResourceType.FILE, true), - (args.files, LocalResourceType.FILE, false), - (args.archives, LocalResourceType.ARCHIVE, false) - ).foreach { case (flist, resType, addToClasspath) => - if (flist != null && !flist.isEmpty()) { - flist.split(',').foreach { file => - val localURI = new URI(file.trim()) - if (localURI.getScheme != LOCAL_SCHEME) { - val localPath = new Path(localURI) - val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) - val destPath = copyFileToRemote(dst, localPath, replication) - distCacheMgr.addResource( - fs, hadoopConf, destPath, localResources, resType, linkname, statCache) - if (addToClasspath) { - cachedSecondaryJarLinks += linkname - } - } else if (addToClasspath) { - // Resource is intended for local use only and should be added to the class path - cachedSecondaryJarLinks += file.trim() - } - } - } - } - if (cachedSecondaryJarLinks.nonEmpty) { - sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(",")) - } - - localResources - } - - /** - * Set up the environment for launching our ApplicationMaster container. - */ - private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = { - logInfo("Setting up the launch environment for our AM container") - val env = new HashMap[String, String]() - val extraCp = sparkConf.getOption("spark.driver.extraClassPath") - populateClasspath(args, yarnConf, sparkConf, env, extraCp) - env("SPARK_YARN_MODE") = "true" - env("SPARK_YARN_STAGING_DIR") = stagingDir - env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() - - // Set the environment variables to be passed on to the executors. 
- distCacheMgr.setDistFilesEnv(env) - distCacheMgr.setDistArchivesEnv(env) - - // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.* - val amEnvPrefix = "spark.yarn.appMasterEnv." - sparkConf.getAll - .filter { case (k, v) => k.startsWith(amEnvPrefix) } - .map { case (k, v) => (k.substring(amEnvPrefix.length), v) } - .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) } - - // Keep this for backwards compatibility but users should move to the config - sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => - // Allow users to specify some environment variables. - YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) - // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments. - env("SPARK_YARN_USER_ENV") = userEnvs - } - - // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to - // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's - // SparkContext will not let that set spark* system properties, which is expected behavior for - // Yarn clients. So propagate it through the environment. - // - // Note that to warn the user about the deprecation in cluster mode, some code from - // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition - // described above). - if (isLaunchingDriver) { - sys.env.get("SPARK_JAVA_OPTS").foreach { value => - val warning = - s""" - |SPARK_JAVA_OPTS was detected (set to '$value'). - |This is deprecated in Spark 1.0+. 
- | - |Please instead use: - | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application - | - ./spark-submit with --driver-java-options to set -X options for a driver - | - spark.executor.extraJavaOptions to set -X options for executors - """.stripMargin - logWarning(warning) - for (proc <- Seq("driver", "executor")) { - val key = s"spark.$proc.extraJavaOptions" - if (sparkConf.contains(key)) { - throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.") - } - } - env("SPARK_JAVA_OPTS") = value - } - } - - env - } - - /** - * Set up a ContainerLaunchContext to launch our ApplicationMaster container. - * This sets up the launch environment, java options, and the command for launching the AM. - */ - protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) - : ContainerLaunchContext = { - logInfo("Setting up container launch context for our AM") - - val appId = newAppResponse.getApplicationId - val appStagingDir = getAppStagingDir(appId) - val localResources = prepareLocalResources(appStagingDir) - val launchEnv = setupLaunchEnv(appStagingDir) - val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) - amContainer.setLocalResources(localResources) - amContainer.setEnvironment(launchEnv) - - val javaOpts = ListBuffer[String]() - - // Set the environment variable through a command prefix - // to append to the existing value of the variable - var prefixEnv: Option[String] = None - - // Add Xmx for AM memory - javaOpts += "-Xmx" + args.amMemory + "m" - - val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) - javaOpts += "-Djava.io.tmpdir=" + tmpDir - - // TODO: Remove once cpuset version is pushed out. 
- // The context is, default gc for server class machines ends up using all cores to do gc - - // hence if there are multiple containers in same node, Spark GC affects all other containers' - // performance (which can be that of other Spark containers) - // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in - // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset - // of cores on a node. - val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean) - if (useConcurrentAndIncrementalGC) { - // In our expts, using (default) throughput collector has severe perf ramifications in - // multi-tenant machines - javaOpts += "-XX:+UseConcMarkSweepGC" - javaOpts += "-XX:+CMSIncrementalMode" - javaOpts += "-XX:+CMSIncrementalPacing" - javaOpts += "-XX:CMSIncrementalDutyCycleMin=0" - javaOpts += "-XX:CMSIncrementalDutyCycle=10" - } - - // Forward the Spark configuration to the application master / executors. - // TODO: it might be nicer to pass these as an internal environment variable rather than - // as Java options, due to complications with string parsing of nested quotes. 
- for ((k, v) <- sparkConf.getAll) { - javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") - } - - // Include driver-specific java options if we are launching a driver - if (isLaunchingDriver) { - sparkConf.getOption("spark.driver.extraJavaOptions") - .orElse(sys.env.get("SPARK_JAVA_OPTS")) - .foreach(opts => javaOpts += opts) - val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"), - sys.props.get("spark.driver.libraryPath")).flatten - if (libraryPaths.nonEmpty) { - prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths)) - } - } - - // For log4j configuration to reference - javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR) - - val userClass = - if (isLaunchingDriver) { - Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass)) - } else { - Nil - } - val userJar = - if (args.userJar != null) { - Seq("--jar", args.userJar) - } else { - Nil - } - val amClass = - if (isLaunchingDriver) { - Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName - } else { - Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName - } - val userArgs = args.userArgs.flatMap { arg => - Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg)) - } - val amArgs = - Seq(amClass) ++ userClass ++ userJar ++ userArgs ++ - Seq( - "--executor-memory", args.executorMemory.toString + "m", - "--executor-cores", args.executorCores.toString, - "--num-executors ", args.numExecutors.toString) - - // Command for the ApplicationMaster - val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++ - javaOpts ++ amArgs ++ - Seq( - "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", - "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") - - // TODO: it would be nicer to just make sure there are no null commands here - val printableCommands = commands.map(s => if (s == null) "null" else s).toList - amContainer.setCommands(printableCommands) - - 
logDebug("===============================================================================") - logDebug("Yarn AM launch context:") - logDebug(s" user class: ${Option(args.userClass).getOrElse("N/A")}") - logDebug(" env:") - launchEnv.foreach { case (k, v) => logDebug(s" $k -> $v") } - logDebug(" resources:") - localResources.foreach { case (k, v) => logDebug(s" $k -> $v")} - logDebug(" command:") - logDebug(s" ${printableCommands.mkString(" ")}") - logDebug("===============================================================================") - - // send the acl settings into YARN to control who has access via YARN interfaces - val securityManager = new SecurityManager(sparkConf) - amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager)) - setupSecurityToken(amContainer) - UserGroupInformation.getCurrentUser().addCredentials(credentials) - - amContainer - } - - /** - * Report the state of an application until it has exited, either successfully or - * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED, - * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED, - * or KILLED). - * - * @param appId ID of the application to monitor. - * @param returnOnRunning Whether to also return the application state when it is RUNNING. - * @param logApplicationReport Whether to log details of the application report every iteration. - * @return A pair of the yarn application state and the final application state. 
- */ - def monitorApplication( - appId: ApplicationId, - returnOnRunning: Boolean = false, - logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { - val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) - var lastState: YarnApplicationState = null - while (true) { - Thread.sleep(interval) - val report = getApplicationReport(appId) - val state = report.getYarnApplicationState - - if (logApplicationReport) { - logInfo(s"Application report for $appId (state: $state)") - val details = Seq[(String, String)]( - ("client token", getClientToken(report)), - ("diagnostics", report.getDiagnostics), - ("ApplicationMaster host", report.getHost), - ("ApplicationMaster RPC port", report.getRpcPort.toString), - ("queue", report.getQueue), - ("start time", report.getStartTime.toString), - ("final status", report.getFinalApplicationStatus.toString), - ("tracking URL", report.getTrackingUrl), - ("user", report.getUser) - ) - - // Use more loggable format if value is null or empty - val formattedDetails = details - .map { case (k, v) => - val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A") - s"\n\t $k: $newValue" } - .mkString("") - - // If DEBUG is enabled, log report details every iteration - // Otherwise, log them every time the application changes state - if (log.isDebugEnabled) { - logDebug(formattedDetails) - } else if (lastState != state) { - logInfo(formattedDetails) - } - } - - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.FAILED || - state == YarnApplicationState.KILLED) { - return (state, report.getFinalApplicationStatus) - } - - if (returnOnRunning && state == YarnApplicationState.RUNNING) { - return (state, report.getFinalApplicationStatus) - } - - lastState = state - } - - // Never reached, but keeps compiler happy - throw new SparkException("While loop is depleted! This should never happen...") - } - - /** - * Submit an application to the ResourceManager and monitor its state. 
- * This continues until the application has exited for any reason. - * If the application finishes with a failed, killed, or undefined status, - * throw an appropriate SparkException. - */ - def run(): Unit = { - val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication()) - if (yarnApplicationState == YarnApplicationState.FAILED || - finalApplicationStatus == FinalApplicationStatus.FAILED) { - throw new SparkException("Application finished with failed status") - } - if (yarnApplicationState == YarnApplicationState.KILLED || - finalApplicationStatus == FinalApplicationStatus.KILLED) { - throw new SparkException("Application is killed") - } - if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) { - throw new SparkException("The final status of application is undefined") - } - } - - /* --------------------------------------------------------------------------------------- * - | Methods that cannot be implemented here due to API differences across hadoop versions | - * --------------------------------------------------------------------------------------- */ - - /** Submit an application running our ApplicationMaster to the ResourceManager. */ - def submitApplication(): ApplicationId - - /** Set up security tokens for launching our ApplicationMaster container. */ - protected def setupSecurityToken(containerContext: ContainerLaunchContext): Unit - - /** Get the application report from the ResourceManager for an application we have submitted. */ - protected def getApplicationReport(appId: ApplicationId): ApplicationReport - - /** - * Return the security token used by this client to communicate with the ApplicationMaster. - * If no security is enabled, the token returned by the report is null. 
- */ - protected def getClientToken(report: ApplicationReport): String -} - -private[spark] object ClientBase extends Logging { - - // Alias for the Spark assembly jar and the user jar - val SPARK_JAR: String = "__spark__.jar" - val APP_JAR: String = "__app__.jar" - - // URI scheme that identifies local resources - val LOCAL_SCHEME = "local" - - // Staging directory for any temporary jars or files - val SPARK_STAGING: String = ".sparkStaging" - - // Location of any user-defined Spark jars - val CONF_SPARK_JAR = "spark.yarn.jar" - val ENV_SPARK_JAR = "SPARK_JAR" - - // Internal config to propagate the location of the user's jar to the driver/executors - val CONF_SPARK_USER_JAR = "spark.yarn.user.jar" - - // Internal config to propagate the locations of any extra jars to add to the classpath - // of the executors - val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars" - - // Staging directory is private! -> rwx-------- - val STAGING_DIR_PERMISSION: FsPermission = - FsPermission.createImmutable(Integer.parseInt("700", 8).toShort) - - // App files are world-wide readable and owner writable -> rw-r--r-- - val APP_FILE_PERMISSION: FsPermission = - FsPermission.createImmutable(Integer.parseInt("644", 8).toShort) - - /** - * Find the user-defined Spark jar if configured, or return the jar containing this - * class if not. - * - * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the - * user environment if that is not found (for backwards compatibility). - */ - private def sparkJar(conf: SparkConf): String = { - if (conf.contains(CONF_SPARK_JAR)) { - conf.get(CONF_SPARK_JAR) - } else if (System.getenv(ENV_SPARK_JAR) != null) { - logWarning( - s"$ENV_SPARK_JAR detected in the system environment. 
This variable has been deprecated " + - s"in favor of the $CONF_SPARK_JAR configuration variable.") - System.getenv(ENV_SPARK_JAR) - } else { - SparkContext.jarOfClass(this.getClass).head - } - } - - /** - * Return the path to the given application's staging directory. - */ - private def getAppStagingDir(appId: ApplicationId): String = { - SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR - } - - /** - * Populate the classpath entry in the given environment map with any application - * classpath specified through the Hadoop and Yarn configurations. - */ - def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]): Unit = { - val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) - for (c <- classPathElementsToAdd.flatten) { - YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim) - } - } - - private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] = - Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match { - case Some(s) => Some(s.toSeq) - case None => getDefaultYarnApplicationClasspath - } - - private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] = - Option(conf.getStrings("mapreduce.application.classpath")) match { - case Some(s) => Some(s.toSeq) - case None => getDefaultMRApplicationClasspath - } - - def getDefaultYarnApplicationClasspath: Option[Seq[String]] = { - val triedDefault = Try[Seq[String]] { - val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH") - val value = field.get(null).asInstanceOf[Array[String]] - value.toSeq - } recoverWith { - case e: NoSuchFieldException => Success(Seq.empty[String]) - } - - triedDefault match { - case f: Failure[_] => - logError("Unable to obtain the default YARN Application classpath.", f.exception) - case s: Success[_] => - logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}") - } - - triedDefault.toOption - } - - 
/** - * In Hadoop 0.23, the MR application classpath comes with the YARN application - * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String. - * So we need to use reflection to retrieve it. - */ - def getDefaultMRApplicationClasspath: Option[Seq[String]] = { - val triedDefault = Try[Seq[String]] { - val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH") - val value = if (field.getType == classOf[String]) { - StringUtils.getStrings(field.get(null).asInstanceOf[String]).toArray - } else { - field.get(null).asInstanceOf[Array[String]] - } - value.toSeq - } recoverWith { - case e: NoSuchFieldException => Success(Seq.empty[String]) - } - - triedDefault match { - case f: Failure[_] => - logError("Unable to obtain the default MR Application classpath.", f.exception) - case s: Success[_] => - logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}") - } - - triedDefault.toOption - } - - /** - * Populate the classpath entry in the given environment map. - * This includes the user jar, Spark jar, and any extra application jars. - */ - def populateClasspath( - args: ClientArguments, - conf: Configuration, - sparkConf: SparkConf, - env: HashMap[String, String], - extraClassPath: Option[String] = None): Unit = { - extraClassPath.foreach(addClasspathEntry(_, env)) - addClasspathEntry(Environment.PWD.$(), env) - - // Normally the users app.jar is last in case conflicts with spark jars - if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) { - addUserClasspath(args, sparkConf, env) - addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env) - populateHadoopClasspath(conf, env) - } else { - addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env) - populateHadoopClasspath(conf, env) - addUserClasspath(args, sparkConf, env) - } - - // Append all jar files under the working directory to the classpath. 
- addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env) - } - - /** - * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly - * to the classpath. - */ - private def addUserClasspath( - args: ClientArguments, - conf: SparkConf, - env: HashMap[String, String]): Unit = { - - // If `args` is not null, we are launching an AM container. - // Otherwise, we are launching executor containers. - val (mainJar, secondaryJars) = - if (args != null) { - (args.userJar, args.addJars) - } else { - (conf.get(CONF_SPARK_USER_JAR, null), conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null)) - } - - addFileToClasspath(mainJar, APP_JAR, env) - if (secondaryJars != null) { - secondaryJars.split(",").filter(_.nonEmpty).foreach { jar => - addFileToClasspath(jar, null, env) - } - } - } - - /** - * Adds the given path to the classpath, handling "local:" URIs correctly. - * - * If an alternate name for the file is given, and it's not a "local:" file, the alternate - * name will be added to the classpath (relative to the job's work directory). - * - * If not a "local:" file and no alternate name, the environment is not modified. - * - * @param path Path to add to classpath (optional). - * @param fileName Alternate name for the file (optional). - * @param env Map holding the environment variables. - */ - private def addFileToClasspath( - path: String, - fileName: String, - env: HashMap[String, String]): Unit = { - if (path != null) { - scala.util.control.Exception.ignoring(classOf[URISyntaxException]) { - val uri = new URI(path) - if (uri.getScheme == LOCAL_SCHEME) { - addClasspathEntry(uri.getPath, env) - return - } - } - } - if (fileName != null) { - addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env) - } - } - - /** - * Add the given path to the classpath entry of the given environment map. - * If the classpath is already set, this appends the new path to the existing classpath. 
- */ - private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit = - YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path) - - /** - * Get the list of namenodes the user may access. - */ - def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = { - sparkConf.get("spark.yarn.access.namenodes", "") - .split(",") - .map(_.trim()) - .filter(!_.isEmpty) - .map(new Path(_)) - .toSet - } - - def getTokenRenewer(conf: Configuration): String = { - val delegTokenRenewer = Master.getMasterPrincipal(conf) - logDebug("delegation token renewer is: " + delegTokenRenewer) - if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - val errorMessage = "Can't get Master Kerberos principal for use as renewer" - logError(errorMessage) - throw new SparkException(errorMessage) - } - delegTokenRenewer - } - - /** - * Obtains tokens for the namenodes passed in and adds them to the credentials. - */ - def obtainTokensForNamenodes( - paths: Set[Path], - conf: Configuration, - creds: Credentials): Unit = { - if (UserGroupInformation.isSecurityEnabled()) { - val delegTokenRenewer = getTokenRenewer(conf) - paths.foreach { dst => - val dstFs = dst.getFileSystem(conf) - logDebug("getting token for namenode: " + dst) - dstFs.addDelegationTokens(delegTokenRenewer, creds) - } - } - } - - /** - * Return whether the two file systems are the same. - */ - private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = { - val srcUri = srcFs.getUri() - val dstUri = destFs.getUri() - if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) { - return false - } - - var srcHost = srcUri.getHost() - var dstHost = dstUri.getHost() - - // In HA or when using viewfs, the host part of the URI may not actually be a host, but the - // name of the HDFS namespace. Those names won't resolve, so avoid even trying if they - // match. 
- if (srcHost != null && dstHost != null && srcHost != dstHost) { - try { - srcHost = InetAddress.getByName(srcHost).getCanonicalHostName() - dstHost = InetAddress.getByName(dstHost).getCanonicalHostName() - } catch { - case e: UnknownHostException => - return false - } - } - - Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort() - } - -}
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala deleted file mode 100644 index c592ecf..0000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.yarn - -import java.net.URI - -import scala.collection.mutable.{HashMap, LinkedHashMap, Map} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} -import org.apache.hadoop.fs.permission.FsAction -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.util.{Records, ConverterUtils} - -import org.apache.spark.Logging - -/** Client side methods to setup the Hadoop distributed cache */ -private[spark] class ClientDistributedCacheManager() extends Logging { - - // Mappings from remote URI to (file status, modification time, visibility) - private val distCacheFiles: Map[String, (String, String, String)] = - LinkedHashMap[String, (String, String, String)]() - private val distCacheArchives: Map[String, (String, String, String)] = - LinkedHashMap[String, (String, String, String)]() - - - /** - * Add a resource to the list of distributed cache resources. This list can - * be sent to the ApplicationMaster and possibly the executors so that it can - * be downloaded into the Hadoop distributed cache for use by this application. - * Adds the LocalResource to the localResources HashMap passed in and saves - * the stats of the resources to they can be sent to the executors and verified. 
- * - * @param fs FileSystem - * @param conf Configuration - * @param destPath path to the resource - * @param localResources localResource hashMap to insert the resource into - * @param resourceType LocalResourceType - * @param link link presented in the distributed cache to the destination - * @param statCache cache to store the file/directory stats - * @param appMasterOnly Whether to only add the resource to the app master - */ - def addResource( - fs: FileSystem, - conf: Configuration, - destPath: Path, - localResources: HashMap[String, LocalResource], - resourceType: LocalResourceType, - link: String, - statCache: Map[URI, FileStatus], - appMasterOnly: Boolean = false): Unit = { - val destStatus = fs.getFileStatus(destPath) - val amJarRsrc = Records.newRecord(classOf[LocalResource]) - amJarRsrc.setType(resourceType) - val visibility = getVisibility(conf, destPath.toUri(), statCache) - amJarRsrc.setVisibility(visibility) - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath)) - amJarRsrc.setTimestamp(destStatus.getModificationTime()) - amJarRsrc.setSize(destStatus.getLen()) - if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name") - localResources(link) = amJarRsrc - - if (!appMasterOnly) { - val uri = destPath.toUri() - val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link) - if (resourceType == LocalResourceType.FILE) { - distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(), - destStatus.getModificationTime().toString(), visibility.name()) - } else { - distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(), - destStatus.getModificationTime().toString(), visibility.name()) - } - } - } - - /** - * Adds the necessary cache file env variables to the env passed in - */ - def setDistFilesEnv(env: Map[String, String]): Unit = { - val (keys, tupleValues) = distCacheFiles.unzip - val (sizes, timeStamps, visibilities) = tupleValues.unzip3 - if (keys.size > 
0) { - env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n } - env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = - timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n } - env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = - sizes.reduceLeft[String] { (acc,n) => acc + "," + n } - env("SPARK_YARN_CACHE_FILES_VISIBILITIES") = - visibilities.reduceLeft[String] { (acc,n) => acc + "," + n } - } - } - - /** - * Adds the necessary cache archive env variables to the env passed in - */ - def setDistArchivesEnv(env: Map[String, String]): Unit = { - val (keys, tupleValues) = distCacheArchives.unzip - val (sizes, timeStamps, visibilities) = tupleValues.unzip3 - if (keys.size > 0) { - env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n } - env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = - timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n } - env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") = - sizes.reduceLeft[String] { (acc,n) => acc + "," + n } - env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") = - visibilities.reduceLeft[String] { (acc,n) => acc + "," + n } - } - } - - /** - * Returns the local resource visibility depending on the cache file permissions - * @return LocalResourceVisibility - */ - def getVisibility( - conf: Configuration, - uri: URI, - statCache: Map[URI, FileStatus]): LocalResourceVisibility = { - if (isPublic(conf, uri, statCache)) { - LocalResourceVisibility.PUBLIC - } else { - LocalResourceVisibility.PRIVATE - } - } - - /** - * Returns a boolean to denote whether a cache file is visible to all (public) - * @return true if the path in the uri is visible to all, false otherwise - */ - def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = { - val fs = FileSystem.get(uri, conf) - val current = new Path(uri.getPath()) - // the leaf level file should be readable by others - if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { - return false - } - 
ancestorsHaveExecutePermissions(fs, current.getParent(), statCache) - } - - /** - * Returns true if all ancestors of the specified path have the 'execute' - * permission set for all users (i.e. that other users can traverse - * the directory hierarchy to the given path) - * @return true if all ancestors have the 'execute' permission set for all users - */ - def ancestorsHaveExecutePermissions( - fs: FileSystem, - path: Path, - statCache: Map[URI, FileStatus]): Boolean = { - var current = path - while (current != null) { - // the subdirs in the path should have execute permissions for others - if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) { - return false - } - current = current.getParent() - } - true - } - - /** - * Checks for a given path whether the Other permissions on it - * imply the permission in the passed FsAction - * @return true if the path in the uri is visible to all, false otherwise - */ - def checkPermissionOfOther( - fs: FileSystem, - path: Path, - action: FsAction, - statCache: Map[URI, FileStatus]): Boolean = { - val status = getFileStatus(fs, path.toUri(), statCache) - val perms = status.getPermission() - val otherAction = perms.getOtherAction() - otherAction.implies(action) - } - - /** - * Checks to see if the given uri exists in the cache, if it does it - * returns the existing FileStatus, otherwise it stats the uri, stores - * it in the cache, and returns the FileStatus. 
- * @return FileStatus - */ - def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = { - val stat = statCache.get(uri) match { - case Some(existstat) => existstat - case None => - val newStat = fs.getFileStatus(new Path(uri)) - statCache.put(uri, newStat) - newStat - } - stat - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala deleted file mode 100644 index 88dad0f..0000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.yarn - -import java.net.URI - -import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap, ListBuffer} - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.{ConverterUtils, Records} - -import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.util.Utils - -trait ExecutorRunnableUtil extends Logging { - - val yarnConf: YarnConfiguration - val sparkConf: SparkConf - lazy val env = prepareEnvironment - - def prepareCommand( - masterAddress: String, - slaveId: String, - hostname: String, - executorMemory: Int, - executorCores: Int, - appId: String, - localResources: HashMap[String, LocalResource]): List[String] = { - // Extra options for the JVM - val javaOpts = ListBuffer[String]() - - // Set the environment variable through a command prefix - // to append to the existing value of the variable - var prefixEnv: Option[String] = None - - // Set the JVM memory - val executorMemoryString = executorMemory + "m" - javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " " - - // Set extra Java options for the executor, if defined - sys.props.get("spark.executor.extraJavaOptions").foreach { opts => - javaOpts += opts - } - sys.env.get("SPARK_JAVA_OPTS").foreach { opts => - javaOpts += opts - } - sys.props.get("spark.executor.extraLibraryPath").foreach { p => - prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p))) - } - - javaOpts += "-Djava.io.tmpdir=" + - new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) - - // Certain configs need to be passed here because they are needed before the Executor - // registers with the Scheduler and transfers the spark configs. 
Since the Executor backend - // uses Akka to connect to the scheduler, the akka settings are needed as well as the - // authentication settings. - sparkConf.getAll. - filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }. - foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } - - sparkConf.getAkkaConf. - foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } - - // Commenting it out for now - so that people can refer to the properties if required. Remove - // it once cpuset version is pushed out. - // The context is, default gc for server class machines end up using all cores to do gc - hence - // if there are multiple containers in same node, spark gc effects all other containers - // performance (which can also be other spark containers) - // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in - // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset - // of cores on a node. - /* - else { - // If no java_opts specified, default to using -XX:+CMSIncrementalMode - // It might be possible that other modes/config is being done in spark.executor.extraJavaOptions, - // so we dont want to mess with it. 
- // In our expts, using (default) throughput collector has severe perf ramnifications in - // multi-tennent machines - // The options are based on - // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline - javaOpts += " -XX:+UseConcMarkSweepGC " - javaOpts += " -XX:+CMSIncrementalMode " - javaOpts += " -XX:+CMSIncrementalPacing " - javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 " - javaOpts += " -XX:CMSIncrementalDutyCycle=10 " - } - */ - - // For log4j configuration to reference - javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR) - - val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", - "-server", - // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. - // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in - // an inconsistent state. - // TODO: If the OOM is not recoverable by rescheduling it on different node, then do - // 'something' to fail job ... akin to blacklisting trackers in mapred ? 
- "-XX:OnOutOfMemoryError='kill %p'") ++ - javaOpts ++ - Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", - masterAddress.toString, - slaveId.toString, - hostname.toString, - executorCores.toString, - appId, - "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", - "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") - - // TODO: it would be nicer to just make sure there are no null commands here - commands.map(s => if (s == null) "null" else s).toList - } - - private def setupDistributedCache( - file: String, - rtype: LocalResourceType, - localResources: HashMap[String, LocalResource], - timestamp: String, - size: String, - vis: String): Unit = { - val uri = new URI(file) - val amJarRsrc = Records.newRecord(classOf[LocalResource]) - amJarRsrc.setType(rtype) - amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis)) - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri)) - amJarRsrc.setTimestamp(timestamp.toLong) - amJarRsrc.setSize(size.toLong) - localResources(uri.getFragment()) = amJarRsrc - } - - def prepareLocalResources: HashMap[String, LocalResource] = { - logInfo("Preparing Local resources") - val localResources = HashMap[String, LocalResource]() - - if (System.getenv("SPARK_YARN_CACHE_FILES") != null) { - val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') - val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') - val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',') - val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',') - for( i <- 0 to distFiles.length - 1) { - setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i), - fileSizes(i), visibilities(i)) - } - } - - if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) { - val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',') - val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',') - 
val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',') - val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',') - for( i <- 0 to distArchives.length - 1) { - setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, - timeStamps(i), fileSizes(i), visibilities(i)) - } - } - - logInfo("Prepared Local resources " + localResources) - localResources - } - - def prepareEnvironment: HashMap[String, String] = { - val env = new HashMap[String, String]() - val extraCp = sparkConf.getOption("spark.executor.extraClassPath") - ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraCp) - - sparkConf.getExecutorEnv.foreach { case (key, value) => - // This assumes each executor environment variable set here is a path - // This is kept for backward compatibility and consistency with hadoop - YarnSparkHadoopUtil.addPathToEnvironment(env, key, value) - } - - // Keep this for backwards compatibility but users should move to the config - sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => - YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) - } - - System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v } - env - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala deleted file mode 100644 index b32e157..0000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ /dev/null @@ -1,538 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.util.{List => JList} -import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicInteger -import java.util.regex.Pattern - -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse - -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} -import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend - -import com.google.common.util.concurrent.ThreadFactoryBuilder -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ - -object AllocationType extends Enumeration { - type AllocationType = Value - val HOST, RACK, ANY = Value -} - -// TODO: -// Too many params. -// Needs to be mt-safe -// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should -// make it more proactive and decoupled. - -// Note that right now, we assume all node asks as uniform in terms of capabilities and priority -// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for -// more info on how we are requesting for containers. 
- -/** - * Common code for the Yarn container allocator. Contains all the version-agnostic code to - * manage container allocation for a running Spark application. - */ -private[yarn] abstract class YarnAllocator( - conf: Configuration, - sparkConf: SparkConf, - appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments, - preferredNodes: collection.Map[String, collection.Set[SplitInfo]], - securityMgr: SecurityManager) - extends Logging { - - import YarnAllocator._ - - // These three are locked on allocatedHostToContainersMap. Complementary data structures - // allocatedHostToContainersMap : containers which are running : host, Set<containerid> - // allocatedContainerToHostMap: container to host mapping. - private val allocatedHostToContainersMap = - new HashMap[String, collection.mutable.Set[ContainerId]]() - - private val allocatedContainerToHostMap = new HashMap[ContainerId, String]() - - // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an - // allocated node) - // As with the two data structures above, tightly coupled with them, and to be locked on - // allocatedHostToContainersMap - private val allocatedRackCount = new HashMap[String, Int]() - - // Containers to be released in next request to RM - private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean] - - // Number of container requests that have been sent to, but not yet allocated by the - // ApplicationMaster. 
- private val numPendingAllocate = new AtomicInteger() - private val numExecutorsRunning = new AtomicInteger() - // Used to generate a unique id per executor - private val executorIdCounter = new AtomicInteger() - private val numExecutorsFailed = new AtomicInteger() - - private var maxExecutors = args.numExecutors - - // Keep track of which container is running which executor to remove the executors later - private val executorIdToContainer = new HashMap[String, Container] - - protected val executorMemory = args.executorMemory - protected val executorCores = args.executorCores - protected val (preferredHostToCount, preferredRackToCount) = - generateNodeToWeight(conf, preferredNodes) - - // Additional memory overhead - in mb. - protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) - - private val launcherPool = new ThreadPoolExecutor( - // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue - sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE, - 1, TimeUnit.MINUTES, - new LinkedBlockingQueue[Runnable](), - new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build()) - launcherPool.allowCoreThreadTimeOut(true) - - def getNumExecutorsRunning: Int = numExecutorsRunning.intValue - - def getNumExecutorsFailed: Int = numExecutorsFailed.intValue - - /** - * Request as many executors from the ResourceManager as needed to reach the desired total. - * This takes into account executors already running or pending. 
- */ - def requestTotalExecutors(requestedTotal: Int): Unit = synchronized { - val currentTotal = numPendingAllocate.get + numExecutorsRunning.get - if (requestedTotal > currentTotal) { - maxExecutors += (requestedTotal - currentTotal) - // We need to call `allocateResources` here to avoid the following race condition: - // If we request executors twice before `allocateResources` is called, then we will end up - // double counting the number requested because `numPendingAllocate` is not updated yet. - allocateResources() - } else { - logInfo(s"Not allocating more executors because there are already $currentTotal " + - s"(application requested $requestedTotal total)") - } - } - - /** - * Request that the ResourceManager release the container running the specified executor. - */ - def killExecutor(executorId: String): Unit = synchronized { - if (executorIdToContainer.contains(executorId)) { - val container = executorIdToContainer.remove(executorId).get - internalReleaseContainer(container) - numExecutorsRunning.decrementAndGet() - maxExecutors -= 1 - assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!") - } else { - logWarning(s"Attempted to kill unknown executor $executorId!") - } - } - - /** - * Allocate missing containers based on the number of executors currently pending and running. - * - * This method prioritizes the allocated container responses from the RM based on node and - * rack locality. Additionally, it releases any extra containers allocated for this application - * but are not needed. This must be synchronized because variables read in this block are - * mutated by other methods. 
- */ - def allocateResources(): Unit = synchronized { - val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get() - - // this is needed by alpha, do it here since we add numPending right after this - val executorsPending = numPendingAllocate.get() - if (missing > 0) { - val totalExecutorMemory = executorMemory + memoryOverhead - numPendingAllocate.addAndGet(missing) - logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " + - s"memory including $memoryOverhead MB overhead") - } else { - logDebug("Empty allocation request ...") - } - - val allocateResponse = allocateContainers(missing, executorsPending) - val allocatedContainers = allocateResponse.getAllocatedContainers() - - if (allocatedContainers.size > 0) { - var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size) - - if (numPendingAllocateNow < 0) { - numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow) - } - - logDebug(""" - Allocated containers: %d - Current executor count: %d - Containers released: %s - Cluster resources: %s - """.format( - allocatedContainers.size, - numExecutorsRunning.get(), - releasedContainers, - allocateResponse.getAvailableResources)) - - val hostToContainers = new HashMap[String, ArrayBuffer[Container]]() - - for (container <- allocatedContainers) { - if (isResourceConstraintSatisfied(container)) { - // Add the accepted `container` to the host's list of already accepted, - // allocated containers - val host = container.getNodeId.getHost - val containersForHost = hostToContainers.getOrElseUpdate(host, - new ArrayBuffer[Container]()) - containersForHost += container - } else { - // Release container, since it doesn't satisfy resource constraints. - internalReleaseContainer(container) - } - } - - // Find the appropriate containers to use. - // TODO: Cleanup this group-by... 
- val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]() - val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]() - val offRackContainers = new HashMap[String, ArrayBuffer[Container]]() - - for (candidateHost <- hostToContainers.keySet) { - val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0) - val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost) - - val remainingContainersOpt = hostToContainers.get(candidateHost) - assert(remainingContainersOpt.isDefined) - var remainingContainers = remainingContainersOpt.get - - if (requiredHostCount >= remainingContainers.size) { - // Since we have <= required containers, add all remaining containers to - // `dataLocalContainers`. - dataLocalContainers.put(candidateHost, remainingContainers) - // There are no more free containers remaining. - remainingContainers = null - } else if (requiredHostCount > 0) { - // Container list has more containers than we need for data locality. - // Split the list into two: one based on the data local container count, - // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining - // containers. - val (dataLocal, remaining) = remainingContainers.splitAt( - remainingContainers.size - requiredHostCount) - dataLocalContainers.put(candidateHost, dataLocal) - - // Invariant: remainingContainers == remaining - - // YARN has a nasty habit of allocating a ton of containers on a host - discourage this. - // Add each container in `remaining` to list of containers to release. If we have an - // insufficient number of containers, then the next allocation cycle will reallocate - // (but won't treat it as data local). - // TODO(harvey): Rephrase this comment some more. 
- for (container <- remaining) internalReleaseContainer(container) - remainingContainers = null - } - - // For rack local containers - if (remainingContainers != null) { - val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost) - if (rack != null) { - val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0) - val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - - rackLocalContainers.getOrElse(rack, List()).size - - if (requiredRackCount >= remainingContainers.size) { - // Add all remaining containers to to `dataLocalContainers`. - dataLocalContainers.put(rack, remainingContainers) - remainingContainers = null - } else if (requiredRackCount > 0) { - // Container list has more containers that we need for data locality. - // Split the list into two: one based on the data local container count, - // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining - // containers. - val (rackLocal, remaining) = remainingContainers.splitAt( - remainingContainers.size - requiredRackCount) - val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, - new ArrayBuffer[Container]()) - - existingRackLocal ++= rackLocal - - remainingContainers = remaining - } - } - } - - if (remainingContainers != null) { - // Not all containers have been consumed - add them to the list of off-rack containers. - offRackContainers.put(candidateHost, remainingContainers) - } - } - - // Now that we have split the containers into various groups, go through them in order: - // first host-local, then rack-local, and finally off-rack. - // Note that the list we create below tries to ensure that not all containers end up within - // a host if there is a sufficiently large number of hosts/containers. 
- val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size) - allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers) - allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers) - allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers) - - // Run each of the allocated containers. - for (container <- allocatedContainersToProcess) { - val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet() - val executorHostname = container.getNodeId.getHost - val containerId = container.getId - - val executorMemoryOverhead = (executorMemory + memoryOverhead) - assert(container.getResource.getMemory >= executorMemoryOverhead) - - if (numExecutorsRunningNow > maxExecutors) { - logInfo("""Ignoring container %s at host %s, since we already have the required number of - containers for it.""".format(containerId, executorHostname)) - internalReleaseContainer(container) - numExecutorsRunning.decrementAndGet() - } else { - val executorId = executorIdCounter.incrementAndGet().toString - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( - SparkEnv.driverActorSystemName, - sparkConf.get("spark.driver.host"), - sparkConf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME) - - logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) - executorIdToContainer(executorId) = container - - // To be safe, remove the container from `releasedContainers`. 
- releasedContainers.remove(containerId) - - val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname) - allocatedHostToContainersMap.synchronized { - val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, - new HashSet[ContainerId]()) - - containerSet += containerId - allocatedContainerToHostMap.put(containerId, executorHostname) - - if (rack != null) { - allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1) - } - } - logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format( - driverUrl, executorHostname)) - val executorRunnable = new ExecutorRunnable( - container, - conf, - sparkConf, - driverUrl, - executorId, - executorHostname, - executorMemory, - executorCores, - appAttemptId.getApplicationId.toString, - securityMgr) - launcherPool.execute(executorRunnable) - } - } - logDebug(""" - Finished allocating %s containers (from %s originally). - Current number of executors running: %d, - Released containers: %s - """.format( - allocatedContainersToProcess, - allocatedContainers, - numExecutorsRunning.get(), - releasedContainers)) - } - - val completedContainers = allocateResponse.getCompletedContainersStatuses() - if (completedContainers.size > 0) { - logDebug("Completed %d containers".format(completedContainers.size)) - - for (completedContainer <- completedContainers) { - val containerId = completedContainer.getContainerId - - if (releasedContainers.containsKey(containerId)) { - // YarnAllocationHandler already marked the container for release, so remove it from - // `releasedContainers`. - releasedContainers.remove(containerId) - } else { - // Decrement the number of executors running. The next iteration of - // the ApplicationMaster's reporting thread will take care of allocating. 
- numExecutorsRunning.decrementAndGet() - logInfo("Completed container %s (state: %s, exit status: %s)".format( - containerId, - completedContainer.getState, - completedContainer.getExitStatus)) - // Hadoop 2.2.X added a ContainerExitStatus we should switch to use - // there are some exit status' we shouldn't necessarily count against us, but for - // now I think its ok as none of the containers are expected to exit - if (completedContainer.getExitStatus == -103) { // vmem limit exceeded - logWarning(memLimitExceededLogMessage( - completedContainer.getDiagnostics, - VMEM_EXCEEDED_PATTERN)) - } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded - logWarning(memLimitExceededLogMessage( - completedContainer.getDiagnostics, - PMEM_EXCEEDED_PATTERN)) - } else if (completedContainer.getExitStatus != 0) { - logInfo("Container marked as failed: " + containerId + - ". Exit status: " + completedContainer.getExitStatus + - ". Diagnostics: " + completedContainer.getDiagnostics) - numExecutorsFailed.incrementAndGet() - } - } - - allocatedHostToContainersMap.synchronized { - if (allocatedContainerToHostMap.containsKey(containerId)) { - val hostOpt = allocatedContainerToHostMap.get(containerId) - assert(hostOpt.isDefined) - val host = hostOpt.get - - val containerSetOpt = allocatedHostToContainersMap.get(host) - assert(containerSetOpt.isDefined) - val containerSet = containerSetOpt.get - - containerSet.remove(containerId) - if (containerSet.isEmpty) { - allocatedHostToContainersMap.remove(host) - } else { - allocatedHostToContainersMap.update(host, containerSet) - } - - allocatedContainerToHostMap.remove(containerId) - - // TODO: Move this part outside the synchronized block? 
- val rack = YarnSparkHadoopUtil.lookupRack(conf, host) - if (rack != null) { - val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1 - if (rackCount > 0) { - allocatedRackCount.put(rack, rackCount) - } else { - allocatedRackCount.remove(rack) - } - } - } - } - } - logDebug(""" - Finished processing %d completed containers. - Current number of executors running: %d, - Released containers: %s - """.format( - completedContainers.size, - numExecutorsRunning.get(), - releasedContainers)) - } - } - - protected def allocatedContainersOnHost(host: String): Int = { - var retval = 0 - allocatedHostToContainersMap.synchronized { - retval = allocatedHostToContainersMap.getOrElse(host, Set()).size - } - retval - } - - protected def allocatedContainersOnRack(rack: String): Int = { - var retval = 0 - allocatedHostToContainersMap.synchronized { - retval = allocatedRackCount.getOrElse(rack, 0) - } - retval - } - - private def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (executorMemory + memoryOverhead) - } - - // A simple method to copy the split info map. 
- /**
- * Copies the split-info map into a pair of immutable weight maps:
- * host -> number of splits preferring that host, and rack -> number of splits
- * preferring hosts on that rack (via YarnSparkHadoopUtil.lookupRack).
- * Returns a pair of empty maps when `input` is null.
- */
- private def generateNodeToWeight(
- conf: Configuration,
- input: collection.Map[String, collection.Set[SplitInfo]]
- ): (Map[String, Int], Map[String, Int]) = {
-
- if (input == null) {
- return (Map[String, Int](), Map[String, Int]())
- }
-
- val hostToCount = new HashMap[String, Int]
- val rackToCount = new HashMap[String, Int]
-
- for ((host, splits) <- input) {
- val hostCount = hostToCount.getOrElse(host, 0)
- hostToCount.put(host, hostCount + splits.size)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (rack != null) {
- // BUG FIX: the rack-count map must be keyed by `rack`, not `host`. Keying by
- // `host` never created any rack entries, so rack-locality weights were always 0.
- val rackCount = rackToCount.getOrElse(rack, 0)
- rackToCount.put(rack, rackCount + splits.size)
- }
- }
-
- (hostToCount.toMap, rackToCount.toMap)
- }
-
- /**
- * Records the container in `releasedContainers` (so the completed-container handler
- * knows the release was deliberate) and then releases it via the version-specific hook.
- */
- private def internalReleaseContainer(container: Container) = {
- releasedContainers.put(container.getId(), true)
- releaseContainer(container)
- }
-
- /**
- * Called to allocate containers in the cluster.
- *
- * @param count Number of containers to allocate.
- * If zero, should still contact RM (as a heartbeat).
- * @param pending Number of containers pending allocate. Only used on alpha.
- * @return Response to the allocation request.
- */
- protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
-
- /** Called to release a previously allocated container. */
- protected def releaseContainer(container: Container): Unit
-
- /**
- * Defines the interface for an allocate response from the RM. This is needed since the alpha
- * and stable interfaces differ here in ways that cannot be fixed using other routes.
- */
- protected trait YarnAllocateResponse {
-
- // NOTE(review): presumably mirrors YARN's AllocateResponse API — confirm against
- // the alpha/stable implementations that back this trait.
- def getAllocatedContainers(): JList[Container]
-
- def getAvailableResources(): Resource
-
- def getCompletedContainersStatuses(): JList[ContainerStatus]
-
- }
-
-}
-
-// Helpers for recognizing and reporting containers killed by YARN for exceeding memory limits.
-private object YarnAllocator {
- // Matches a size such as "2.1 GB" in YARN's diagnostics text.
- val MEM_REGEX = "[0-9.]+ [KMG]B"
- val PMEM_EXCEEDED_PATTERN =
- Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
- val VMEM_EXCEEDED_PATTERN =
- Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
-
- /**
- * Builds the warning logged when YARN kills a container for exceeding memory limits.
- * If `pattern` matches `diagnostics`, the matched "X of Y ... memory used" fragment
- * is appended to the base message.
- */
- def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
- val matcher = pattern.matcher(diagnostics)
- val diag = if (matcher.find()) " " + matcher.group() + "." else ""
- ("Container killed by YARN for exceeding memory limits." + diag
- + " Consider boosting spark.yarn.executor.memoryOverhead.")
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
deleted file mode 100644
index 2510b9c..0000000
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.
You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.collection.{Map, Set}
-
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records._
-
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
-import org.apache.spark.scheduler.SplitInfo
-
-/**
- * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that
- * is used by Spark's AM.
- */
-trait YarnRMClient {
-
- /**
- * Registers the application master with the RM.
- *
- * @param conf The Yarn configuration.
- * @param sparkConf The Spark configuration.
- * @param preferredNodeLocations Map with hints about where to allocate containers.
- * @param uiAddress Address of the SparkUI.
- * @param uiHistoryAddress Address of the application on the History Server.
- * @param securityMgr The security manager for the application.
- * @return The [[YarnAllocator]] used to request executor containers for this application.
- */
- def register(
- conf: YarnConfiguration,
- sparkConf: SparkConf,
- preferredNodeLocations: Map[String, Set[SplitInfo]],
- uiAddress: String,
- uiHistoryAddress: String,
- securityMgr: SecurityManager): YarnAllocator
-
- /**
- * Unregister the AM. Guaranteed to only be called once.
- *
- * @param status The final status of the AM.
- * @param diagnostics Diagnostics message to include in the final status.
- */
- def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit
-
- /** Returns the attempt ID. */
- def getAttemptId(): ApplicationAttemptId
-
- /** Returns the configuration for the AmIpFilter to add to the Spark UI.
*/
- def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String]
-
- /** Returns the maximum number of attempts to register the AM. */
- def getMaxRegAttempts(conf: YarnConfiguration): Int
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
