http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala deleted file mode 100644 index 7d453ec..0000000 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.lang.{Boolean => JBoolean} -import java.io.File -import java.util.{Collections, Set => JSet} -import java.util.regex.Matcher -import java.util.regex.Pattern -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.mutable.HashMap - -import org.apache.hadoop.io.Text -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.security.Credentials -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.api.records.ApplicationAccessType -import org.apache.hadoop.yarn.util.RackResolver -import org.apache.hadoop.conf.Configuration - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.util.Utils - -/** - * Contains util methods to interact with Hadoop from spark. - */ -class YarnSparkHadoopUtil extends SparkHadoopUtil { - - override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { - dest.addCredentials(source.getCredentials()) - } - - // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true. - override def isYarnMode(): Boolean = { true } - - // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems - // Always create a new config, dont reuse yarnConf. 
- override def newConfiguration(conf: SparkConf): Configuration = - new YarnConfiguration(super.newConfiguration(conf)) - - // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster - override def addCredentials(conf: JobConf) { - val jobCreds = conf.getCredentials() - jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) - } - - override def getCurrentUserCredentials(): Credentials = { - UserGroupInformation.getCurrentUser().getCredentials() - } - - override def addCurrentUserCredentials(creds: Credentials) { - UserGroupInformation.getCurrentUser().addCredentials(creds) - } - - override def addSecretKeyToUserCredentials(key: String, secret: String) { - val creds = new Credentials() - creds.addSecretKey(new Text(key), secret.getBytes("utf-8")) - addCurrentUserCredentials(creds) - } - - override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { - val credentials = getCurrentUserCredentials() - if (credentials != null) credentials.getSecretKey(new Text(key)) else null - } - -} - -object YarnSparkHadoopUtil { - // Additional memory overhead - // 7% was arrived at experimentally. In the interest of minimizing memory waste while covering - // the common cases. Memory overhead tends to grow with container size. - - val MEMORY_OVERHEAD_FACTOR = 0.07 - val MEMORY_OVERHEAD_MIN = 384 - - val ANY_HOST = "*" - - val DEFAULT_NUMBER_EXECUTORS = 2 - - // All RM requests are issued with same priority : we do not (yet) have any distinction between - // request types (like map/reduce in hadoop for example) - val RM_REQUEST_PRIORITY = 1 - - // Host to rack map - saved from allocation requests. We are expecting this not to change. - // Note that it is possible for this to change : and ResourceManager will indicate that to us via - // update response to allocate. But we are punting on handling that for now. - private val hostToRack = new ConcurrentHashMap[String, String]() - private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]() - - /** - * Add a path variable to the given environment map. - * If the map already contains this key, append the value to the existing value instead. - */ - def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = { - val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value - env.put(key, newValue) - } - - /** - * Set zero or more environment variables specified by the given input string. - * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3". - */ - def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = { - if (inputString != null && inputString.length() > 0) { - val childEnvs = inputString.split(",") - val p = Pattern.compile(environmentVariableRegex) - for (cEnv <- childEnvs) { - val parts = cEnv.split("=") // split on '=' - val m = p.matcher(parts(1)) - val sb = new StringBuffer - while (m.find()) { - val variable = m.group(1) - var replace = "" - if (env.get(variable) != None) { - replace = env.get(variable).get - } else { - // if this key is not configured for the child .. get it from the env - replace = System.getenv(variable) - if (replace == null) { - // the env key is note present anywhere .. 
simply set it - replace = "" - } - } - m.appendReplacement(sb, Matcher.quoteReplacement(replace)) - } - m.appendTail(sb) - // This treats the environment variable as path variable delimited by `File.pathSeparator` - // This is kept for backward compatibility and consistency with Hadoop's behavior - addPathToEnvironment(env, parts(0), sb.toString) - } - } - } - - private val environmentVariableRegex: String = { - if (Utils.isWindows) { - "%([A-Za-z_][A-Za-z0-9_]*?)%" - } else { - "\\$([A-Za-z_][A-Za-z0-9_]*)" - } - } - - /** - * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands - * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The - * argument is enclosed in single quotes and some key characters are escaped. - * - * @param arg A single argument. - * @return Argument quoted for execution via Yarn's generated shell script. - */ - def escapeForShell(arg: String): String = { - if (arg != null) { - val escaped = new StringBuilder("'") - for (i <- 0 to arg.length() - 1) { - arg.charAt(i) match { - case '$' => escaped.append("\\$") - case '"' => escaped.append("\\\"") - case '\'' => escaped.append("'\\''") - case c => escaped.append(c) - } - } - escaped.append("'").toString() - } else { - arg - } - } - - def lookupRack(conf: Configuration, host: String): String = { - if (!hostToRack.contains(host)) { - populateRackInfo(conf, host) - } - hostToRack.get(host) - } - - def populateRackInfo(conf: Configuration, hostname: String) { - Utils.checkHost(hostname) - - if (!hostToRack.containsKey(hostname)) { - // If there are repeated failures to resolve, all to an ignore list. - val rackInfo = RackResolver.resolve(conf, hostname) - if (rackInfo != null && rackInfo.getNetworkLocation != null) { - val rack = rackInfo.getNetworkLocation - hostToRack.put(hostname, rack) - if (! rackToHostSet.containsKey(rack)) { - rackToHostSet.putIfAbsent(rack, - Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]())) - } - rackToHostSet.get(rack).add(hostname) - - // TODO(harvey): Figure out what this comment means... - // Since RackResolver caches, we are disabling this for now ... - } /* else { - // right ? Else we will keep calling rack resolver in case we cant resolve rack info ... - hostToRack.put(hostname, null) - } */ - } - } - - def getApplicationAclsForYarn(securityMgr: SecurityManager) - : Map[ApplicationAccessType, String] = { - Map[ApplicationAccessType, String] ( - ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls, - ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls - ) - } - -}
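For reference, the quoting rule implemented by the escapeForShell helper removed above: each argument is wrapped in single quotes, and '$', '"' and embedded single quotes are escaped, because YARN runs the generated launch command through bash -c "...". A minimal sketch of the output this produces, assuming the (relocated) YarnSparkHadoopUtil object is on the classpath:

    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

    // Illustration only: expected results of escapeForShell as defined above.
    Seq("arg1", "$arg5", "'arg4'", "\"arg3\"")
      .map(YarnSparkHadoopUtil.escapeForShell)
      .foreach(println)
    // arg1   -> 'arg1'
    // $arg5  -> '\$arg5'
    // 'arg4' -> ''\''arg4'\'''
    // "arg3" -> '\"arg3\"'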
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala deleted file mode 100644 index 254774a..0000000 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import org.apache.spark._ -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.Utils - -/** - * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM. - */ -private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { - - // By default, rack is unknown - override def getRackForHost(hostPort: String): Option[String] = { - val host = Utils.parseHostPort(hostPort)._1 - Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host)) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala deleted file mode 100644 index 2923e67..0000000 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.scheduler.cluster - -import scala.collection.mutable.ArrayBuffer - -import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} - -import org.apache.spark.{SparkException, Logging, SparkContext} -import org.apache.spark.deploy.yarn.{Client, ClientArguments} -import org.apache.spark.scheduler.TaskSchedulerImpl - -private[spark] class YarnClientSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext) - extends YarnSchedulerBackend(scheduler, sc) - with Logging { - - private var client: Client = null - private var appId: ApplicationId = null - @volatile private var stopping: Boolean = false - - /** - * Create a Yarn client to submit an application to the ResourceManager. - * This waits until the application is running. - */ - override def start() { - super.start() - val driverHost = conf.get("spark.driver.host") - val driverPort = conf.get("spark.driver.port") - val hostport = driverHost + ":" + driverPort - sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIAddress) } - - val argsArrayBuf = new ArrayBuffer[String]() - argsArrayBuf += ("--arg", hostport) - argsArrayBuf ++= getExtraClientArguments - - logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" ")) - val args = new ClientArguments(argsArrayBuf.toArray, conf) - totalExpectedExecutors = args.numExecutors - client = new Client(args, conf) - appId = client.submitApplication() - waitForApplication() - asyncMonitorApplication() - } - - /** - * Return any extra command line arguments to be passed to Client provided in the form of - * environment variables or Spark properties. - */ - private def getExtraClientArguments: Seq[String] = { - val extraArgs = new ArrayBuffer[String] - val optionTuples = // List of (target Client argument, environment variable, Spark property) - List( - ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"), - ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"), - ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"), - ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), - ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"), - ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"), - ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"), - ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"), - ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"), - ("--name", "SPARK_YARN_APP_NAME", "spark.app.name") - ) - optionTuples.foreach { case (optionName, envVar, sparkProp) => - if (System.getenv(envVar) != null) { - extraArgs += (optionName, System.getenv(envVar)) - } else if (sc.getConf.contains(sparkProp)) { - extraArgs += (optionName, sc.getConf.get(sparkProp)) - } - } - extraArgs - } - - /** - * Report the state of the application until it is running. - * If the application has finished, failed or been killed in the process, throw an exception. - * This assumes both `client` and `appId` have already been set. - */ - private def waitForApplication(): Unit = { - assert(client != null && appId != null, "Application has not been submitted yet!") - val (state, _) = client.monitorApplication(appId, returnOnRunning = true) // blocking - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.FAILED || - state == YarnApplicationState.KILLED) { - throw new SparkException("Yarn application has already ended! 
" + - "It might have been killed or unable to launch application master.") - } - if (state == YarnApplicationState.RUNNING) { - logInfo(s"Application $appId has started running.") - } - } - - /** - * Monitor the application state in a separate thread. - * If the application has exited for any reason, stop the SparkContext. - * This assumes both `client` and `appId` have already been set. - */ - private def asyncMonitorApplication(): Unit = { - assert(client != null && appId != null, "Application has not been submitted yet!") - val t = new Thread { - override def run() { - while (!stopping) { - val report = client.getApplicationReport(appId) - val state = report.getYarnApplicationState() - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.KILLED || - state == YarnApplicationState.FAILED) { - logError(s"Yarn application has already exited with state $state!") - sc.stop() - stopping = true - } - Thread.sleep(1000L) - } - Thread.currentThread().interrupt() - } - } - t.setName("Yarn application state monitor") - t.setDaemon(true) - t.start() - } - - /** - * Stop the scheduler. This assumes `start()` has already been called. - */ - override def stop() { - assert(client != null, "Attempted to stop this scheduler before starting it!") - stopping = true - super.stop() - client.stop() - logInfo("Stopped") - } - - override def applicationId(): String = { - Option(appId).map(_.toString).getOrElse { - logWarning("Application ID is not initialized yet.") - super.applicationId - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala deleted file mode 100644 index 4157ff9..0000000 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import org.apache.spark._ -import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil} -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.Utils - -/** - * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of - * ApplicationMaster, etc is done - */ -private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { - - logInfo("Created YarnClusterScheduler") - - // Nothing else for now ... 
initialize application master : which needs a SparkContext to - // determine how to allocate. - // Note that only the first creation of a SparkContext influences (and ideally, there must be - // only one SparkContext, right ?). Subsequent creations are ignored since executors are already - // allocated by then. - - // By default, rack is unknown - override def getRackForHost(hostPort: String): Option[String] = { - val host = Utils.parseHostPort(hostPort)._1 - Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host)) - } - - override def postStartHook() { - ApplicationMaster.sparkContextInitialized(sc) - super.postStartHook() - logInfo("YarnClusterScheduler.postStartHook done") - } - - override def stop() { - super.stop() - ApplicationMaster.sparkContextStopped(sc) - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala deleted file mode 100644 index b1de81e..0000000 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import org.apache.spark.SparkContext -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.util.IntParam - -private[spark] class YarnClusterSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext) - extends YarnSchedulerBackend(scheduler, sc) { - - override def start() { - super.start() - totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS - if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { - totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")) - .getOrElse(totalExpectedExecutors) - } - // System property can override environment variable. - totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors) - } - - override def applicationId(): String = - // In YARN Cluster mode, spark.yarn.app.id is expect to be set - // before user application is launched. - // So, if spark.yarn.app.id is not set, it is something wrong. 
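The two backends above resolve the same settings in opposite directions: getExtraClientArguments (client mode) lets an environment variable such as SPARK_EXECUTOR_MEMORY take precedence over the corresponding Spark property, while YarnClusterSchedulerBackend lets spark.executor.instances override SPARK_EXECUTOR_INSTANCES. A minimal sketch of the client-mode lookup, for illustration only:

    import org.apache.spark.SparkConf

    // Client mode: environment variable first, then the Spark property.
    def resolveClientOption(conf: SparkConf, envVar: String, sparkProp: String): Option[String] =
      Option(System.getenv(envVar)).orElse(conf.getOption(sparkProp))

    // e.g. resolveClientOption(sc.getConf, "SPARK_EXECUTOR_MEMORY", "spark.executor.memory")
    // yields the value handed to Client as --executor-memory.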
- sc.getConf.getOption("spark.yarn.app.id").getOrElse { - logError("Application ID is not set.") - super.applicationId - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala deleted file mode 100644 index 17b79ae..0000000 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.io.File -import java.net.URI - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.MRJobConfig -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.mockito.Matchers._ -import org.mockito.Mockito._ - - -import org.scalatest.FunSuite -import org.scalatest.Matchers - -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ HashMap => MutableHashMap } -import scala.reflect.ClassTag -import scala.util.Try - -import org.apache.spark.{SparkException, SparkConf} -import org.apache.spark.util.Utils - -class ClientBaseSuite extends FunSuite with Matchers { - - test("default Yarn application classpath") { - ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) - } - - test("default MR application classpath") { - ClientBase.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) - } - - test("resultant classpath for an application that defines a classpath for YARN") { - withAppConf(Fixtures.mapYARNAppConf) { conf => - val env = newEnv - ClientBase.populateHadoopClasspath(conf, env) - classpath(env) should be( - flatten(Fixtures.knownYARNAppCP, ClientBase.getDefaultMRApplicationClasspath)) - } - } - - test("resultant classpath for an application that defines a classpath for MR") { - withAppConf(Fixtures.mapMRAppConf) { conf => - val env = newEnv - ClientBase.populateHadoopClasspath(conf, env) - classpath(env) should be( - flatten(ClientBase.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) - } - } - - test("resultant classpath for an application that defines both classpaths, YARN and MR") { - withAppConf(Fixtures.mapAppConf) { conf => - val env = newEnv - ClientBase.populateHadoopClasspath(conf, env) - classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) - } - } - - private val SPARK = 
"local:/sparkJar" - private val USER = "local:/userJar" - private val ADDED = "local:/addJar1,local:/addJar2,/addJar3" - - test("Local jar URIs") { - val conf = new Configuration() - val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) - val env = new MutableHashMap[String, String]() - val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - - ClientBase.populateClasspath(args, conf, sparkConf, env) - - val cp = env("CLASSPATH").split(File.pathSeparator) - s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => - val uri = new URI(entry) - if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) { - cp should contain (uri.getPath()) - } else { - cp should not contain (uri.getPath()) - } - }) - cp should contain (Environment.PWD.$()) - cp should contain (s"${Environment.PWD.$()}${File.separator}*") - cp should not contain (ClientBase.SPARK_JAR) - cp should not contain (ClientBase.APP_JAR) - } - - test("Jar path propagation through SparkConf") { - val conf = new Configuration() - val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) - val yarnConf = new YarnConfiguration() - val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - - val client = spy(new DummyClient(args, conf, sparkConf, yarnConf)) - doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), - any(classOf[Path]), anyShort(), anyBoolean()) - - val tempDir = Utils.createTempDir() - try { - client.prepareLocalResources(tempDir.getAbsolutePath()) - sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER)) - - // The non-local path should be propagated by name only, since it will end up in the app's - // staging dir. - val expected = ADDED.split(",") - .map(p => { - val uri = new URI(p) - if (ClientBase.LOCAL_SCHEME == uri.getScheme()) { - p - } else { - Option(uri.getFragment()).getOrElse(new File(p).getName()) - } - }) - .mkString(",") - - sparkConf.getOption(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected)) - } finally { - Utils.deleteRecursively(tempDir) - } - } - - test("check access nns empty") { - val sparkConf = new SparkConf() - sparkConf.set("spark.yarn.access.namenodes", "") - val nns = ClientBase.getNameNodesToAccess(sparkConf) - nns should be(Set()) - } - - test("check access nns unset") { - val sparkConf = new SparkConf() - val nns = ClientBase.getNameNodesToAccess(sparkConf) - nns should be(Set()) - } - - test("check access nns") { - val sparkConf = new SparkConf() - sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032") - val nns = ClientBase.getNameNodesToAccess(sparkConf) - nns should be(Set(new Path("hdfs://nn1:8032"))) - } - - test("check access nns space") { - val sparkConf = new SparkConf() - sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ") - val nns = ClientBase.getNameNodesToAccess(sparkConf) - nns should be(Set(new Path("hdfs://nn1:8032"))) - } - - test("check access two nns") { - val sparkConf = new SparkConf() - sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032") - val nns = ClientBase.getNameNodesToAccess(sparkConf) - nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032"))) - } - - test("check token renewer") { - val hadoopConf = new Configuration() - hadoopConf.set("yarn.resourcemanager.address", "myrm:8033") - hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:[email protected]") - val renewer = ClientBase.getTokenRenewer(hadoopConf) - renewer should be ("yarn/myrm:[email protected]") 
- } - - test("check token renewer default") { - val hadoopConf = new Configuration() - val caught = - intercept[SparkException] { - ClientBase.getTokenRenewer(hadoopConf) - } - assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer") - } - - object Fixtures { - - val knownDefYarnAppCP: Seq[String] = - getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration], - "DEFAULT_YARN_APPLICATION_CLASSPATH", - Seq[String]())(a => a.toSeq) - - - val knownDefMRAppCP: Seq[String] = - getFieldValue2[String, Array[String], Seq[String]]( - classOf[MRJobConfig], - "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH", - Seq[String]())(a => a.split(","))(a => a.toSeq) - - val knownYARNAppCP = Some(Seq("/known/yarn/path")) - - val knownMRAppCP = Some(Seq("/known/mr/path")) - - val mapMRAppConf = - Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get) - - val mapYARNAppConf = - Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get) - - val mapAppConf = mapYARNAppConf ++ mapMRAppConf - } - - def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) { - val conf = new Configuration - m.foreach { case (k, v) => conf.set(k, v, "ClientBaseSpec") } - testCode(conf) - } - - def newEnv = MutableHashMap[String, String]() - - def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;") - - def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray - - def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = - Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults) - - def getFieldValue2[A: ClassTag, A1: ClassTag, B]( - clazz: Class[_], - field: String, - defaults: => B)(mapTo: A => B)(mapTo1: A1 => B) : B = { - Try(clazz.getField(field)).map(_.get(null)).map { - case v: A => mapTo(v) - case v1: A1 => mapTo1(v1) - case _ => defaults - }.toOption.getOrElse(defaults) - } - - private class DummyClient( - val args: ClientArguments, - val hadoopConf: Configuration, - val sparkConf: SparkConf, - val yarnConf: YarnConfiguration) extends ClientBase { - override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ??? - override def submitApplication(): ApplicationId = ??? - override def getApplicationReport(appId: ApplicationId): ApplicationReport = ??? - override def getClientToken(report: ApplicationReport): String = ??? - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala deleted file mode 100644 index 80b57d1..0000000 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
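The "check access nns" tests above pin down how spark.yarn.access.namenodes is interpreted: unset or empty yields an empty set, entries are comma separated, and surrounding whitespace is ignored. ClientBase.getNameNodesToAccess itself is not part of this hunk; the following is only a sketch consistent with those expectations:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.SparkConf

    // Hypothetical helper mirroring the behaviour the tests above assert.
    def nameNodesToAccess(conf: SparkConf): Set[Path] =
      conf.get("spark.yarn.access.namenodes", "")
        .split(",")
        .map(_.trim)
        .filter(_.nonEmpty)
        .map(new Path(_))
        .toSet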
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.net.URI - -import org.scalatest.FunSuite -import org.scalatest.mock.MockitoSugar -import org.mockito.Mockito.when - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileStatus -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.fs.Path -import org.apache.hadoop.fs.permission.FsAction -import org.apache.hadoop.yarn.api.records.LocalResource -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility -import org.apache.hadoop.yarn.api.records.LocalResourceType -import org.apache.hadoop.yarn.util.{Records, ConverterUtils} - -import scala.collection.mutable.HashMap -import scala.collection.mutable.Map - - -class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar { - - class MockClientDistributedCacheManager extends ClientDistributedCacheManager { - override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): - LocalResourceVisibility = { - LocalResourceVisibility.PRIVATE - } - } - - test("test getFileStatus empty") { - val distMgr = new ClientDistributedCacheManager() - val fs = mock[FileSystem] - val uri = new URI("/tmp/testing") - when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus()) - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - val stat = distMgr.getFileStatus(fs, uri, statCache) - assert(stat.getPath() === null) - } - - test("test getFileStatus cached") { - val distMgr = new ClientDistributedCacheManager() - val fs = mock[FileSystem] - val uri = new URI("/tmp/testing") - val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner", - null, new Path("/tmp/testing")) - when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus()) - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus) - val stat = distMgr.getFileStatus(fs, uri, statCache) - assert(stat.getPath().toString() === "/tmp/testing") - } - - test("test addResource") { - val distMgr = new MockClientDistributedCacheManager() - val fs = mock[FileSystem] - val conf = new Configuration() - val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") - val localResources = HashMap[String, LocalResource]() - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - when(fs.getFileStatus(destPath)).thenReturn(new FileStatus()) - - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link", - statCache, false) - val resource = localResources("link") - assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) - assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) - assert(resource.getTimestamp() === 0) - assert(resource.getSize() === 0) - assert(resource.getType() === LocalResourceType.FILE) - - val env = new HashMap[String, String]() - distMgr.setDistFilesEnv(env) - assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link") - assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0") - assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0") - 
assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name()) - - distMgr.setDistArchivesEnv(env) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None) - - // add another one and verify both there and order correct - val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", - null, new Path("/tmp/testing2")) - val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2") - when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus) - distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2", - statCache, false) - val resource2 = localResources("link2") - assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE) - assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2) - assert(resource2.getTimestamp() === 10) - assert(resource2.getSize() === 20) - assert(resource2.getType() === LocalResourceType.FILE) - - val env2 = new HashMap[String, String]() - distMgr.setDistFilesEnv(env2) - val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') - val files = env2("SPARK_YARN_CACHE_FILES").split(',') - val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') - val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',') - assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link") - assert(timestamps(0) === "0") - assert(sizes(0) === "0") - assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name()) - - assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2") - assert(timestamps(1) === "10") - assert(sizes(1) === "20") - assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name()) - } - - test("test addResource link null") { - val distMgr = new MockClientDistributedCacheManager() - val fs = mock[FileSystem] - val conf = new Configuration() - val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") - val localResources = HashMap[String, LocalResource]() - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - when(fs.getFileStatus(destPath)).thenReturn(new FileStatus()) - - intercept[Exception] { - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null, - statCache, false) - } - assert(localResources.get("link") === None) - assert(localResources.size === 0) - } - - test("test addResource appmaster only") { - val distMgr = new MockClientDistributedCacheManager() - val fs = mock[FileSystem] - val conf = new Configuration() - val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") - val localResources = HashMap[String, LocalResource]() - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", - null, new Path("/tmp/testing")) - when(fs.getFileStatus(destPath)).thenReturn(realFileStatus) - - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", - statCache, true) - val resource = localResources("link") - assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) - assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) - assert(resource.getTimestamp() === 10) - assert(resource.getSize() === 20) - assert(resource.getType() === LocalResourceType.ARCHIVE) - - val env = new 
HashMap[String, String]() - distMgr.setDistFilesEnv(env) - assert(env.get("SPARK_YARN_CACHE_FILES") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None) - - distMgr.setDistArchivesEnv(env) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None) - } - - test("test addResource archive") { - val distMgr = new MockClientDistributedCacheManager() - val fs = mock[FileSystem] - val conf = new Configuration() - val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") - val localResources = HashMap[String, LocalResource]() - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", - null, new Path("/tmp/testing")) - when(fs.getFileStatus(destPath)).thenReturn(realFileStatus) - - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", - statCache, false) - val resource = localResources("link") - assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) - assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) - assert(resource.getTimestamp() === 10) - assert(resource.getSize() === 20) - assert(resource.getType() === LocalResourceType.ARCHIVE) - - val env = new HashMap[String, String]() - - distMgr.setDistArchivesEnv(env) - assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link") - assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10") - assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20") - assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name()) - - distMgr.setDistFilesEnv(env) - assert(env.get("SPARK_YARN_CACHE_FILES") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None) - } - - -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala deleted file mode 100644 index 8d184a0..0000000 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
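The cache-manager tests above document the format used to hand localized files to executors: SPARK_YARN_CACHE_FILES holds "uri#linkName" entries, and the TIME_STAMPS, FILE_SIZES and VISIBILITIES variables are parallel comma-separated lists aligned by index. A hypothetical consumer of that format (the real executor-side parsing lives elsewhere in Spark) might look like:

    // Hypothetical: rebuild per-file records from the parallel env-var lists above.
    case class CachedFile(uri: String, link: String, timestamp: Long, size: Long, visibility: String)

    def parseCachedFiles(env: Map[String, String]): Seq[CachedFile] = {
      def list(key: String) = env.getOrElse(key, "").split(",").filter(_.nonEmpty)
      val files = list("SPARK_YARN_CACHE_FILES")
      val times = list("SPARK_YARN_CACHE_FILES_TIME_STAMPS")
      val sizes = list("SPARK_YARN_CACHE_FILES_FILE_SIZES")
      val vis   = list("SPARK_YARN_CACHE_FILES_VISIBILITIES")
      files.indices.map { i =>
        val Array(uri, link) = files(i).split("#", 2)
        CachedFile(uri, link, times(i).toLong, sizes(i).toLong, vis(i))
      }
    }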
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import org.apache.spark.deploy.yarn.YarnAllocator._ -import org.scalatest.FunSuite - -class YarnAllocatorSuite extends FunSuite { - test("memory exceeded diagnostic regexes") { - val diagnostics = - "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " + - "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " + - "5.8 GB of 4.2 GB virtual memory used. Killing container." - val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN) - val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN) - assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used.")) - assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used.")) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala deleted file mode 100644 index 2cc5abb..0000000 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.yarn - -import java.io.{File, IOException} - -import com.google.common.io.{ByteStreams, Files} -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.scalatest.{FunSuite, Matchers} - -import org.apache.hadoop.yarn.api.records.ApplicationAccessType - -import org.apache.spark.{Logging, SecurityManager, SparkConf} - - -class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { - - val hasBash = - try { - val exitCode = Runtime.getRuntime().exec(Array("bash", "--version")).waitFor() - exitCode == 0 - } catch { - case e: IOException => - false - } - - if (!hasBash) { - logWarning("Cannot execute bash, skipping bash tests.") - } - - def bashTest(name: String)(fn: => Unit) = - if (hasBash) test(name)(fn) else ignore(name)(fn) - - bashTest("shell script escaping") { - val scriptFile = File.createTempFile("script.", ".sh") - val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6") - try { - val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ") - Files.write(("bash -c \"echo " + argLine + "\"").getBytes(), scriptFile) - scriptFile.setExecutable(true) - - val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath())) - val out = new String(ByteStreams.toByteArray(proc.getInputStream())).trim() - val err = new String(ByteStreams.toByteArray(proc.getErrorStream())) - val exitCode = proc.waitFor() - exitCode should be (0) - out should be (args.mkString(" ")) - } finally { - scriptFile.delete() - } - } - - test("Yarn configuration override") { - val key = "yarn.nodemanager.hostname" - val default = new YarnConfiguration() - - val sparkConf = new SparkConf() - .set("spark.hadoop." + key, "someHostName") - val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf) - - yarnConf.getClass() should be (classOf[YarnConfiguration]) - yarnConf.get(key) should not be default.get(key) - } - - - test("test getApplicationAclsForYarn acls on") { - - // spark acls on, just pick up default user - val sparkConf = new SparkConf() - sparkConf.set("spark.acls.enable", "true") - - val securityMgr = new SecurityManager(sparkConf) - val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr) - - val viewAcls = acls.get(ApplicationAccessType.VIEW_APP) - val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) - - viewAcls match { - case Some(vacls) => { - val aclSet = vacls.split(',').map(_.trim).toSet - assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - } - case None => { - fail() - } - } - modifyAcls match { - case Some(macls) => { - val aclSet = macls.split(',').map(_.trim).toSet - assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - } - case None => { - fail() - } - } - } - - test("test getApplicationAclsForYarn acls on and specify users") { - - // default spark acls are on and specify acls - val sparkConf = new SparkConf() - sparkConf.set("spark.acls.enable", "true") - sparkConf.set("spark.ui.view.acls", "user1,user2") - sparkConf.set("spark.modify.acls", "user3,user4") - - val securityMgr = new SecurityManager(sparkConf) - val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr) - - val viewAcls = acls.get(ApplicationAccessType.VIEW_APP) - val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) - - viewAcls match { - case Some(vacls) => { - val aclSet = vacls.split(',').map(_.trim).toSet - assert(aclSet.contains("user1")) - assert(aclSet.contains("user2")) - assert(aclSet.contains(System.getProperty("user.name", 
"invalid"))) - } - case None => { - fail() - } - } - modifyAcls match { - case Some(macls) => { - val aclSet = macls.split(',').map(_.trim).toSet - assert(aclSet.contains("user3")) - assert(aclSet.contains("user4")) - assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - } - case None => { - fail() - } - } - - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/pom.xml ---------------------------------------------------------------------- diff --git a/yarn/pom.xml b/yarn/pom.xml index bba7364..d7579bf 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -25,9 +25,9 @@ </parent> <groupId>org.apache.spark</groupId> - <artifactId>yarn-parent_2.10</artifactId> - <packaging>pom</packaging> - <name>Spark Project YARN Parent POM</name> + <artifactId>spark-yarn_2.10</artifactId> + <packaging>jar</packaging> + <name>Spark Project YARN</name> <properties> <sbt.project.name>yarn</sbt.project.name> </properties> @@ -59,6 +59,12 @@ <artifactId>hadoop-client</artifactId> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>test</scope> @@ -70,41 +76,54 @@ </dependency> </dependencies> + <!-- + See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed + dependencies, so they need to be added manually for the tests to work. + --> <profiles> <profile> - <id>yarn-alpha</id> - <build> - <plugins> - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <phase>validate</phase> - <goals> - <goal>run</goal> - </goals> - <configuration> - <tasks> - <echo>*******************************************************************************************</echo> - <echo>***WARNING***: Support for YARN-alpha API's will be removed in Spark 1.3 (see SPARK-3445).*</echo> - <echo>*******************************************************************************************</echo> - </tasks> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - <modules> - <module>alpha</module> - </modules> - </profile> - - <profile> - <id>yarn</id> - <modules> - <module>stable</module> - </modules> + <id>hadoop-2.2</id> + <properties> + <jersey.version>1.9</jersey.version> + </properties> + <dependencies> + <dependency> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + <version>6.1.26</version> + <exclusions> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + <version>${jersey.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + <version>${jersey.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>stax</groupId> + <artifactId>stax-api</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + <version>${jersey.version}</version> + <scope>test</scope> + </dependency> + </dependencies> </profile> </profiles> @@ -125,38 +144,6 @@ </configuration> </plugin> <plugin> - <groupId>org.codehaus.mojo</groupId> - 
<artifactId>build-helper-maven-plugin</artifactId> - <executions> - <execution> - <id>add-scala-sources</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - <source>../common/src/main/scala</source> - </sources> - </configuration> - </execution> - <execution> - <id>add-scala-test-sources</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/scala</source> - <source>../common/src/test/scala</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - <plugin> <groupId>org.scalatest</groupId> <artifactId>scalatest-maven-plugin</artifactId> <configuration> @@ -169,12 +156,6 @@ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> - - <resources> - <resource> - <directory>../common/src/main/resources</directory> - </resource> - </resources> </build> </project> http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala new file mode 100644 index 0000000..987b337 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.util.control.NonFatal + +import java.io.IOException +import java.lang.reflect.InvocationTargetException +import java.net.Socket +import java.util.concurrent.atomic.AtomicReference + +import akka.actor._ +import akka.remote._ +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.util.ShutdownHookManager +import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.SparkException +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.HistoryServer +import org.apache.spark.scheduler.cluster.YarnSchedulerBackend +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils} + +/** + * Common application master functionality for Spark on Yarn. 
+ */ +private[spark] class ApplicationMaster(args: ApplicationMasterArguments, + client: YarnRMClient) extends Logging { + // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be + // optimal as more containers are available. Might need to handle this better. + + private val sparkConf = new SparkConf() + private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf) + .asInstanceOf[YarnConfiguration] + private val isDriver = args.userClass != null + + // Default to numExecutors * 2, with minimum of 3 + private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", + sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) + + @volatile private var exitCode = 0 + @volatile private var unregistered = false + @volatile private var finished = false + @volatile private var finalStatus = FinalApplicationStatus.SUCCEEDED + @volatile private var finalMsg: String = "" + @volatile private var userClassThread: Thread = _ + + private var reporterThread: Thread = _ + private var allocator: YarnAllocator = _ + + // Fields used in client mode. + private var actorSystem: ActorSystem = null + private var actor: ActorRef = _ + + // Fields used in cluster mode. + private val sparkContextRef = new AtomicReference[SparkContext](null) + + final def run(): Int = { + try { + val appAttemptId = client.getAttemptId() + + if (isDriver) { + // Set the web ui port to be ephemeral for yarn so we don't conflict with + // other spark processes running on the same box + System.setProperty("spark.ui.port", "0") + + // Set the master property to match the requested mode. + System.setProperty("spark.master", "yarn-cluster") + + // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up. + System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) + } + + logInfo("ApplicationAttemptId: " + appAttemptId) + + val fs = FileSystem.get(yarnConf) + val cleanupHook = new Runnable { + override def run() { + // If the SparkContext is still registered, shut it down as a best case effort in case + // users do not call sc.stop or do System.exit(). + val sc = sparkContextRef.get() + if (sc != null) { + logInfo("Invoking sc stop from shutdown hook") + sc.stop() + } + val maxAppAttempts = client.getMaxRegAttempts(yarnConf) + val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts + + if (!finished) { + // This happens when the user application calls System.exit(). We have the choice + // of either failing or succeeding at this point. We report success to avoid + // retrying applications that have succeeded (System.exit(0)), which means that + // applications that explicitly exit with a non-zero status will also show up as + // succeeded in the RM UI. + finish(finalStatus, + ApplicationMaster.EXIT_SUCCESS, + "Shutdown hook called before final status was reported.") + } + + if (!unregistered) { + // we only want to unregister if we don't want the RM to retry + if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) { + unregister(finalStatus, finalMsg) + cleanupStagingDir(fs) + } + } + } + } + + // Use higher priority than FileSystem. + assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY) + ShutdownHookManager + .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY) + + // Call this to force generation of secret so it gets populated into the + // Hadoop UGI. 
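The maxNumExecutorFailures default computed above is worth spelling out: unless spark.yarn.max.executor.failures (or the legacy spark.yarn.max.worker.failures) is set, the AM tolerates twice the requested number of executors failing, with a floor of three. A small worked example:

    // Default failure budget used by the ApplicationMaster above.
    def defaultMaxExecutorFailures(numExecutors: Int): Int = math.max(numExecutors * 2, 3)

    // defaultMaxExecutorFailures(1)  == 3
    // defaultMaxExecutorFailures(10) == 20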
This has to happen before the startUserClass which does a + // doAs in order for the credentials to be passed on to the executor containers. + val securityMgr = new SecurityManager(sparkConf) + + if (isDriver) { + runDriver(securityMgr) + } else { + runExecutorLauncher(securityMgr) + } + } catch { + case e: Exception => + // catch everything else if not specifically handled + logError("Uncaught exception: ", e) + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, + "Uncaught exception: " + e.getMessage()) + } + exitCode + } + + /** + * unregister is used to completely unregister the application from the ResourceManager. + * This means the ResourceManager will not retry the application attempt on your behalf if + * a failure occurred. + */ + final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized { + if (!unregistered) { + logInfo(s"Unregistering ApplicationMaster with $status" + + Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) + unregistered = true + client.unregister(status, Option(diagnostics).getOrElse("")) + } + } + + final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized { + if (!finished) { + val inShutdown = Utils.inShutdown() + logInfo(s"Final app status: ${status}, exitCode: ${code}" + + Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) + exitCode = code + finalStatus = status + finalMsg = msg + finished = true + if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { + logDebug("shutting down reporter thread") + reporterThread.interrupt() + } + if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) { + logDebug("shutting down user thread") + userClassThread.interrupt() + } + } + } + + private def sparkContextInitialized(sc: SparkContext) = { + sparkContextRef.synchronized { + sparkContextRef.compareAndSet(null, sc) + sparkContextRef.notifyAll() + } + } + + private def sparkContextStopped(sc: SparkContext) = { + sparkContextRef.compareAndSet(sc, null) + } + + private def registerAM(uiAddress: String, securityMgr: SecurityManager) = { + val sc = sparkContextRef.get() + + val appId = client.getAttemptId().getApplicationId().toString() + val historyAddress = + sparkConf.getOption("spark.yarn.historyServer.address") + .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" } + .getOrElse("") + + allocator = client.register(yarnConf, + if (sc != null) sc.getConf else sparkConf, + if (sc != null) sc.preferredNodeLocationData else Map(), + uiAddress, + historyAddress, + securityMgr) + + allocator.allocateResources() + reporterThread = launchReporterThread() + } + + private def runDriver(securityMgr: SecurityManager): Unit = { + addAmIpFilter() + userClassThread = startUserClass() + + // This a bit hacky, but we need to wait until the spark.driver.port property has + // been set by the Thread executing the user class. + val sc = waitForSparkContextInitialized() + + // If there is no SparkContext at this point, just fail the app. 
+ if (sc == null) { + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_SC_NOT_INITED, + "Timed out waiting for SparkContext.") + } else { + registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr) + userClassThread.join() + } + } + + private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { + actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, + conf = sparkConf, securityManager = securityMgr)._1 + actor = waitForSparkDriver() + addAmIpFilter() + registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr) + + // In client mode the actor will stop the reporter thread. + reporterThread.join() + } + + private def launchReporterThread(): Thread = { + // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. + val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) + + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) + + // must be <= expiryInterval / 2. + val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval)) + + // The number of failures in a row until Reporter thread give up + val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5) + + val t = new Thread { + override def run() { + var failureCount = 0 + while (!finished) { + try { + if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, + "Max number of executor failures reached") + } else { + logDebug("Sending progress") + allocator.allocateResources() + } + failureCount = 0 + } catch { + case i: InterruptedException => + case e: Throwable => { + failureCount += 1 + if (!NonFatal(e) || failureCount >= reporterMaxFailures) { + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + + s"${failureCount} time(s) from Reporter thread.") + + } else { + logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e) + } + } + } + try { + Thread.sleep(interval) + } catch { + case e: InterruptedException => + } + } + } + } + // setting to daemon status, though this is usually not a good idea. + t.setDaemon(true) + t.setName("Reporter") + t.start() + logInfo("Started progress reporter thread - sleep time : " + interval) + t + } + + /** + * Clean up the staging directory. + */ + private def cleanupStagingDir(fs: FileSystem) { + var stagingDirPath: Path = null + try { + val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean + if (!preserveFiles) { + stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) + if (stagingDirPath == null) { + logError("Staging directory is null") + return + } + logInfo("Deleting staging directory " + stagingDirPath) + fs.delete(stagingDirPath, true) + } + } catch { + case ioe: IOException => + logError("Failed to cleanup staging dir " + stagingDirPath, ioe) + } + } + + private def waitForSparkContextInitialized(): SparkContext = { + logInfo("Waiting for spark context initialization") + try { + sparkContextRef.synchronized { + var count = 0 + val waitTime = 10000L + val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10) + while (sparkContextRef.get() == null && count < numTries && !finished) { + logInfo("Waiting for spark context initialization ... 
" + count) + count = count + 1 + sparkContextRef.wait(waitTime) + } + + val sparkContext = sparkContextRef.get() + if (sparkContext == null) { + logError(("SparkContext did not initialize after waiting for %d ms. Please check earlier" + + " log output for errors. Failing the application.").format(numTries * waitTime)) + } + sparkContext + } + } + } + + private def waitForSparkDriver(): ActorRef = { + logInfo("Waiting for Spark driver to be reachable.") + var driverUp = false + var count = 0 + val hostport = args.userArgs(0) + val (driverHost, driverPort) = Utils.parseHostPort(hostport) + + // spark driver should already be up since it launched us, but we don't want to + // wait forever, so wait 100 seconds max to match the cluster mode setting. + // Leave this config unpublished for now. SPARK-3779 to investigating changing + // this config to be time based. + val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000) + + while (!driverUp && !finished && count < numTries) { + try { + count = count + 1 + val socket = new Socket(driverHost, driverPort) + socket.close() + logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) + driverUp = true + } catch { + case e: Exception => + logError("Failed to connect to driver at %s:%s, retrying ...". + format(driverHost, driverPort)) + Thread.sleep(100) + } + } + + if (!driverUp) { + throw new SparkException("Failed to connect to driver!") + } + + sparkConf.set("spark.driver.host", driverHost) + sparkConf.set("spark.driver.port", driverPort.toString) + + val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, + driverHost, + driverPort.toString, + YarnSchedulerBackend.ACTOR_NAME) + actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM") + } + + /** Add the Yarn IP filter that is required for properly securing the UI. */ + private def addAmIpFilter() = { + val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) + val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" + val params = client.getAmIpFilterParams(yarnConf, proxyBase) + if (isDriver) { + System.setProperty("spark.ui.filters", amFilter) + params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) } + } else { + actor ! AddWebUIFilter(amFilter, params.toMap, proxyBase) + } + } + + /** + * Start the user class, which contains the spark driver, in a separate Thread. + * If the main routine exits cleanly or exits with System.exit(N) for any N + * we assume it was successful, for all other cases we assume failure. + * + * Returns the user thread that was started. 
+ */ + private def startUserClass(): Thread = { + logInfo("Starting the user JAR in a separate Thread") + System.setProperty("spark.executor.instances", args.numExecutors.toString) + val mainMethod = Class.forName(args.userClass, false, + Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) + + val userThread = new Thread { + override def run() { + try { + val mainArgs = new Array[String](args.userArgs.size) + args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) + mainMethod.invoke(null, mainArgs) + finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) + logDebug("Done running users class") + } catch { + case e: InvocationTargetException => + e.getCause match { + case _: InterruptedException => + // Reporter thread can interrupt to stop user class + case e: Exception => + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_EXCEPTION_USER_CLASS, + "User class threw exception: " + e.getMessage) + // re-throw to get it logged + throw e + } + } + } + } + userThread.setName("Driver") + userThread.start() + userThread + } + + /** + * Actor that communicates with the driver in client deploy mode. + */ + private class AMActor(driverUrl: String) extends Actor { + var driver: ActorSelection = _ + + override def preStart() = { + logInfo("Listen to driver: " + driverUrl) + driver = context.actorSelection(driverUrl) + // Send a hello message to establish the connection, after which + // we can monitor Lifecycle Events. + driver ! "Hello" + driver ! RegisterClusterManager + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + } + + override def receive = { + case x: DisassociatedEvent => + logInfo(s"Driver terminated or disconnected! Shutting down. $x") + finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) + + case x: AddWebUIFilter => + logInfo(s"Add WebUI Filter. $x") + driver ! x + + case RequestExecutors(requestedTotal) => + logInfo(s"Driver requested a total number of $requestedTotal executor(s).") + Option(allocator) match { + case Some(a) => a.requestTotalExecutors(requestedTotal) + case None => logWarning("Container allocator is not ready to request executors yet.") + } + sender ! true + + case KillExecutors(executorIds) => + logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.") + Option(allocator) match { + case Some(a) => executorIds.foreach(a.killExecutor) + case None => logWarning("Container allocator is not ready to kill executors yet.") + } + sender ! 
true + } + } + +} + +object ApplicationMaster extends Logging { + + val SHUTDOWN_HOOK_PRIORITY: Int = 30 + + // exit codes for different causes, no reason behind the values + private val EXIT_SUCCESS = 0 + private val EXIT_UNCAUGHT_EXCEPTION = 10 + private val EXIT_MAX_EXECUTOR_FAILURES = 11 + private val EXIT_REPORTER_FAILURE = 12 + private val EXIT_SC_NOT_INITED = 13 + private val EXIT_SECURITY = 14 + private val EXIT_EXCEPTION_USER_CLASS = 15 + + private var master: ApplicationMaster = _ + + def main(args: Array[String]) = { + SignalLogger.register(log) + val amArgs = new ApplicationMasterArguments(args) + SparkHadoopUtil.get.runAsSparkUser { () => + master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs)) + System.exit(master.run()) + } + } + + private[spark] def sparkContextInitialized(sc: SparkContext) = { + master.sparkContextInitialized(sc) + } + + private[spark] def sparkContextStopped(sc: SparkContext) = { + master.sparkContextStopped(sc) + } + +} + +/** + * This object does not provide any special functionality. It exists so that it's easy to tell + * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps. + */ +object ExecutorLauncher { + + def main(args: Array[String]) = { + ApplicationMaster.main(args) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala new file mode 100644 index 0000000..d76a632 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import org.apache.spark.util.{MemoryParam, IntParam} +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ +import collection.mutable.ArrayBuffer + +class ApplicationMasterArguments(val args: Array[String]) { + var userJar: String = null + var userClass: String = null + var userArgs: Seq[String] = Seq[String]() + var executorMemory = 1024 + var executorCores = 1 + var numExecutors = DEFAULT_NUMBER_EXECUTORS + + parseArgs(args.toList) + + private def parseArgs(inputArgs: List[String]): Unit = { + val userArgsBuffer = new ArrayBuffer[String]() + + var args = inputArgs + + while (!args.isEmpty) { + // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0, + // the properties with executor in their names are preferred. 
+ args match { + case ("--jar") :: value :: tail => + userJar = value + args = tail + + case ("--class") :: value :: tail => + userClass = value + args = tail + + case ("--args" | "--arg") :: value :: tail => + userArgsBuffer += value + args = tail + + case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => + numExecutors = value + args = tail + + case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail => + executorMemory = value + args = tail + + case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail => + executorCores = value + args = tail + + case _ => + printUsageAndExit(1, args) + } + } + + userArgs = userArgsBuffer.readOnly + } + + def printUsageAndExit(exitCode: Int, unknownParam: Any = null) { + if (unknownParam != null) { + System.err.println("Unknown/unsupported param " + unknownParam) + } + System.err.println(""" + |Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] + |Options: + | --jar JAR_PATH Path to your application's JAR file + | --class CLASS_NAME Name of your application's main class + | --args ARGS Arguments to be passed to your application's main class. + | Multiple invocations are possible, each will be passed in order. + | --num-executors NUM Number of executors to start (Default: 2) + | --executor-cores NUM Number of cores for the executors (Default: 1) + | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G) + """.stripMargin) + System.exit(exitCode) + } +} + +object ApplicationMasterArguments { + val DEFAULT_NUMBER_EXECUTORS = 2 +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala new file mode 100644 index 0000000..addaddb --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.nio.ByteBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.DataOutputBuffer +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.Records + +import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.deploy.SparkHadoopUtil + +/** + * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API. 
+ */ +private[spark] class Client( + val args: ClientArguments, + val hadoopConf: Configuration, + val sparkConf: SparkConf) + extends ClientBase with Logging { + + def this(clientArgs: ClientArguments, spConf: SparkConf) = + this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) + + def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf()) + + val yarnClient = YarnClient.createYarnClient + val yarnConf = new YarnConfiguration(hadoopConf) + + def stop(): Unit = yarnClient.stop() + + /* ------------------------------------------------------------------------------------- * + | The following methods have much in common in the stable and alpha versions of Client, | + | but cannot be implemented in the parent trait due to subtle API differences across | + | hadoop versions. | + * ------------------------------------------------------------------------------------- */ + + /** + * Submit an application running our ApplicationMaster to the ResourceManager. + * + * The stable Yarn API provides a convenience method (YarnClient#createApplication) for + * creating applications and setting up the application submission context. This was not + * available in the alpha API. + */ + override def submitApplication(): ApplicationId = { + yarnClient.init(yarnConf) + yarnClient.start() + + logInfo("Requesting a new application from cluster with %d NodeManagers" + .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers)) + + // Get a new application from our RM + val newApp = yarnClient.createApplication() + val newAppResponse = newApp.getNewApplicationResponse() + val appId = newAppResponse.getApplicationId() + + // Verify whether the cluster has enough resources for our AM + verifyClusterResources(newAppResponse) + + // Set up the appropriate contexts to launch our AM + val containerContext = createContainerLaunchContext(newAppResponse) + val appContext = createApplicationSubmissionContext(newApp, containerContext) + + // Finally, submit and monitor the application + logInfo(s"Submitting application ${appId.getId} to ResourceManager") + yarnClient.submitApplication(appContext) + appId + } + + /** + * Set up the context for submitting our ApplicationMaster. + * This uses the YarnClientApplication not available in the Yarn alpha API. + */ + def createApplicationSubmissionContext( + newApp: YarnClientApplication, + containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { + val appContext = newApp.getApplicationSubmissionContext + appContext.setApplicationName(args.appName) + appContext.setQueue(args.amQueue) + appContext.setAMContainerSpec(containerContext) + appContext.setApplicationType("SPARK") + val capability = Records.newRecord(classOf[Resource]) + capability.setMemory(args.amMemory + amMemoryOverhead) + appContext.setResource(capability) + appContext + } + + /** Set up security tokens for launching our ApplicationMaster container. */ + override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = { + val dob = new DataOutputBuffer + credentials.writeTokenStorageToStream(dob) + amContainer.setTokens(ByteBuffer.wrap(dob.getData)) + } + + /** Get the application report from the ResourceManager for an application we have submitted. */ + override def getApplicationReport(appId: ApplicationId): ApplicationReport = + yarnClient.getApplicationReport(appId) + + /** + * Return the security token used by this client to communicate with the ApplicationMaster. + * If no security is enabled, the token returned by the report is null. 
+ */ + override def getClientToken(report: ApplicationReport): String = + Option(report.getClientToAMToken).map(_.toString).getOrElse("") +} + +object Client { + def main(argStrings: Array[String]) { + if (!sys.props.contains("SPARK_SUBMIT")) { + println("WARNING: This client is deprecated and will be removed in a " + + "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"") + } + + // Set an env variable indicating we are running in YARN mode. + // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes + System.setProperty("SPARK_YARN_MODE", "true") + val sparkConf = new SparkConf + + val args = new ClientArguments(argStrings, sparkConf) + new Client(args, sparkConf).run() + } +}
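[Editor's note, not part of the patch] For readers skimming the new stable-API Client above, the following is a minimal, hedged sketch of how its pieces fit together at submission time, mirroring what Client.main does. The object name is hypothetical; it is placed in the same package because Client is declared private[spark], and it assumes ClientArguments accepts flags analogous to those documented for ApplicationMasterArguments and that run() is provided by ClientBase, neither of which appears in this excerpt. The supported entry point remains ./bin/spark-submit with "--master yarn", as the deprecation warning in Client.main notes.

package org.apache.spark.deploy.yarn

import org.apache.spark.SparkConf

// Hypothetical harness that mirrors Client.main from the diff above.
object ClientLaunchSketch {
  def main(argStrings: Array[String]): Unit = {
    // Client.main sets this so SparkHadoopUtil and related code know they are in YARN mode;
    // SPARK_-prefixed settings are propagated to the remote processes.
    System.setProperty("SPARK_YARN_MODE", "true")

    val sparkConf = new SparkConf()

    // ClientArguments parses the command line (flags such as --jar, --class, --num-executors
    // are assumed here; its parser is not part of this excerpt).
    val args = new ClientArguments(argStrings, sparkConf)

    // run() comes from ClientBase (also outside this excerpt): it calls the
    // submitApplication() shown above and then monitors the application report.
    new Client(args, sparkConf).run()
  }
}

Invoked directly, this would behave like the deprecated `org.apache.spark.deploy.yarn.Client` entry point; in normal use spark-submit sets SPARK_SUBMIT and drives the same code path without the warning.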
