Repository: spark Updated Branches: refs/heads/master 383c5555c -> 912563aa3
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala new file mode 100644 index 0000000..2cc5abb --- /dev/null +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.io.{File, IOException} + +import com.google.common.io.{ByteStreams, Files} +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.scalatest.{FunSuite, Matchers} + +import org.apache.hadoop.yarn.api.records.ApplicationAccessType + +import org.apache.spark.{Logging, SecurityManager, SparkConf} + + +class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { + + val hasBash = + try { + val exitCode = Runtime.getRuntime().exec(Array("bash", "--version")).waitFor() + exitCode == 0 + } catch { + case e: IOException => + false + } + + if (!hasBash) { + logWarning("Cannot execute bash, skipping bash tests.") + } + + def bashTest(name: String)(fn: => Unit) = + if (hasBash) test(name)(fn) else ignore(name)(fn) + + bashTest("shell script escaping") { + val scriptFile = File.createTempFile("script.", ".sh") + val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6") + try { + val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ") + Files.write(("bash -c \"echo " + argLine + "\"").getBytes(), scriptFile) + scriptFile.setExecutable(true) + + val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath())) + val out = new String(ByteStreams.toByteArray(proc.getInputStream())).trim() + val err = new String(ByteStreams.toByteArray(proc.getErrorStream())) + val exitCode = proc.waitFor() + exitCode should be (0) + out should be (args.mkString(" ")) + } finally { + scriptFile.delete() + } + } + + test("Yarn configuration override") { + val key = "yarn.nodemanager.hostname" + val default = new YarnConfiguration() + + val sparkConf = new SparkConf() + .set("spark.hadoop." + key, "someHostName") + val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf) + + yarnConf.getClass() should be (classOf[YarnConfiguration]) + yarnConf.get(key) should not be default.get(key) + } + + + test("test getApplicationAclsForYarn acls on") { + + // spark acls on, just pick up default user + val sparkConf = new SparkConf() + sparkConf.set("spark.acls.enable", "true") + + val securityMgr = new SecurityManager(sparkConf) + val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr) + + val viewAcls = acls.get(ApplicationAccessType.VIEW_APP) + val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) + + viewAcls match { + case Some(vacls) => { + val aclSet = vacls.split(',').map(_.trim).toSet + assert(aclSet.contains(System.getProperty("user.name", "invalid"))) + } + case None => { + fail() + } + } + modifyAcls match { + case Some(macls) => { + val aclSet = macls.split(',').map(_.trim).toSet + assert(aclSet.contains(System.getProperty("user.name", "invalid"))) + } + case None => { + fail() + } + } + } + + test("test getApplicationAclsForYarn acls on and specify users") { + + // default spark acls are on and specify acls + val sparkConf = new SparkConf() + sparkConf.set("spark.acls.enable", "true") + sparkConf.set("spark.ui.view.acls", "user1,user2") + sparkConf.set("spark.modify.acls", "user3,user4") + + val securityMgr = new SecurityManager(sparkConf) + val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr) + + val viewAcls = acls.get(ApplicationAccessType.VIEW_APP) + val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) + + viewAcls match { + case Some(vacls) => { + val aclSet = vacls.split(',').map(_.trim).toSet + assert(aclSet.contains("user1")) + assert(aclSet.contains("user2")) + assert(aclSet.contains(System.getProperty("user.name", "invalid"))) + } + case None => { + fail() + } + } + modifyAcls match { + case Some(macls) => { + val aclSet = macls.split(',').map(_.trim).toSet + assert(aclSet.contains("user3")) + assert(aclSet.contains("user4")) + assert(aclSet.contains(System.getProperty("user.name", "invalid"))) + } + case None => { + fail() + } + } + + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/pom.xml ---------------------------------------------------------------------- diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml deleted file mode 100644 index 8b6521a..0000000 --- a/yarn/stable/pom.xml +++ /dev/null @@ -1,95 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>yarn-parent_2.10</artifactId> - <version>1.3.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - <properties> - <sbt.project.name>yarn-stable</sbt.project.name> - </properties> - - <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn_2.10</artifactId> - <packaging>jar</packaging> - <name>Spark Project YARN Stable API</name> - - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-tests</artifactId> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - </dependencies> - - <!-- - See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed - dependencies, so they need to be added manually for the tests to work. - --> - <profiles> - <profile> - <id>hadoop-2.2</id> - <properties> - <jersey.version>1.9</jersey.version> - </properties> - <dependencies> - <dependency> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jetty</artifactId> - <version>6.1.26</version> - <exclusions> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-core</artifactId> - <version>${jersey.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-json</artifactId> - <version>${jersey.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>stax</groupId> - <artifactId>stax-api</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - <version>${jersey.version}</version> - <scope>test</scope> - </dependency> - </dependencies> - </profile> - </profiles> - -</project> http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala deleted file mode 100644 index addaddb..0000000 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.nio.ByteBuffer - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.DataOutputBuffer -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.Records - -import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.deploy.SparkHadoopUtil - -/** - * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API. - */ -private[spark] class Client( - val args: ClientArguments, - val hadoopConf: Configuration, - val sparkConf: SparkConf) - extends ClientBase with Logging { - - def this(clientArgs: ClientArguments, spConf: SparkConf) = - this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) - - def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf()) - - val yarnClient = YarnClient.createYarnClient - val yarnConf = new YarnConfiguration(hadoopConf) - - def stop(): Unit = yarnClient.stop() - - /* ------------------------------------------------------------------------------------- * - | The following methods have much in common in the stable and alpha versions of Client, | - | but cannot be implemented in the parent trait due to subtle API differences across | - | hadoop versions. | - * ------------------------------------------------------------------------------------- */ - - /** - * Submit an application running our ApplicationMaster to the ResourceManager. - * - * The stable Yarn API provides a convenience method (YarnClient#createApplication) for - * creating applications and setting up the application submission context. This was not - * available in the alpha API. - */ - override def submitApplication(): ApplicationId = { - yarnClient.init(yarnConf) - yarnClient.start() - - logInfo("Requesting a new application from cluster with %d NodeManagers" - .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers)) - - // Get a new application from our RM - val newApp = yarnClient.createApplication() - val newAppResponse = newApp.getNewApplicationResponse() - val appId = newAppResponse.getApplicationId() - - // Verify whether the cluster has enough resources for our AM - verifyClusterResources(newAppResponse) - - // Set up the appropriate contexts to launch our AM - val containerContext = createContainerLaunchContext(newAppResponse) - val appContext = createApplicationSubmissionContext(newApp, containerContext) - - // Finally, submit and monitor the application - logInfo(s"Submitting application ${appId.getId} to ResourceManager") - yarnClient.submitApplication(appContext) - appId - } - - /** - * Set up the context for submitting our ApplicationMaster. - * This uses the YarnClientApplication not available in the Yarn alpha API. - */ - def createApplicationSubmissionContext( - newApp: YarnClientApplication, - containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { - val appContext = newApp.getApplicationSubmissionContext - appContext.setApplicationName(args.appName) - appContext.setQueue(args.amQueue) - appContext.setAMContainerSpec(containerContext) - appContext.setApplicationType("SPARK") - val capability = Records.newRecord(classOf[Resource]) - capability.setMemory(args.amMemory + amMemoryOverhead) - appContext.setResource(capability) - appContext - } - - /** Set up security tokens for launching our ApplicationMaster container. */ - override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = { - val dob = new DataOutputBuffer - credentials.writeTokenStorageToStream(dob) - amContainer.setTokens(ByteBuffer.wrap(dob.getData)) - } - - /** Get the application report from the ResourceManager for an application we have submitted. */ - override def getApplicationReport(appId: ApplicationId): ApplicationReport = - yarnClient.getApplicationReport(appId) - - /** - * Return the security token used by this client to communicate with the ApplicationMaster. - * If no security is enabled, the token returned by the report is null. - */ - override def getClientToken(report: ApplicationReport): String = - Option(report.getClientToAMToken).map(_.toString).getOrElse("") -} - -object Client { - def main(argStrings: Array[String]) { - if (!sys.props.contains("SPARK_SUBMIT")) { - println("WARNING: This client is deprecated and will be removed in a " + - "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"") - } - - // Set an env variable indicating we are running in YARN mode. - // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes - System.setProperty("SPARK_YARN_MODE", "true") - val sparkConf = new SparkConf - - val args = new ClientArguments(argStrings, sparkConf) - new Client(args, sparkConf).run() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala deleted file mode 100644 index fdd3c23..0000000 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.nio.ByteBuffer -import java.security.PrivilegedExceptionAction - -import scala.collection.JavaConversions._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.DataOutputBuffer -import org.apache.hadoop.net.NetUtils -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils -import org.apache.hadoop.yarn.api.protocolrecords._ -import org.apache.hadoop.yarn.client.api.NMClient -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.ipc.YarnRPC -import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} - -import org.apache.spark.{SecurityManager, SparkConf, Logging} -import org.apache.spark.network.util.JavaUtils - - -class ExecutorRunnable( - container: Container, - conf: Configuration, - spConf: SparkConf, - masterAddress: String, - slaveId: String, - hostname: String, - executorMemory: Int, - executorCores: Int, - appId: String, - securityMgr: SecurityManager) - extends Runnable with ExecutorRunnableUtil with Logging { - - var rpc: YarnRPC = YarnRPC.create(conf) - var nmClient: NMClient = _ - val sparkConf = spConf - val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - - def run = { - logInfo("Starting Executor Container") - nmClient = NMClient.createNMClient() - nmClient.init(yarnConf) - nmClient.start() - startContainer - } - - def startContainer = { - logInfo("Setting up ContainerLaunchContext") - - val ctx = Records.newRecord(classOf[ContainerLaunchContext]) - .asInstanceOf[ContainerLaunchContext] - - val localResources = prepareLocalResources - ctx.setLocalResources(localResources) - - ctx.setEnvironment(env) - - val credentials = UserGroupInformation.getCurrentUser().getCredentials() - val dob = new DataOutputBuffer() - credentials.writeTokenStorageToStream(dob) - ctx.setTokens(ByteBuffer.wrap(dob.getData())) - - val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores, - appId, localResources) - - logInfo(s"Setting up executor with environment: $env") - logInfo("Setting up executor with commands: " + commands) - ctx.setCommands(commands) - - ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)) - - // If external shuffle service is enabled, register with the Yarn shuffle service already - // started on the NodeManager and, if authentication is enabled, provide it with our secret - // key for fetching shuffle files later - if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) { - val secretString = securityMgr.getSecretKey() - val secretBytes = - if (secretString != null) { - // This conversion must match how the YarnShuffleService decodes our secret - JavaUtils.stringToBytes(secretString) - } else { - // Authentication is not enabled, so just provide dummy metadata - ByteBuffer.allocate(0) - } - ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes)) - } - - // Send the start request to the ContainerManager - nmClient.startContainer(container, ctx) - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala deleted file mode 100644 index 2bbf5d7..0000000 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, HashMap} - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.scheduler.SplitInfo - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.util.Records - -/** - * Acquires resources for executors from a ResourceManager and launches executors in new containers. - */ -private[yarn] class YarnAllocationHandler( - conf: Configuration, - sparkConf: SparkConf, - amClient: AMRMClient[ContainerRequest], - appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments, - preferredNodes: collection.Map[String, collection.Set[SplitInfo]], - securityMgr: SecurityManager) - extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) { - - override protected def releaseContainer(container: Container) = { - amClient.releaseAssignedContainer(container.getId()) - } - - // pending isn't used on stable as the AMRMClient handles incremental asks - override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = { - addResourceRequests(count) - - // We have already set the container request. Poll the ResourceManager for a response. - // This doubles as a heartbeat if there are no pending container requests. - val progressIndicator = 0.1f - new StableAllocateResponse(amClient.allocate(progressIndicator)) - } - - private def createRackResourceRequests( - hostContainers: ArrayBuffer[ContainerRequest] - ): ArrayBuffer[ContainerRequest] = { - // Generate modified racks and new set of hosts under it before issuing requests. - val rackToCounts = new HashMap[String, Int]() - - for (container <- hostContainers) { - val candidateHost = container.getNodes.last - assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost) - - val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost) - if (rack != null) { - var count = rackToCounts.getOrElse(rack, 0) - count += 1 - rackToCounts.put(rack, count) - } - } - - val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size) - for ((rack, count) <- rackToCounts) { - requestedContainers ++= createResourceRequests( - AllocationType.RACK, - rack, - count, - YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) - } - - requestedContainers - } - - private def addResourceRequests(numExecutors: Int) { - val containerRequests: List[ContainerRequest] = - if (numExecutors <= 0) { - logDebug("numExecutors: " + numExecutors) - List() - } else if (preferredHostToCount.isEmpty) { - logDebug("host preferences is empty") - createResourceRequests( - AllocationType.ANY, - resource = null, - numExecutors, - YarnSparkHadoopUtil.RM_REQUEST_PRIORITY).toList - } else { - // Request for all hosts in preferred nodes and for numExecutors - - // candidates.size, request by default allocation policy. - val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size) - for ((candidateHost, candidateCount) <- preferredHostToCount) { - val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost) - - if (requiredCount > 0) { - hostContainerRequests ++= createResourceRequests( - AllocationType.HOST, - candidateHost, - requiredCount, - YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) - } - } - val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests( - hostContainerRequests).toList - - val anyContainerRequests = createResourceRequests( - AllocationType.ANY, - resource = null, - numExecutors, - YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) - - val containerRequestBuffer = new ArrayBuffer[ContainerRequest]( - hostContainerRequests.size + rackContainerRequests.size() + anyContainerRequests.size) - - containerRequestBuffer ++= hostContainerRequests - containerRequestBuffer ++= rackContainerRequests - containerRequestBuffer ++= anyContainerRequests - containerRequestBuffer.toList - } - - for (request <- containerRequests) { - amClient.addContainerRequest(request) - } - - for (request <- containerRequests) { - val nodes = request.getNodes - var hostStr = if (nodes == null || nodes.isEmpty) { - "Any" - } else { - nodes.last - } - logInfo("Container request (host: %s, priority: %s, capability: %s".format( - hostStr, - request.getPriority().getPriority, - request.getCapability)) - } - } - - private def createResourceRequests( - requestType: AllocationType.AllocationType, - resource: String, - numExecutors: Int, - priority: Int - ): ArrayBuffer[ContainerRequest] = { - - // If hostname is specified, then we need at least two requests - node local and rack local. - // There must be a third request, which is ANY. That will be specially handled. - requestType match { - case AllocationType.HOST => { - assert(YarnSparkHadoopUtil.ANY_HOST != resource) - val hostname = resource - val nodeLocal = constructContainerRequests( - Array(hostname), - racks = null, - numExecutors, - priority) - - // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler. - YarnSparkHadoopUtil.populateRackInfo(conf, hostname) - nodeLocal - } - case AllocationType.RACK => { - val rack = resource - constructContainerRequests(hosts = null, Array(rack), numExecutors, priority) - } - case AllocationType.ANY => constructContainerRequests( - hosts = null, racks = null, numExecutors, priority) - case _ => throw new IllegalArgumentException( - "Unexpected/unsupported request type: " + requestType) - } - } - - private def constructContainerRequests( - hosts: Array[String], - racks: Array[String], - numExecutors: Int, - priority: Int - ): ArrayBuffer[ContainerRequest] = { - - val memoryRequest = executorMemory + memoryOverhead - val resource = Resource.newInstance(memoryRequest, executorCores) - - val prioritySetting = Records.newRecord(classOf[Priority]) - prioritySetting.setPriority(priority) - - val requests = new ArrayBuffer[ContainerRequest]() - for (i <- 0 until numExecutors) { - requests += new ContainerRequest(resource, hosts, racks, prioritySetting) - } - requests - } - - private class StableAllocateResponse(response: AllocateResponse) extends YarnAllocateResponse { - override def getAllocatedContainers() = response.getAllocatedContainers() - override def getAvailableResources() = response.getAvailableResources() - override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses() - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala deleted file mode 100644 index 8d4b96e..0000000 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.util.{List => JList} - -import scala.collection.{Map, Set} -import scala.collection.JavaConversions._ -import scala.util._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.protocolrecords._ -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.hadoop.yarn.webapp.util.WebAppUtils - -import org.apache.spark.{Logging, SecurityManager, SparkConf} -import org.apache.spark.scheduler.SplitInfo -import org.apache.spark.util.Utils - - -/** - * YarnRMClient implementation for the Yarn stable API. - */ -private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging { - - private var amClient: AMRMClient[ContainerRequest] = _ - private var uiHistoryAddress: String = _ - private var registered: Boolean = false - - override def register( - conf: YarnConfiguration, - sparkConf: SparkConf, - preferredNodeLocations: Map[String, Set[SplitInfo]], - uiAddress: String, - uiHistoryAddress: String, - securityMgr: SecurityManager) = { - amClient = AMRMClient.createAMRMClient() - amClient.init(conf) - amClient.start() - this.uiHistoryAddress = uiHistoryAddress - - logInfo("Registering the ApplicationMaster") - synchronized { - amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) - registered = true - } - new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args, - preferredNodeLocations, securityMgr) - } - - override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized { - if (registered) { - amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress) - } - } - - override def getAttemptId() = { - val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) - val containerId = ConverterUtils.toContainerId(containerIdString) - val appAttemptId = containerId.getApplicationAttemptId() - appAttemptId - } - - override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = { - // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2, - // so not all stable releases have it. - val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration]) - .invoke(null, conf).asInstanceOf[String]).getOrElse("http://") - - // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses. - try { - val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter", - classOf[Configuration]) - val proxies = method.invoke(null, conf).asInstanceOf[JList[String]] - val hosts = proxies.map { proxy => proxy.split(":")(0) } - val uriBases = proxies.map { proxy => prefix + proxy + proxyBase } - Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) - } catch { - case e: NoSuchMethodException => - val proxy = WebAppUtils.getProxyHostAndPort(conf) - val parts = proxy.split(":") - val uriBase = prefix + proxy + proxyBase - Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase) - } - } - - override def getMaxRegAttempts(conf: YarnConfiguration) = - conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS) - -} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/yarn/stable/src/test/resources/log4j.properties b/yarn/stable/src/test/resources/log4j.properties deleted file mode 100644 index 9dd05f1..0000000 --- a/yarn/stable/src/test/resources/log4j.properties +++ /dev/null @@ -1,28 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the file core/target/unit-tests.log -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=false -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Ignore messages below warning level from Jetty, because it's a bit verbose -log4j.logger.org.eclipse.jetty=WARN -org.eclipse.jetty.LEVEL=WARN http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala deleted file mode 100644 index d79b85e..0000000 --- a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.io.File -import java.util.concurrent.TimeUnit - -import scala.collection.JavaConversions._ - -import com.google.common.base.Charsets -import com.google.common.io.Files -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} - -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.server.MiniYARNCluster - -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.util.Utils - -class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging { - - // log4j configuration for the Yarn containers, so that their output is collected - // by Yarn instead of trying to overwrite unit-tests.log. - private val LOG4J_CONF = """ - |log4j.rootCategory=DEBUG, console - |log4j.appender.console=org.apache.log4j.ConsoleAppender - |log4j.appender.console.target=System.err - |log4j.appender.console.layout=org.apache.log4j.PatternLayout - |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n - """.stripMargin - - private var yarnCluster: MiniYARNCluster = _ - private var tempDir: File = _ - private var fakeSparkJar: File = _ - private var oldConf: Map[String, String] = _ - - override def beforeAll() { - tempDir = Utils.createTempDir() - - val logConfDir = new File(tempDir, "log4j") - logConfDir.mkdir() - - val logConfFile = new File(logConfDir, "log4j.properties") - Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8) - - val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator + - sys.props("java.class.path") - - oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap - - yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1) - yarnCluster.init(new YarnConfiguration()) - yarnCluster.start() - - // There's a race in MiniYARNCluster in which start() may return before the RM has updated - // its address in the configuration. You can see this in the logs by noticing that when - // MiniYARNCluster prints the address, it still has port "0" assigned, although later the - // test works sometimes: - // - // INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0 - // - // That log message prints the contents of the RM_ADDRESS config variable. If you check it - // later on, it looks something like this: - // - // INFO YarnClusterSuite: RM address in configuration is blah:42631 - // - // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't - // done so in a timely manner (defined to be 10 seconds). - val config = yarnCluster.getConfig() - val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10) - while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") { - if (System.currentTimeMillis() > deadline) { - throw new IllegalStateException("Timed out waiting for RM to come up.") - } - logDebug("RM address still not set in configuration, waiting...") - TimeUnit.MILLISECONDS.sleep(100) - } - - logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}") - config.foreach { e => - sys.props += ("spark.hadoop." + e.getKey() -> e.getValue()) - } - - fakeSparkJar = File.createTempFile("sparkJar", null, tempDir) - sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath())) - sys.props += ("spark.executor.instances" -> "1") - sys.props += ("spark.driver.extraClassPath" -> childClasspath) - sys.props += ("spark.executor.extraClassPath" -> childClasspath) - - super.beforeAll() - } - - override def afterAll() { - yarnCluster.stop() - sys.props.retain { case (k, v) => !k.startsWith("spark.") } - sys.props ++= oldConf - super.afterAll() - } - - test("run Spark in yarn-client mode") { - var result = File.createTempFile("result", null, tempDir) - YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath())) - checkResult(result) - } - - test("run Spark in yarn-cluster mode") { - val main = YarnClusterDriver.getClass.getName().stripSuffix("$") - var result = File.createTempFile("result", null, tempDir) - - val args = Array("--class", main, - "--jar", "file:" + fakeSparkJar.getAbsolutePath(), - "--arg", "yarn-cluster", - "--arg", result.getAbsolutePath(), - "--num-executors", "1") - Client.main(args) - checkResult(result) - } - - test("run Spark in yarn-cluster mode unsuccessfully") { - val main = YarnClusterDriver.getClass.getName().stripSuffix("$") - - // Use only one argument so the driver will fail - val args = Array("--class", main, - "--jar", "file:" + fakeSparkJar.getAbsolutePath(), - "--arg", "yarn-cluster", - "--num-executors", "1") - val exception = intercept[SparkException] { - Client.main(args) - } - assert(Utils.exceptionString(exception).contains("Application finished with failed status")) - } - - /** - * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide - * any sort of error when the job process finishes successfully, but the job itself fails. So - * the tests enforce that something is written to a file after everything is ok to indicate - * that the job succeeded. - */ - private def checkResult(result: File) = { - var resultString = Files.toString(result, Charsets.UTF_8) - resultString should be ("success") - } - -} - -private object YarnClusterDriver extends Logging with Matchers { - - def main(args: Array[String]) = { - if (args.length != 2) { - System.err.println( - s""" - |Invalid command line: ${args.mkString(" ")} - | - |Usage: YarnClusterDriver [master] [result file] - """.stripMargin) - System.exit(1) - } - - val sc = new SparkContext(new SparkConf().setMaster(args(0)) - .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")) - val status = new File(args(1)) - var result = "failure" - try { - val data = sc.parallelize(1 to 4, 4).collect().toSet - data should be (Set(1, 2, 3, 4)) - result = "success" - } finally { - sc.stop() - Files.write(result, status, Charsets.UTF_8) - } - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
