http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala new file mode 100644 index 0000000..b32e157 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.util.{List => JList} +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicInteger +import java.util.regex.Pattern + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse + +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} +import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend + +import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ + +object AllocationType extends Enumeration { + type AllocationType = Value + val HOST, RACK, ANY = Value +} + +// TODO: +// Too many params. +// Needs to be mt-safe +// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should +// make it more proactive and decoupled. + +// Note that right now, we assume all node asks as uniform in terms of capabilities and priority +// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for +// more info on how we are requesting for containers. + +/** + * Common code for the Yarn container allocator. Contains all the version-agnostic code to + * manage container allocation for a running Spark application. + */ +private[yarn] abstract class YarnAllocator( + conf: Configuration, + sparkConf: SparkConf, + appAttemptId: ApplicationAttemptId, + args: ApplicationMasterArguments, + preferredNodes: collection.Map[String, collection.Set[SplitInfo]], + securityMgr: SecurityManager) + extends Logging { + + import YarnAllocator._ + + // These three are locked on allocatedHostToContainersMap. Complementary data structures + // allocatedHostToContainersMap : containers which are running : host, Set<containerid> + // allocatedContainerToHostMap: container to host mapping. + private val allocatedHostToContainersMap = + new HashMap[String, collection.mutable.Set[ContainerId]]() + + private val allocatedContainerToHostMap = new HashMap[ContainerId, String]() + + // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an + // allocated node) + // As with the two data structures above, tightly coupled with them, and to be locked on + // allocatedHostToContainersMap + private val allocatedRackCount = new HashMap[String, Int]() + + // Containers to be released in next request to RM + private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean] + + // Number of container requests that have been sent to, but not yet allocated by the + // ApplicationMaster. + private val numPendingAllocate = new AtomicInteger() + private val numExecutorsRunning = new AtomicInteger() + // Used to generate a unique id per executor + private val executorIdCounter = new AtomicInteger() + private val numExecutorsFailed = new AtomicInteger() + + private var maxExecutors = args.numExecutors + + // Keep track of which container is running which executor to remove the executors later + private val executorIdToContainer = new HashMap[String, Container] + + protected val executorMemory = args.executorMemory + protected val executorCores = args.executorCores + protected val (preferredHostToCount, preferredRackToCount) = + generateNodeToWeight(conf, preferredNodes) + + // Additional memory overhead - in mb. + protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) + + private val launcherPool = new ThreadPoolExecutor( + // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue + sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE, + 1, TimeUnit.MINUTES, + new LinkedBlockingQueue[Runnable](), + new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build()) + launcherPool.allowCoreThreadTimeOut(true) + + def getNumExecutorsRunning: Int = numExecutorsRunning.intValue + + def getNumExecutorsFailed: Int = numExecutorsFailed.intValue + + /** + * Request as many executors from the ResourceManager as needed to reach the desired total. + * This takes into account executors already running or pending. + */ + def requestTotalExecutors(requestedTotal: Int): Unit = synchronized { + val currentTotal = numPendingAllocate.get + numExecutorsRunning.get + if (requestedTotal > currentTotal) { + maxExecutors += (requestedTotal - currentTotal) + // We need to call `allocateResources` here to avoid the following race condition: + // If we request executors twice before `allocateResources` is called, then we will end up + // double counting the number requested because `numPendingAllocate` is not updated yet. + allocateResources() + } else { + logInfo(s"Not allocating more executors because there are already $currentTotal " + + s"(application requested $requestedTotal total)") + } + } + + /** + * Request that the ResourceManager release the container running the specified executor. + */ + def killExecutor(executorId: String): Unit = synchronized { + if (executorIdToContainer.contains(executorId)) { + val container = executorIdToContainer.remove(executorId).get + internalReleaseContainer(container) + numExecutorsRunning.decrementAndGet() + maxExecutors -= 1 + assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!") + } else { + logWarning(s"Attempted to kill unknown executor $executorId!") + } + } + + /** + * Allocate missing containers based on the number of executors currently pending and running. + * + * This method prioritizes the allocated container responses from the RM based on node and + * rack locality. Additionally, it releases any extra containers allocated for this application + * but are not needed. This must be synchronized because variables read in this block are + * mutated by other methods. + */ + def allocateResources(): Unit = synchronized { + val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get() + + // this is needed by alpha, do it here since we add numPending right after this + val executorsPending = numPendingAllocate.get() + if (missing > 0) { + val totalExecutorMemory = executorMemory + memoryOverhead + numPendingAllocate.addAndGet(missing) + logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " + + s"memory including $memoryOverhead MB overhead") + } else { + logDebug("Empty allocation request ...") + } + + val allocateResponse = allocateContainers(missing, executorsPending) + val allocatedContainers = allocateResponse.getAllocatedContainers() + + if (allocatedContainers.size > 0) { + var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size) + + if (numPendingAllocateNow < 0) { + numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow) + } + + logDebug(""" + Allocated containers: %d + Current executor count: %d + Containers released: %s + Cluster resources: %s + """.format( + allocatedContainers.size, + numExecutorsRunning.get(), + releasedContainers, + allocateResponse.getAvailableResources)) + + val hostToContainers = new HashMap[String, ArrayBuffer[Container]]() + + for (container <- allocatedContainers) { + if (isResourceConstraintSatisfied(container)) { + // Add the accepted `container` to the host's list of already accepted, + // allocated containers + val host = container.getNodeId.getHost + val containersForHost = hostToContainers.getOrElseUpdate(host, + new ArrayBuffer[Container]()) + containersForHost += container + } else { + // Release container, since it doesn't satisfy resource constraints. + internalReleaseContainer(container) + } + } + + // Find the appropriate containers to use. + // TODO: Cleanup this group-by... + val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]() + val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]() + val offRackContainers = new HashMap[String, ArrayBuffer[Container]]() + + for (candidateHost <- hostToContainers.keySet) { + val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0) + val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost) + + val remainingContainersOpt = hostToContainers.get(candidateHost) + assert(remainingContainersOpt.isDefined) + var remainingContainers = remainingContainersOpt.get + + if (requiredHostCount >= remainingContainers.size) { + // Since we have <= required containers, add all remaining containers to + // `dataLocalContainers`. + dataLocalContainers.put(candidateHost, remainingContainers) + // There are no more free containers remaining. + remainingContainers = null + } else if (requiredHostCount > 0) { + // Container list has more containers than we need for data locality. + // Split the list into two: one based on the data local container count, + // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining + // containers. + val (dataLocal, remaining) = remainingContainers.splitAt( + remainingContainers.size - requiredHostCount) + dataLocalContainers.put(candidateHost, dataLocal) + + // Invariant: remainingContainers == remaining + + // YARN has a nasty habit of allocating a ton of containers on a host - discourage this. + // Add each container in `remaining` to list of containers to release. If we have an + // insufficient number of containers, then the next allocation cycle will reallocate + // (but won't treat it as data local). + // TODO(harvey): Rephrase this comment some more. + for (container <- remaining) internalReleaseContainer(container) + remainingContainers = null + } + + // For rack local containers + if (remainingContainers != null) { + val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost) + if (rack != null) { + val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0) + val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - + rackLocalContainers.getOrElse(rack, List()).size + + if (requiredRackCount >= remainingContainers.size) { + // Add all remaining containers to to `dataLocalContainers`. + dataLocalContainers.put(rack, remainingContainers) + remainingContainers = null + } else if (requiredRackCount > 0) { + // Container list has more containers that we need for data locality. + // Split the list into two: one based on the data local container count, + // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining + // containers. + val (rackLocal, remaining) = remainingContainers.splitAt( + remainingContainers.size - requiredRackCount) + val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, + new ArrayBuffer[Container]()) + + existingRackLocal ++= rackLocal + + remainingContainers = remaining + } + } + } + + if (remainingContainers != null) { + // Not all containers have been consumed - add them to the list of off-rack containers. + offRackContainers.put(candidateHost, remainingContainers) + } + } + + // Now that we have split the containers into various groups, go through them in order: + // first host-local, then rack-local, and finally off-rack. + // Note that the list we create below tries to ensure that not all containers end up within + // a host if there is a sufficiently large number of hosts/containers. + val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size) + allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers) + allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers) + allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers) + + // Run each of the allocated containers. + for (container <- allocatedContainersToProcess) { + val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet() + val executorHostname = container.getNodeId.getHost + val containerId = container.getId + + val executorMemoryOverhead = (executorMemory + memoryOverhead) + assert(container.getResource.getMemory >= executorMemoryOverhead) + + if (numExecutorsRunningNow > maxExecutors) { + logInfo("""Ignoring container %s at host %s, since we already have the required number of + containers for it.""".format(containerId, executorHostname)) + internalReleaseContainer(container) + numExecutorsRunning.decrementAndGet() + } else { + val executorId = executorIdCounter.incrementAndGet().toString + val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, + sparkConf.get("spark.driver.host"), + sparkConf.get("spark.driver.port"), + CoarseGrainedSchedulerBackend.ACTOR_NAME) + + logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) + executorIdToContainer(executorId) = container + + // To be safe, remove the container from `releasedContainers`. + releasedContainers.remove(containerId) + + val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname) + allocatedHostToContainersMap.synchronized { + val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, + new HashSet[ContainerId]()) + + containerSet += containerId + allocatedContainerToHostMap.put(containerId, executorHostname) + + if (rack != null) { + allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1) + } + } + logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format( + driverUrl, executorHostname)) + val executorRunnable = new ExecutorRunnable( + container, + conf, + sparkConf, + driverUrl, + executorId, + executorHostname, + executorMemory, + executorCores, + appAttemptId.getApplicationId.toString, + securityMgr) + launcherPool.execute(executorRunnable) + } + } + logDebug(""" + Finished allocating %s containers (from %s originally). + Current number of executors running: %d, + Released containers: %s + """.format( + allocatedContainersToProcess, + allocatedContainers, + numExecutorsRunning.get(), + releasedContainers)) + } + + val completedContainers = allocateResponse.getCompletedContainersStatuses() + if (completedContainers.size > 0) { + logDebug("Completed %d containers".format(completedContainers.size)) + + for (completedContainer <- completedContainers) { + val containerId = completedContainer.getContainerId + + if (releasedContainers.containsKey(containerId)) { + // YarnAllocationHandler already marked the container for release, so remove it from + // `releasedContainers`. + releasedContainers.remove(containerId) + } else { + // Decrement the number of executors running. The next iteration of + // the ApplicationMaster's reporting thread will take care of allocating. + numExecutorsRunning.decrementAndGet() + logInfo("Completed container %s (state: %s, exit status: %s)".format( + containerId, + completedContainer.getState, + completedContainer.getExitStatus)) + // Hadoop 2.2.X added a ContainerExitStatus we should switch to use + // there are some exit status' we shouldn't necessarily count against us, but for + // now I think its ok as none of the containers are expected to exit + if (completedContainer.getExitStatus == -103) { // vmem limit exceeded + logWarning(memLimitExceededLogMessage( + completedContainer.getDiagnostics, + VMEM_EXCEEDED_PATTERN)) + } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded + logWarning(memLimitExceededLogMessage( + completedContainer.getDiagnostics, + PMEM_EXCEEDED_PATTERN)) + } else if (completedContainer.getExitStatus != 0) { + logInfo("Container marked as failed: " + containerId + + ". Exit status: " + completedContainer.getExitStatus + + ". Diagnostics: " + completedContainer.getDiagnostics) + numExecutorsFailed.incrementAndGet() + } + } + + allocatedHostToContainersMap.synchronized { + if (allocatedContainerToHostMap.containsKey(containerId)) { + val hostOpt = allocatedContainerToHostMap.get(containerId) + assert(hostOpt.isDefined) + val host = hostOpt.get + + val containerSetOpt = allocatedHostToContainersMap.get(host) + assert(containerSetOpt.isDefined) + val containerSet = containerSetOpt.get + + containerSet.remove(containerId) + if (containerSet.isEmpty) { + allocatedHostToContainersMap.remove(host) + } else { + allocatedHostToContainersMap.update(host, containerSet) + } + + allocatedContainerToHostMap.remove(containerId) + + // TODO: Move this part outside the synchronized block? + val rack = YarnSparkHadoopUtil.lookupRack(conf, host) + if (rack != null) { + val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1 + if (rackCount > 0) { + allocatedRackCount.put(rack, rackCount) + } else { + allocatedRackCount.remove(rack) + } + } + } + } + } + logDebug(""" + Finished processing %d completed containers. + Current number of executors running: %d, + Released containers: %s + """.format( + completedContainers.size, + numExecutorsRunning.get(), + releasedContainers)) + } + } + + protected def allocatedContainersOnHost(host: String): Int = { + var retval = 0 + allocatedHostToContainersMap.synchronized { + retval = allocatedHostToContainersMap.getOrElse(host, Set()).size + } + retval + } + + protected def allocatedContainersOnRack(rack: String): Int = { + var retval = 0 + allocatedHostToContainersMap.synchronized { + retval = allocatedRackCount.getOrElse(rack, 0) + } + retval + } + + private def isResourceConstraintSatisfied(container: Container): Boolean = { + container.getResource.getMemory >= (executorMemory + memoryOverhead) + } + + // A simple method to copy the split info map. + private def generateNodeToWeight( + conf: Configuration, + input: collection.Map[String, collection.Set[SplitInfo]] + ): (Map[String, Int], Map[String, Int]) = { + + if (input == null) { + return (Map[String, Int](), Map[String, Int]()) + } + + val hostToCount = new HashMap[String, Int] + val rackToCount = new HashMap[String, Int] + + for ((host, splits) <- input) { + val hostCount = hostToCount.getOrElse(host, 0) + hostToCount.put(host, hostCount + splits.size) + + val rack = YarnSparkHadoopUtil.lookupRack(conf, host) + if (rack != null) { + val rackCount = rackToCount.getOrElse(host, 0) + rackToCount.put(host, rackCount + splits.size) + } + } + + (hostToCount.toMap, rackToCount.toMap) + } + + private def internalReleaseContainer(container: Container) = { + releasedContainers.put(container.getId(), true) + releaseContainer(container) + } + + /** + * Called to allocate containers in the cluster. + * + * @param count Number of containers to allocate. + * If zero, should still contact RM (as a heartbeat). + * @param pending Number of containers pending allocate. Only used on alpha. + * @return Response to the allocation request. + */ + protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse + + /** Called to release a previously allocated container. */ + protected def releaseContainer(container: Container): Unit + + /** + * Defines the interface for an allocate response from the RM. This is needed since the alpha + * and stable interfaces differ here in ways that cannot be fixed using other routes. + */ + protected trait YarnAllocateResponse { + + def getAllocatedContainers(): JList[Container] + + def getAvailableResources(): Resource + + def getCompletedContainersStatuses(): JList[ContainerStatus] + + } + +} + +private object YarnAllocator { + val MEM_REGEX = "[0-9.]+ [KMG]B" + val PMEM_EXCEEDED_PATTERN = + Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used") + val VMEM_EXCEEDED_PATTERN = + Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used") + + def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = { + val matcher = pattern.matcher(diagnostics) + val diag = if (matcher.find()) " " + matcher.group() + "." else "" + ("Container killed by YARN for exceeding memory limits." + diag + + " Consider boosting spark.yarn.executor.memoryOverhead.") + } +}
http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala new file mode 100644 index 0000000..2510b9c --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.{Map, Set} + +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.api.records._ + +import org.apache.spark.{SecurityManager, SparkConf, SparkContext} +import org.apache.spark.scheduler.SplitInfo + +/** + * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that + * is used by Spark's AM. + */ +trait YarnRMClient { + + /** + * Registers the application master with the RM. + * + * @param conf The Yarn configuration. + * @param sparkConf The Spark configuration. + * @param preferredNodeLocations Map with hints about where to allocate containers. + * @param uiAddress Address of the SparkUI. + * @param uiHistoryAddress Address of the application on the History Server. + */ + def register( + conf: YarnConfiguration, + sparkConf: SparkConf, + preferredNodeLocations: Map[String, Set[SplitInfo]], + uiAddress: String, + uiHistoryAddress: String, + securityMgr: SecurityManager): YarnAllocator + + /** + * Unregister the AM. Guaranteed to only be called once. + * + * @param status The final status of the AM. + * @param diagnostics Diagnostics message to include in the final status. + */ + def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit + + /** Returns the attempt ID. */ + def getAttemptId(): ApplicationAttemptId + + /** Returns the configuration for the AmIpFilter to add to the Spark UI. */ + def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] + + /** Returns the maximum number of attempts to register the AM. */ + def getMaxRegAttempts(conf: YarnConfiguration): Int + +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala new file mode 100644 index 0000000..8d4b96e --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.util.{List => JList} + +import scala.collection.{Map, Set} +import scala.collection.JavaConversions._ +import scala.util._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.protocolrecords._ +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.ConverterUtils +import org.apache.hadoop.yarn.webapp.util.WebAppUtils + +import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.scheduler.SplitInfo +import org.apache.spark.util.Utils + + +/** + * YarnRMClient implementation for the Yarn stable API. + */ +private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging { + + private var amClient: AMRMClient[ContainerRequest] = _ + private var uiHistoryAddress: String = _ + private var registered: Boolean = false + + override def register( + conf: YarnConfiguration, + sparkConf: SparkConf, + preferredNodeLocations: Map[String, Set[SplitInfo]], + uiAddress: String, + uiHistoryAddress: String, + securityMgr: SecurityManager) = { + amClient = AMRMClient.createAMRMClient() + amClient.init(conf) + amClient.start() + this.uiHistoryAddress = uiHistoryAddress + + logInfo("Registering the ApplicationMaster") + synchronized { + amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) + registered = true + } + new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args, + preferredNodeLocations, securityMgr) + } + + override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized { + if (registered) { + amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress) + } + } + + override def getAttemptId() = { + val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) + val containerId = ConverterUtils.toContainerId(containerIdString) + val appAttemptId = containerId.getApplicationAttemptId() + appAttemptId + } + + override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = { + // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2, + // so not all stable releases have it. + val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration]) + .invoke(null, conf).asInstanceOf[String]).getOrElse("http://") + + // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses. + try { + val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter", + classOf[Configuration]) + val proxies = method.invoke(null, conf).asInstanceOf[JList[String]] + val hosts = proxies.map { proxy => proxy.split(":")(0) } + val uriBases = proxies.map { proxy => prefix + proxy + proxyBase } + Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) + } catch { + case e: NoSuchMethodException => + val proxy = WebAppUtils.getProxyHostAndPort(conf) + val parts = proxy.split(":") + val uriBase = prefix + proxy + proxyBase + Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase) + } + } + + override def getMaxRegAttempts(conf: YarnConfiguration) = + conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS) + +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala new file mode 100644 index 0000000..d7cf904 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.lang.{Boolean => JBoolean} +import java.io.File +import java.util.{Collections, Set => JSet} +import java.util.regex.Matcher +import java.util.regex.Pattern +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.mutable.HashMap + +import org.apache.hadoop.io.Text +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.api.records.ApplicationAccessType +import org.apache.hadoop.yarn.util.RackResolver +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.util.Utils + +/** + * Contains util methods to interact with Hadoop from spark. + */ +class YarnSparkHadoopUtil extends SparkHadoopUtil { + + override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { + dest.addCredentials(source.getCredentials()) + } + + // Note that all params which start with SPARK are propagated all the way through, so if in yarn + // mode, this MUST be set to true. + override def isYarnMode(): Boolean = { true } + + // Return an appropriate (subclass) of Configuration. Creating a config initializes some Hadoop + // subsystems. Always create a new config, dont reuse yarnConf. + override def newConfiguration(conf: SparkConf): Configuration = + new YarnConfiguration(super.newConfiguration(conf)) + + // Add any user credentials to the job conf which are necessary for running on a secure Hadoop + // cluster + override def addCredentials(conf: JobConf) { + val jobCreds = conf.getCredentials() + jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) + } + + override def getCurrentUserCredentials(): Credentials = { + UserGroupInformation.getCurrentUser().getCredentials() + } + + override def addCurrentUserCredentials(creds: Credentials) { + UserGroupInformation.getCurrentUser().addCredentials(creds) + } + + override def addSecretKeyToUserCredentials(key: String, secret: String) { + val creds = new Credentials() + creds.addSecretKey(new Text(key), secret.getBytes("utf-8")) + addCurrentUserCredentials(creds) + } + + override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { + val credentials = getCurrentUserCredentials() + if (credentials != null) credentials.getSecretKey(new Text(key)) else null + } + +} + +object YarnSparkHadoopUtil { + // Additional memory overhead + // 7% was arrived at experimentally. In the interest of minimizing memory waste while covering + // the common cases. Memory overhead tends to grow with container size. + + val MEMORY_OVERHEAD_FACTOR = 0.07 + val MEMORY_OVERHEAD_MIN = 384 + + val ANY_HOST = "*" + + val DEFAULT_NUMBER_EXECUTORS = 2 + + // All RM requests are issued with same priority : we do not (yet) have any distinction between + // request types (like map/reduce in hadoop for example) + val RM_REQUEST_PRIORITY = 1 + + // Host to rack map - saved from allocation requests. We are expecting this not to change. + // Note that it is possible for this to change : and ResourceManager will indicate that to us via + // update response to allocate. But we are punting on handling that for now. + private val hostToRack = new ConcurrentHashMap[String, String]() + private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]() + + /** + * Add a path variable to the given environment map. + * If the map already contains this key, append the value to the existing value instead. + */ + def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = { + val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value + env.put(key, newValue) + } + + /** + * Set zero or more environment variables specified by the given input string. + * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3". + */ + def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = { + if (inputString != null && inputString.length() > 0) { + val childEnvs = inputString.split(",") + val p = Pattern.compile(environmentVariableRegex) + for (cEnv <- childEnvs) { + val parts = cEnv.split("=") // split on '=' + val m = p.matcher(parts(1)) + val sb = new StringBuffer + while (m.find()) { + val variable = m.group(1) + var replace = "" + if (env.get(variable) != None) { + replace = env.get(variable).get + } else { + // if this key is not configured for the child .. get it from the env + replace = System.getenv(variable) + if (replace == null) { + // the env key is note present anywhere .. simply set it + replace = "" + } + } + m.appendReplacement(sb, Matcher.quoteReplacement(replace)) + } + m.appendTail(sb) + // This treats the environment variable as path variable delimited by `File.pathSeparator` + // This is kept for backward compatibility and consistency with Hadoop's behavior + addPathToEnvironment(env, parts(0), sb.toString) + } + } + } + + private val environmentVariableRegex: String = { + if (Utils.isWindows) { + "%([A-Za-z_][A-Za-z0-9_]*?)%" + } else { + "\\$([A-Za-z_][A-Za-z0-9_]*)" + } + } + + /** + * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands + * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The + * argument is enclosed in single quotes and some key characters are escaped. + * + * @param arg A single argument. + * @return Argument quoted for execution via Yarn's generated shell script. + */ + def escapeForShell(arg: String): String = { + if (arg != null) { + val escaped = new StringBuilder("'") + for (i <- 0 to arg.length() - 1) { + arg.charAt(i) match { + case '$' => escaped.append("\\$") + case '"' => escaped.append("\\\"") + case '\'' => escaped.append("'\\''") + case c => escaped.append(c) + } + } + escaped.append("'").toString() + } else { + arg + } + } + + def lookupRack(conf: Configuration, host: String): String = { + if (!hostToRack.contains(host)) { + populateRackInfo(conf, host) + } + hostToRack.get(host) + } + + def populateRackInfo(conf: Configuration, hostname: String) { + Utils.checkHost(hostname) + + if (!hostToRack.containsKey(hostname)) { + // If there are repeated failures to resolve, all to an ignore list. + val rackInfo = RackResolver.resolve(conf, hostname) + if (rackInfo != null && rackInfo.getNetworkLocation != null) { + val rack = rackInfo.getNetworkLocation + hostToRack.put(hostname, rack) + if (! rackToHostSet.containsKey(rack)) { + rackToHostSet.putIfAbsent(rack, + Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]())) + } + rackToHostSet.get(rack).add(hostname) + + // TODO(harvey): Figure out what this comment means... + // Since RackResolver caches, we are disabling this for now ... + } /* else { + // right ? Else we will keep calling rack resolver in case we cant resolve rack info ... + hostToRack.put(hostname, null) + } */ + } + } + + def getApplicationAclsForYarn(securityMgr: SecurityManager) + : Map[ApplicationAccessType, String] = { + Map[ApplicationAccessType, String] ( + ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls, + ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls + ) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala new file mode 100644 index 0000000..254774a --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark._ +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.Utils + +/** + * This scheduler launches executors through Yarn - by calling into Client to launch the Spark AM. + */ +private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { + + // By default, rack is unknown + override def getRackForHost(hostPort: String): Option[String] = { + val host = Utils.parseHostPort(hostPort)._1 + Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host)) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala new file mode 100644 index 0000000..2923e67 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} + +import org.apache.spark.{SparkException, Logging, SparkContext} +import org.apache.spark.deploy.yarn.{Client, ClientArguments} +import org.apache.spark.scheduler.TaskSchedulerImpl + +private[spark] class YarnClientSchedulerBackend( + scheduler: TaskSchedulerImpl, + sc: SparkContext) + extends YarnSchedulerBackend(scheduler, sc) + with Logging { + + private var client: Client = null + private var appId: ApplicationId = null + @volatile private var stopping: Boolean = false + + /** + * Create a Yarn client to submit an application to the ResourceManager. + * This waits until the application is running. + */ + override def start() { + super.start() + val driverHost = conf.get("spark.driver.host") + val driverPort = conf.get("spark.driver.port") + val hostport = driverHost + ":" + driverPort + sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIAddress) } + + val argsArrayBuf = new ArrayBuffer[String]() + argsArrayBuf += ("--arg", hostport) + argsArrayBuf ++= getExtraClientArguments + + logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" ")) + val args = new ClientArguments(argsArrayBuf.toArray, conf) + totalExpectedExecutors = args.numExecutors + client = new Client(args, conf) + appId = client.submitApplication() + waitForApplication() + asyncMonitorApplication() + } + + /** + * Return any extra command line arguments to be passed to Client provided in the form of + * environment variables or Spark properties. + */ + private def getExtraClientArguments: Seq[String] = { + val extraArgs = new ArrayBuffer[String] + val optionTuples = // List of (target Client argument, environment variable, Spark property) + List( + ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"), + ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"), + ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"), + ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), + ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"), + ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"), + ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"), + ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"), + ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"), + ("--name", "SPARK_YARN_APP_NAME", "spark.app.name") + ) + optionTuples.foreach { case (optionName, envVar, sparkProp) => + if (System.getenv(envVar) != null) { + extraArgs += (optionName, System.getenv(envVar)) + } else if (sc.getConf.contains(sparkProp)) { + extraArgs += (optionName, sc.getConf.get(sparkProp)) + } + } + extraArgs + } + + /** + * Report the state of the application until it is running. + * If the application has finished, failed or been killed in the process, throw an exception. + * This assumes both `client` and `appId` have already been set. + */ + private def waitForApplication(): Unit = { + assert(client != null && appId != null, "Application has not been submitted yet!") + val (state, _) = client.monitorApplication(appId, returnOnRunning = true) // blocking + if (state == YarnApplicationState.FINISHED || + state == YarnApplicationState.FAILED || + state == YarnApplicationState.KILLED) { + throw new SparkException("Yarn application has already ended! " + + "It might have been killed or unable to launch application master.") + } + if (state == YarnApplicationState.RUNNING) { + logInfo(s"Application $appId has started running.") + } + } + + /** + * Monitor the application state in a separate thread. + * If the application has exited for any reason, stop the SparkContext. + * This assumes both `client` and `appId` have already been set. + */ + private def asyncMonitorApplication(): Unit = { + assert(client != null && appId != null, "Application has not been submitted yet!") + val t = new Thread { + override def run() { + while (!stopping) { + val report = client.getApplicationReport(appId) + val state = report.getYarnApplicationState() + if (state == YarnApplicationState.FINISHED || + state == YarnApplicationState.KILLED || + state == YarnApplicationState.FAILED) { + logError(s"Yarn application has already exited with state $state!") + sc.stop() + stopping = true + } + Thread.sleep(1000L) + } + Thread.currentThread().interrupt() + } + } + t.setName("Yarn application state monitor") + t.setDaemon(true) + t.start() + } + + /** + * Stop the scheduler. This assumes `start()` has already been called. + */ + override def stop() { + assert(client != null, "Attempted to stop this scheduler before starting it!") + stopping = true + super.stop() + client.stop() + logInfo("Stopped") + } + + override def applicationId(): String = { + Option(appId).map(_.toString).getOrElse { + logWarning("Application ID is not initialized yet.") + super.applicationId + } + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala new file mode 100644 index 0000000..4157ff9 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark._ +import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil} +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.Utils + +/** + * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of + * ApplicationMaster, etc is done + */ +private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { + + logInfo("Created YarnClusterScheduler") + + // Nothing else for now ... initialize application master : which needs a SparkContext to + // determine how to allocate. + // Note that only the first creation of a SparkContext influences (and ideally, there must be + // only one SparkContext, right ?). Subsequent creations are ignored since executors are already + // allocated by then. + + // By default, rack is unknown + override def getRackForHost(hostPort: String): Option[String] = { + val host = Utils.parseHostPort(hostPort)._1 + Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host)) + } + + override def postStartHook() { + ApplicationMaster.sparkContextInitialized(sc) + super.postStartHook() + logInfo("YarnClusterScheduler.postStartHook done") + } + + override def stop() { + super.stop() + ApplicationMaster.sparkContextStopped(sc) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala new file mode 100644 index 0000000..b1de81e --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( + scheduler: TaskSchedulerImpl, + sc: SparkContext) + extends YarnSchedulerBackend(scheduler, sc) { + + override def start() { + super.start() + totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS + if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { + totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")) + .getOrElse(totalExpectedExecutors) + } + // System property can override environment variable. + totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors) + } + + override def applicationId(): String = + // In YARN Cluster mode, spark.yarn.app.id is expect to be set + // before user application is launched. + // So, if spark.yarn.app.id is not set, it is something wrong. + sc.getConf.getOption("spark.yarn.app.id").getOrElse { + logError("Application ID is not set.") + super.applicationId + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/yarn/src/test/resources/log4j.properties b/yarn/src/test/resources/log4j.properties new file mode 100644 index 0000000..9dd05f1 --- /dev/null +++ b/yarn/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file core/target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN +org.eclipse.jetty.LEVEL=WARN http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala new file mode 100644 index 0000000..17b79ae --- /dev/null +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.io.File +import java.net.URI + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.MRJobConfig +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.mockito.Matchers._ +import org.mockito.Mockito._ + + +import org.scalatest.FunSuite +import org.scalatest.Matchers + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ HashMap => MutableHashMap } +import scala.reflect.ClassTag +import scala.util.Try + +import org.apache.spark.{SparkException, SparkConf} +import org.apache.spark.util.Utils + +class ClientBaseSuite extends FunSuite with Matchers { + + test("default Yarn application classpath") { + ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) + } + + test("default MR application classpath") { + ClientBase.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) + } + + test("resultant classpath for an application that defines a classpath for YARN") { + withAppConf(Fixtures.mapYARNAppConf) { conf => + val env = newEnv + ClientBase.populateHadoopClasspath(conf, env) + classpath(env) should be( + flatten(Fixtures.knownYARNAppCP, ClientBase.getDefaultMRApplicationClasspath)) + } + } + + test("resultant classpath for an application that defines a classpath for MR") { + withAppConf(Fixtures.mapMRAppConf) { conf => + val env = newEnv + ClientBase.populateHadoopClasspath(conf, env) + classpath(env) should be( + flatten(ClientBase.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) + } + } + + test("resultant classpath for an application that defines both classpaths, YARN and MR") { + withAppConf(Fixtures.mapAppConf) { conf => + val env = newEnv + ClientBase.populateHadoopClasspath(conf, env) + classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) + } + } + + private val SPARK = "local:/sparkJar" + private val USER = "local:/userJar" + private val ADDED = "local:/addJar1,local:/addJar2,/addJar3" + + test("Local jar URIs") { + val conf = new Configuration() + val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) + val env = new MutableHashMap[String, String]() + val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) + + ClientBase.populateClasspath(args, conf, sparkConf, env) + + val cp = env("CLASSPATH").split(File.pathSeparator) + s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => + val uri = new URI(entry) + if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) { + cp should contain (uri.getPath()) + } else { + cp should not contain (uri.getPath()) + } + }) + cp should contain (Environment.PWD.$()) + cp should contain (s"${Environment.PWD.$()}${File.separator}*") + cp should not contain (ClientBase.SPARK_JAR) + cp should not contain (ClientBase.APP_JAR) + } + + test("Jar path propagation through SparkConf") { + val conf = new Configuration() + val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) + val yarnConf = new YarnConfiguration() + val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) + + val client = spy(new DummyClient(args, conf, sparkConf, yarnConf)) + doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), + any(classOf[Path]), anyShort(), anyBoolean()) + + val tempDir = Utils.createTempDir() + try { + client.prepareLocalResources(tempDir.getAbsolutePath()) + sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER)) + + // The non-local path should be propagated by name only, since it will end up in the app's + // staging dir. + val expected = ADDED.split(",") + .map(p => { + val uri = new URI(p) + if (ClientBase.LOCAL_SCHEME == uri.getScheme()) { + p + } else { + Option(uri.getFragment()).getOrElse(new File(p).getName()) + } + }) + .mkString(",") + + sparkConf.getOption(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected)) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("check access nns empty") { + val sparkConf = new SparkConf() + sparkConf.set("spark.yarn.access.namenodes", "") + val nns = ClientBase.getNameNodesToAccess(sparkConf) + nns should be(Set()) + } + + test("check access nns unset") { + val sparkConf = new SparkConf() + val nns = ClientBase.getNameNodesToAccess(sparkConf) + nns should be(Set()) + } + + test("check access nns") { + val sparkConf = new SparkConf() + sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032") + val nns = ClientBase.getNameNodesToAccess(sparkConf) + nns should be(Set(new Path("hdfs://nn1:8032"))) + } + + test("check access nns space") { + val sparkConf = new SparkConf() + sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ") + val nns = ClientBase.getNameNodesToAccess(sparkConf) + nns should be(Set(new Path("hdfs://nn1:8032"))) + } + + test("check access two nns") { + val sparkConf = new SparkConf() + sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032") + val nns = ClientBase.getNameNodesToAccess(sparkConf) + nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032"))) + } + + test("check token renewer") { + val hadoopConf = new Configuration() + hadoopConf.set("yarn.resourcemanager.address", "myrm:8033") + hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:[email protected]") + val renewer = ClientBase.getTokenRenewer(hadoopConf) + renewer should be ("yarn/myrm:[email protected]") + } + + test("check token renewer default") { + val hadoopConf = new Configuration() + val caught = + intercept[SparkException] { + ClientBase.getTokenRenewer(hadoopConf) + } + assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer") + } + + object Fixtures { + + val knownDefYarnAppCP: Seq[String] = + getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration], + "DEFAULT_YARN_APPLICATION_CLASSPATH", + Seq[String]())(a => a.toSeq) + + + val knownDefMRAppCP: Seq[String] = + getFieldValue2[String, Array[String], Seq[String]]( + classOf[MRJobConfig], + "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH", + Seq[String]())(a => a.split(","))(a => a.toSeq) + + val knownYARNAppCP = Some(Seq("/known/yarn/path")) + + val knownMRAppCP = Some(Seq("/known/mr/path")) + + val mapMRAppConf = + Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get) + + val mapYARNAppConf = + Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get) + + val mapAppConf = mapYARNAppConf ++ mapMRAppConf + } + + def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) { + val conf = new Configuration + m.foreach { case (k, v) => conf.set(k, v, "ClientBaseSpec") } + testCode(conf) + } + + def newEnv = MutableHashMap[String, String]() + + def classpath(env: MutableHashMap[String, String]) = env(Environment.CLASSPATH.name).split(":|;") + + def flatten(a: Option[Seq[String]], b: Option[Seq[String]]) = (a ++ b).flatten.toArray + + def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = + Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults) + + def getFieldValue2[A: ClassTag, A1: ClassTag, B]( + clazz: Class[_], + field: String, + defaults: => B)(mapTo: A => B)(mapTo1: A1 => B) : B = { + Try(clazz.getField(field)).map(_.get(null)).map { + case v: A => mapTo(v) + case v1: A1 => mapTo1(v1) + case _ => defaults + }.toOption.getOrElse(defaults) + } + + private class DummyClient( + val args: ClientArguments, + val hadoopConf: Configuration, + val sparkConf: SparkConf, + val yarnConf: YarnConfiguration) extends ClientBase { + override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ??? + override def submitApplication(): ApplicationId = ??? + override def getApplicationReport(appId: ApplicationId): ApplicationReport = ??? + override def getClientToken(report: ApplicationReport): String = ??? + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala new file mode 100644 index 0000000..80b57d1 --- /dev/null +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.net.URI + +import org.scalatest.FunSuite +import org.scalatest.mock.MockitoSugar +import org.mockito.Mockito.when + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileStatus +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.permission.FsAction +import org.apache.hadoop.yarn.api.records.LocalResource +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility +import org.apache.hadoop.yarn.api.records.LocalResourceType +import org.apache.hadoop.yarn.util.{Records, ConverterUtils} + +import scala.collection.mutable.HashMap +import scala.collection.mutable.Map + + +class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar { + + class MockClientDistributedCacheManager extends ClientDistributedCacheManager { + override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): + LocalResourceVisibility = { + LocalResourceVisibility.PRIVATE + } + } + + test("test getFileStatus empty") { + val distMgr = new ClientDistributedCacheManager() + val fs = mock[FileSystem] + val uri = new URI("/tmp/testing") + when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus()) + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + val stat = distMgr.getFileStatus(fs, uri, statCache) + assert(stat.getPath() === null) + } + + test("test getFileStatus cached") { + val distMgr = new ClientDistributedCacheManager() + val fs = mock[FileSystem] + val uri = new URI("/tmp/testing") + val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner", + null, new Path("/tmp/testing")) + when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus()) + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus) + val stat = distMgr.getFileStatus(fs, uri, statCache) + assert(stat.getPath().toString() === "/tmp/testing") + } + + test("test addResource") { + val distMgr = new MockClientDistributedCacheManager() + val fs = mock[FileSystem] + val conf = new Configuration() + val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") + val localResources = HashMap[String, LocalResource]() + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + when(fs.getFileStatus(destPath)).thenReturn(new FileStatus()) + + distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link", + statCache, false) + val resource = localResources("link") + assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) + assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) + assert(resource.getTimestamp() === 0) + assert(resource.getSize() === 0) + assert(resource.getType() === LocalResourceType.FILE) + + val env = new HashMap[String, String]() + distMgr.setDistFilesEnv(env) + assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link") + assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0") + assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0") + assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name()) + + distMgr.setDistArchivesEnv(env) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None) + + // add another one and verify both there and order correct + val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", + null, new Path("/tmp/testing2")) + val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2") + when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus) + distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2", + statCache, false) + val resource2 = localResources("link2") + assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE) + assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2) + assert(resource2.getTimestamp() === 10) + assert(resource2.getSize() === 20) + assert(resource2.getType() === LocalResourceType.FILE) + + val env2 = new HashMap[String, String]() + distMgr.setDistFilesEnv(env2) + val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') + val files = env2("SPARK_YARN_CACHE_FILES").split(',') + val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') + val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',') + assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link") + assert(timestamps(0) === "0") + assert(sizes(0) === "0") + assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name()) + + assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2") + assert(timestamps(1) === "10") + assert(sizes(1) === "20") + assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name()) + } + + test("test addResource link null") { + val distMgr = new MockClientDistributedCacheManager() + val fs = mock[FileSystem] + val conf = new Configuration() + val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") + val localResources = HashMap[String, LocalResource]() + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + when(fs.getFileStatus(destPath)).thenReturn(new FileStatus()) + + intercept[Exception] { + distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null, + statCache, false) + } + assert(localResources.get("link") === None) + assert(localResources.size === 0) + } + + test("test addResource appmaster only") { + val distMgr = new MockClientDistributedCacheManager() + val fs = mock[FileSystem] + val conf = new Configuration() + val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") + val localResources = HashMap[String, LocalResource]() + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", + null, new Path("/tmp/testing")) + when(fs.getFileStatus(destPath)).thenReturn(realFileStatus) + + distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", + statCache, true) + val resource = localResources("link") + assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) + assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) + assert(resource.getTimestamp() === 10) + assert(resource.getSize() === 20) + assert(resource.getType() === LocalResourceType.ARCHIVE) + + val env = new HashMap[String, String]() + distMgr.setDistFilesEnv(env) + assert(env.get("SPARK_YARN_CACHE_FILES") === None) + assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None) + assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None) + assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None) + + distMgr.setDistArchivesEnv(env) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None) + } + + test("test addResource archive") { + val distMgr = new MockClientDistributedCacheManager() + val fs = mock[FileSystem] + val conf = new Configuration() + val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") + val localResources = HashMap[String, LocalResource]() + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", + null, new Path("/tmp/testing")) + when(fs.getFileStatus(destPath)).thenReturn(realFileStatus) + + distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", + statCache, false) + val resource = localResources("link") + assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) + assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) + assert(resource.getTimestamp() === 10) + assert(resource.getSize() === 20) + assert(resource.getType() === LocalResourceType.ARCHIVE) + + val env = new HashMap[String, String]() + + distMgr.setDistArchivesEnv(env) + assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link") + assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10") + assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20") + assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name()) + + distMgr.setDistFilesEnv(env) + assert(env.get("SPARK_YARN_CACHE_FILES") === None) + assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None) + assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None) + assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None) + } + + +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala new file mode 100644 index 0000000..8d184a0 --- /dev/null +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import org.apache.spark.deploy.yarn.YarnAllocator._ +import org.scalatest.FunSuite + +class YarnAllocatorSuite extends FunSuite { + test("memory exceeded diagnostic regexes") { + val diagnostics = + "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " + + "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " + + "5.8 GB of 4.2 GB virtual memory used. Killing container." + val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN) + val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN) + assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used.")) + assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used.")) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala new file mode 100644 index 0000000..d79b85e --- /dev/null +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.io.File +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConversions._ + +import com.google.common.base.Charsets +import com.google.common.io.Files +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} + +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.server.MiniYARNCluster + +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.util.Utils + +class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging { + + // log4j configuration for the Yarn containers, so that their output is collected + // by Yarn instead of trying to overwrite unit-tests.log. + private val LOG4J_CONF = """ + |log4j.rootCategory=DEBUG, console + |log4j.appender.console=org.apache.log4j.ConsoleAppender + |log4j.appender.console.target=System.err + |log4j.appender.console.layout=org.apache.log4j.PatternLayout + |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + """.stripMargin + + private var yarnCluster: MiniYARNCluster = _ + private var tempDir: File = _ + private var fakeSparkJar: File = _ + private var oldConf: Map[String, String] = _ + + override def beforeAll() { + tempDir = Utils.createTempDir() + + val logConfDir = new File(tempDir, "log4j") + logConfDir.mkdir() + + val logConfFile = new File(logConfDir, "log4j.properties") + Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8) + + val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator + + sys.props("java.class.path") + + oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap + + yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1) + yarnCluster.init(new YarnConfiguration()) + yarnCluster.start() + + // There's a race in MiniYARNCluster in which start() may return before the RM has updated + // its address in the configuration. You can see this in the logs by noticing that when + // MiniYARNCluster prints the address, it still has port "0" assigned, although later the + // test works sometimes: + // + // INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0 + // + // That log message prints the contents of the RM_ADDRESS config variable. If you check it + // later on, it looks something like this: + // + // INFO YarnClusterSuite: RM address in configuration is blah:42631 + // + // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't + // done so in a timely manner (defined to be 10 seconds). + val config = yarnCluster.getConfig() + val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10) + while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") { + if (System.currentTimeMillis() > deadline) { + throw new IllegalStateException("Timed out waiting for RM to come up.") + } + logDebug("RM address still not set in configuration, waiting...") + TimeUnit.MILLISECONDS.sleep(100) + } + + logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}") + config.foreach { e => + sys.props += ("spark.hadoop." + e.getKey() -> e.getValue()) + } + + fakeSparkJar = File.createTempFile("sparkJar", null, tempDir) + sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath())) + sys.props += ("spark.executor.instances" -> "1") + sys.props += ("spark.driver.extraClassPath" -> childClasspath) + sys.props += ("spark.executor.extraClassPath" -> childClasspath) + + super.beforeAll() + } + + override def afterAll() { + yarnCluster.stop() + sys.props.retain { case (k, v) => !k.startsWith("spark.") } + sys.props ++= oldConf + super.afterAll() + } + + test("run Spark in yarn-client mode") { + var result = File.createTempFile("result", null, tempDir) + YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath())) + checkResult(result) + } + + test("run Spark in yarn-cluster mode") { + val main = YarnClusterDriver.getClass.getName().stripSuffix("$") + var result = File.createTempFile("result", null, tempDir) + + val args = Array("--class", main, + "--jar", "file:" + fakeSparkJar.getAbsolutePath(), + "--arg", "yarn-cluster", + "--arg", result.getAbsolutePath(), + "--num-executors", "1") + Client.main(args) + checkResult(result) + } + + test("run Spark in yarn-cluster mode unsuccessfully") { + val main = YarnClusterDriver.getClass.getName().stripSuffix("$") + + // Use only one argument so the driver will fail + val args = Array("--class", main, + "--jar", "file:" + fakeSparkJar.getAbsolutePath(), + "--arg", "yarn-cluster", + "--num-executors", "1") + val exception = intercept[SparkException] { + Client.main(args) + } + assert(Utils.exceptionString(exception).contains("Application finished with failed status")) + } + + /** + * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide + * any sort of error when the job process finishes successfully, but the job itself fails. So + * the tests enforce that something is written to a file after everything is ok to indicate + * that the job succeeded. + */ + private def checkResult(result: File) = { + var resultString = Files.toString(result, Charsets.UTF_8) + resultString should be ("success") + } + +} + +private object YarnClusterDriver extends Logging with Matchers { + + def main(args: Array[String]) = { + if (args.length != 2) { + System.err.println( + s""" + |Invalid command line: ${args.mkString(" ")} + | + |Usage: YarnClusterDriver [master] [result file] + """.stripMargin) + System.exit(1) + } + + val sc = new SparkContext(new SparkConf().setMaster(args(0)) + .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")) + val status = new File(args(1)) + var result = "failure" + try { + val data = sc.parallelize(1 to 4, 4).collect().toSet + data should be (Set(1, 2, 3, 4)) + result = "success" + } finally { + sc.stop() + Files.write(result, status, Charsets.UTF_8) + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
