http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala deleted file mode 100644 index 8772e26..0000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import scala.collection.mutable.{ArrayBuffer, HashMap, Set} -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.records.{ContainerId, Resource} -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.util.RackResolver - -import org.apache.spark.SparkConf -import org.apache.spark.internal.config._ - -private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) - -/** - * This strategy calculates the optimal locality preferences of YARN containers by considering - * the node ratio of pending tasks, number of required cores/containers and the locality of current - * existing and pending allocated containers. The goal of this algorithm is to maximize the number - * of tasks that would run locally. - * - * Consider a situation in which we have 20 tasks that require (host1, host2, host3) - * and 10 tasks that require (host1, host2, host4), where each container has 2 cores - * and each task uses 1 cpu, so the required container number is 15, - * and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10). - * - * 1. If requested container number (18) is more than the required container number (15): - * - * requests for 5 containers with nodes: (host1, host2, host3, host4) - * requests for 5 containers with nodes: (host1, host2, host3) - * requests for 5 containers with nodes: (host1, host2) - * requests for 3 containers with no locality preferences. - * - * The placement ratio is 3 : 3 : 2 : 1, and the additional containers are requested with no locality - * preferences. - * - * 2. If requested container number (10) is less than or equal to the required container number - * (15): - * - * requests for 4 containers with nodes: (host1, host2, host3, host4) - * requests for 3 containers with nodes: (host1, host2, host3) - * requests for 3 containers with nodes: (host1, host2) - * - * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 : 2 : 1) - * - * 3.
If containers exist but none of them can match the requested localities, - * follow the method of 1 and 2. - * - * 4. If containers exist and some of them can match the requested localities. - * For example if we have 1 container on each node (host1: 1, host2: 1, host3: 1, host4: 1), - * and the expected containers on each node would be (host1: 5, host2: 5, host3: 4, host4: 2), - * so the newly requested containers on each node would be updated to (host1: 4, host2: 4, - * host3: 3, host4: 1), 12 containers in total. - * - * 4.1 If requested container number (18) is more than newly required containers (12). Follow - * method 1 with updated ratio 4 : 4 : 3 : 1. - * - * 4.2 If requested container number (10) is less than or equal to newly required containers (12). Follow - * method 2 with updated ratio 4 : 4 : 3 : 1. - * - * 5. If containers exist and existing localities can fully cover the requested localities. - * For example if we have 5 containers on each node (host1: 5, host2: 5, host3: 5, host4: 5), - * which could cover the current requested localities. This algorithm will allocate all the - * requested containers with no localities. - */ -private[yarn] class LocalityPreferredContainerPlacementStrategy( - val sparkConf: SparkConf, - val yarnConf: Configuration, - val resource: Resource) { - - /** - * Calculate each container's node locality and rack locality - * @param numContainer number of containers to calculate - * @param numLocalityAwareTasks number of locality required tasks - * @param hostToLocalTaskCount a map to store the preferred hostname and possible task - * numbers running on it, used as hints for container allocation - * @param allocatedHostToContainersMap host to allocated containers map, used to calculate the - * expected locality preference by considering the existing - * containers - * @param localityMatchedPendingAllocations A sequence of pending container request which - * matches the localities of current required tasks. - * @return node localities and rack localities, each locality is an array of string, - * the length of localities is the same as number of containers - */ - def localityOfRequestedContainers( - numContainer: Int, - numLocalityAwareTasks: Int, - hostToLocalTaskCount: Map[String, Int], - allocatedHostToContainersMap: HashMap[String, Set[ContainerId]], - localityMatchedPendingAllocations: Seq[ContainerRequest] - ): Array[ContainerLocalityPreferences] = { - val updatedHostToContainerCount = expectedHostToContainerCount( - numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap, - localityMatchedPendingAllocations) - val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum - - // The number of containers to allocate, divided into two groups, one with preferred locality, - // and the other without locality preference.
- val requiredLocalityFreeContainerNum = - math.max(0, numContainer - updatedLocalityAwareContainerNum) - val requiredLocalityAwareContainerNum = numContainer - requiredLocalityFreeContainerNum - - val containerLocalityPreferences = ArrayBuffer[ContainerLocalityPreferences]() - if (requiredLocalityFreeContainerNum > 0) { - for (i <- 0 until requiredLocalityFreeContainerNum) { - containerLocalityPreferences += ContainerLocalityPreferences( - null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]]) - } - } - - if (requiredLocalityAwareContainerNum > 0) { - val largestRatio = updatedHostToContainerCount.values.max - // Round the ratio of preferred locality to the number of locality required container - // number, which is used for locality preferred host calculating. - var preferredLocalityRatio = updatedHostToContainerCount.mapValues { ratio => - val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio - adjustedRatio.ceil.toInt - } - - for (i <- 0 until requiredLocalityAwareContainerNum) { - // Only filter out the ratio which is larger than 0, which means the current host can - // still be allocated with new container request. - val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray - val racks = hosts.map { h => - RackResolver.resolve(yarnConf, h).getNetworkLocation - }.toSet - containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray) - - // Minus 1 each time when the host is used. When the current ratio is 0, - // which means all the required ratio is satisfied, this host will not be allocated again. - preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1) - } - } - - containerLocalityPreferences.toArray - } - - /** - * Calculate the number of executors need to satisfy the given number of pending tasks. - */ - private def numExecutorsPending(numTasksPending: Int): Int = { - val coresPerExecutor = resource.getVirtualCores - (numTasksPending * sparkConf.get(CPUS_PER_TASK) + coresPerExecutor - 1) / coresPerExecutor - } - - /** - * Calculate the expected host to number of containers by considering with allocated containers. - * @param localityAwareTasks number of locality aware tasks - * @param hostToLocalTaskCount a map to store the preferred hostname and possible task - * numbers running on it, used as hints for container allocation - * @param allocatedHostToContainersMap host to allocated containers map, used to calculate the - * expected locality preference by considering the existing - * containers - * @param localityMatchedPendingAllocations A sequence of pending container request which - * matches the localities of current required tasks. 
- * @return a map with hostname as key and required number of containers on this host as value - */ - private def expectedHostToContainerCount( - localityAwareTasks: Int, - hostToLocalTaskCount: Map[String, Int], - allocatedHostToContainersMap: HashMap[String, Set[ContainerId]], - localityMatchedPendingAllocations: Seq[ContainerRequest] - ): Map[String, Int] = { - val totalLocalTaskNum = hostToLocalTaskCount.values.sum - val pendingHostToContainersMap = pendingHostToContainerCount(localityMatchedPendingAllocations) - - hostToLocalTaskCount.map { case (host, count) => - val expectedCount = - count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum - // Take the locality of pending containers into consideration - val existedCount = allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0) + - pendingHostToContainersMap.getOrElse(host, 0.0) - - // If existing container can not fully satisfy the expected number of container, - // the required container number is expected count minus existed count. Otherwise the - // required container number is 0. - (host, math.max(0, (expectedCount - existedCount).ceil.toInt)) - } - } - - /** - * According to the locality ratio and number of container requests, calculate the host to - * possible number of containers for pending allocated containers. - * - * If current locality ratio of hosts is: Host1 : Host2 : Host3 = 20 : 20 : 10, - * and pending container requests is 3, so the possible number of containers on - * Host1 : Host2 : Host3 will be 1.2 : 1.2 : 0.6. - * @param localityMatchedPendingAllocations A sequence of pending container request which - * matches the localities of current required tasks. - * @return a Map with hostname as key and possible number of containers on this host as value - */ - private def pendingHostToContainerCount( - localityMatchedPendingAllocations: Seq[ContainerRequest]): Map[String, Double] = { - val pendingHostToContainerCount = new HashMap[String, Int]() - localityMatchedPendingAllocations.foreach { cr => - cr.getNodes.asScala.foreach { n => - val count = pendingHostToContainerCount.getOrElse(n, 0) + 1 - pendingHostToContainerCount(n) = count - } - } - - val possibleTotalContainerNum = pendingHostToContainerCount.values.sum - val localityMatchedPendingNum = localityMatchedPendingAllocations.size.toDouble - pendingHostToContainerCount.mapValues(_ * localityMatchedPendingNum / possibleTotalContainerNum) - .toMap - } -}
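The class comment above walks through the ratio calculation in prose; the following standalone sketch reproduces just that arithmetic so the 5/5/5 + 3 and 4/3/3 splits can be checked directly. It is illustrative only: the object, method, and parameter names are hypothetical, and it omits the rack resolution and the existing/pending-container accounting that the real LocalityPreferredContainerPlacementStrategy performs.

object LocalityRatioSketch {

  /** One locality hint per container request; None means "no locality preference". */
  case class Hint(nodes: Option[Seq[String]])

  def placementHints(
      hostToTaskCount: Map[String, Int],
      numLocalityAwareTasks: Int,
      coresPerContainer: Int,
      cpusPerTask: Int,
      numContainersToRequest: Int): Seq[Hint] = {
    require(hostToTaskCount.nonEmpty, "need at least one preferred host")

    // Containers needed to cover all locality-aware tasks (ceiling division), as in
    // numExecutorsPending above.
    val requiredContainers =
      (numLocalityAwareTasks * cpusPerTask + coresPerContainer - 1) / coresPerContainer

    // Requests beyond what the locality-aware tasks need carry no locality preference.
    val localityFree = math.max(0, numContainersToRequest - requiredContainers)
    val localityAware = numContainersToRequest - localityFree

    // Scale each host's task count to the number of locality-aware requests (rounding up),
    // then emit one request per iteration, decrementing every host's remaining quota.
    val largest = hostToTaskCount.values.max.toDouble
    var remaining = hostToTaskCount.map { case (host, count) =>
      host -> math.ceil(count * localityAware / largest).toInt
    }

    val aware = (0 until localityAware).map { _ =>
      val hosts = remaining.filter(_._2 > 0).keys.toSeq.sorted // sorted only for stable output
      remaining = remaining.map { case (h, c) => h -> (c - 1) }
      Hint(Some(hosts))
    }
    aware ++ Seq.fill(localityFree)(Hint(None))
  }

  def main(args: Array[String]): Unit = {
    val hints = placementHints(
      hostToTaskCount = Map("host1" -> 30, "host2" -> 30, "host3" -> 20, "host4" -> 10),
      numLocalityAwareTasks = 30,
      coresPerContainer = 2,
      cpusPerTask = 1,
      numContainersToRequest = 18)
    hints.foreach(println)
  }
}

Run with 18 requested containers it yields the 5/5/5 host-constrained requests plus 3 unconstrained ones from case 1 of the comment; with numContainersToRequest = 10 it yields the 4/3/3 split from case 2.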
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala deleted file mode 100644 index 0b66d1c..0000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ /dev/null @@ -1,727 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.util.Collections -import java.util.concurrent._ -import java.util.regex.Pattern - -import scala.collection.mutable -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} -import scala.collection.JavaConverters._ -import scala.util.control.NonFatal - -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.RackResolver -import org.apache.log4j.{Level, Logger} - -import org.apache.spark.{SecurityManager, SparkConf, SparkException} -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ -import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef} -import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason} -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId -import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} - -/** - * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding - * what to do with containers when YARN fulfills these requests. - * - * This class makes use of YARN's AMRMClient APIs. We interact with the AMRMClient in three ways: - * * Making our resource needs known, which updates local bookkeeping about containers requested. - * * Calling "allocate", which syncs our local container requests with the RM, and returns any - * containers that YARN has granted to us. This also functions as a heartbeat. - * * Processing the containers granted to us to possibly launch executors inside of them. - * - * The public methods of this class are thread-safe. All methods that mutate state are - * synchronized. 
- */ -private[yarn] class YarnAllocator( - driverUrl: String, - driverRef: RpcEndpointRef, - conf: YarnConfiguration, - sparkConf: SparkConf, - amClient: AMRMClient[ContainerRequest], - appAttemptId: ApplicationAttemptId, - securityMgr: SecurityManager, - localResources: Map[String, LocalResource]) - extends Logging { - - import YarnAllocator._ - - // RackResolver logs an INFO message whenever it resolves a rack, which is way too often. - if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { - Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) - } - - // Visible for testing. - val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]] - val allocatedContainerToHostMap = new HashMap[ContainerId, String] - - // Containers that we no longer care about. We've either already told the RM to release them or - // will on the next heartbeat. Containers get removed from this map after the RM tells us they've - // completed. - private val releasedContainers = Collections.newSetFromMap[ContainerId]( - new ConcurrentHashMap[ContainerId, java.lang.Boolean]) - - @volatile private var numExecutorsRunning = 0 - - /** - * Used to generate a unique ID per executor - * - * Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then - * the id of new executor will start from 1, this will conflict with the executor has - * already created before. So, we should initialize the `executorIdCounter` by getting - * the max executorId from driver. - * - * And this situation of executorId conflict is just in yarn client mode, so this is an issue - * in yarn client mode. For more details, can check in jira. - * - * @see SPARK-12864 - */ - private var executorIdCounter: Int = - driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId) - - // Queue to store the timestamp of failed executors - private val failedExecutorsTimeStamps = new Queue[Long]() - - private var clock: Clock = new SystemClock - - private val executorFailuresValidityInterval = - sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L) - - @volatile private var targetNumExecutors = - YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf) - - // Executor loss reason requests that are pending - maps from executor ID for inquiry to a - // list of requesters that should be responded to once we find out why the given executor - // was lost. - private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]] - - // Maintain loss reasons for already released executors, it will be added when executor loss - // reason is got from AM-RM call, and be removed after querying this loss reason. - private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason] - - // Keep track of which container is running which executor to remove the executors later - // Visible for testing. - private[yarn] val executorIdToContainer = new HashMap[String, Container] - - private var numUnexpectedContainerRelease = 0L - private val containerIdToExecutorId = new HashMap[ContainerId, String] - - // Executor memory in MB. - protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt - // Additional memory overhead. - protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( - math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt - // Number of cores per executor. 
- protected val executorCores = sparkConf.get(EXECUTOR_CORES) - // Resource capability requested for each executors - private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores) - - private val launcherPool = ThreadUtils.newDaemonCachedThreadPool( - "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS)) - - // For testing - private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true) - - private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION) - - // ContainerRequest constructor that can take a node label expression. We grab it through - // reflection because it's only available in later versions of YARN. - private val nodeLabelConstructor = labelExpression.flatMap { expr => - try { - Some(classOf[ContainerRequest].getConstructor(classOf[Resource], - classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean], - classOf[String])) - } catch { - case e: NoSuchMethodException => - logWarning(s"Node label expression $expr will be ignored because YARN version on" + - " classpath does not support it.") - None - } - } - - // A map to store preferred hostname and possible task numbers running on it. - private var hostToLocalTaskCounts: Map[String, Int] = Map.empty - - // Number of tasks that have locality preferences in active stages - private var numLocalityAwareTasks: Int = 0 - - // A container placement strategy based on pending tasks' locality preference - private[yarn] val containerPlacementStrategy = - new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource) - - /** - * Use a different clock for YarnAllocator. This is mainly used for testing. - */ - def setClock(newClock: Clock): Unit = { - clock = newClock - } - - def getNumExecutorsRunning: Int = numExecutorsRunning - - def getNumExecutorsFailed: Int = synchronized { - val endTime = clock.getTimeMillis() - - while (executorFailuresValidityInterval > 0 - && failedExecutorsTimeStamps.nonEmpty - && failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) { - failedExecutorsTimeStamps.dequeue() - } - - failedExecutorsTimeStamps.size - } - - /** - * A sequence of pending container requests that have not yet been fulfilled. - */ - def getPendingAllocate: Seq[ContainerRequest] = getPendingAtLocation(ANY_HOST) - - /** - * A sequence of pending container requests at the given location that have not yet been - * fulfilled. - */ - private def getPendingAtLocation(location: String): Seq[ContainerRequest] = { - amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala - .flatMap(_.asScala) - .toSeq - } - - /** - * Request as many executors from the ResourceManager as needed to reach the desired total. If - * the requested total is smaller than the current number of running executors, no executors will - * be killed. - * @param requestedTotal total number of containers requested - * @param localityAwareTasks number of locality aware tasks to be used as container placement hint - * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as - * container placement hint. - * @return Whether the new requested total is different than the old value. 
- */ - def requestTotalExecutorsWithPreferredLocalities( - requestedTotal: Int, - localityAwareTasks: Int, - hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized { - this.numLocalityAwareTasks = localityAwareTasks - this.hostToLocalTaskCounts = hostToLocalTaskCount - - if (requestedTotal != targetNumExecutors) { - logInfo(s"Driver requested a total number of $requestedTotal executor(s).") - targetNumExecutors = requestedTotal - true - } else { - false - } - } - - /** - * Request that the ResourceManager release the container running the specified executor. - */ - def killExecutor(executorId: String): Unit = synchronized { - if (executorIdToContainer.contains(executorId)) { - val container = executorIdToContainer.get(executorId).get - internalReleaseContainer(container) - numExecutorsRunning -= 1 - } else { - logWarning(s"Attempted to kill unknown executor $executorId!") - } - } - - /** - * Request resources such that, if YARN gives us all we ask for, we'll have a number of containers - * equal to maxExecutors. - * - * Deal with any containers YARN has granted to us by possibly launching executors in them. - * - * This must be synchronized because variables read in this method are mutated by other methods. - */ - def allocateResources(): Unit = synchronized { - updateResourceRequests() - - val progressIndicator = 0.1f - // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container - // requests. - val allocateResponse = amClient.allocate(progressIndicator) - - val allocatedContainers = allocateResponse.getAllocatedContainers() - - if (allocatedContainers.size > 0) { - logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s." - .format( - allocatedContainers.size, - numExecutorsRunning, - allocateResponse.getAvailableResources)) - - handleAllocatedContainers(allocatedContainers.asScala) - } - - val completedContainers = allocateResponse.getCompletedContainersStatuses() - if (completedContainers.size > 0) { - logDebug("Completed %d containers".format(completedContainers.size)) - processCompletedContainers(completedContainers.asScala) - logDebug("Finished processing %d completed containers. Current running executor count: %d." - .format(completedContainers.size, numExecutorsRunning)) - } - } - - /** - * Update the set of container requests that we will sync with the RM based on the number of - * executors we have currently running and our target number of executors. - * - * Visible for testing. - */ - def updateResourceRequests(): Unit = { - val pendingAllocate = getPendingAllocate - val numPendingAllocate = pendingAllocate.size - val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning - - if (missing > 0) { - logInfo(s"Will request $missing executor container(s), each with " + - s"${resource.getVirtualCores} core(s) and " + - s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)") - - // Split the pending container request into three groups: locality matched list, locality - // unmatched list and non-locality list. Take the locality matched container request into - // consideration of container placement, treat as allocated containers. - // For locality unmatched and locality free container requests, cancel these container - // requests, since required locality preference has been changed, recalculating using - // container placement strategy. 
- val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality( - hostToLocalTaskCounts, pendingAllocate) - - // cancel "stale" requests for locations that are no longer needed - staleRequests.foreach { stale => - amClient.removeContainerRequest(stale) - } - val cancelledContainers = staleRequests.size - if (cancelledContainers > 0) { - logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)") - } - - // consider the number of new containers and cancelled stale containers available - val availableContainers = missing + cancelledContainers - - // to maximize locality, include requests with no locality preference that can be cancelled - val potentialContainers = availableContainers + anyHostRequests.size - - val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers( - potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts, - allocatedHostToContainersMap, localRequests) - - val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest] - containerLocalityPreferences.foreach { - case ContainerLocalityPreferences(nodes, racks) if nodes != null => - newLocalityRequests += createContainerRequest(resource, nodes, racks) - case _ => - } - - if (availableContainers >= newLocalityRequests.size) { - // more containers are available than needed for locality, fill in requests for any host - for (i <- 0 until (availableContainers - newLocalityRequests.size)) { - newLocalityRequests += createContainerRequest(resource, null, null) - } - } else { - val numToCancel = newLocalityRequests.size - availableContainers - // cancel some requests without locality preferences to schedule more local containers - anyHostRequests.slice(0, numToCancel).foreach { nonLocal => - amClient.removeContainerRequest(nonLocal) - } - if (numToCancel > 0) { - logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality") - } - } - - newLocalityRequests.foreach { request => - amClient.addContainerRequest(request) - } - - if (log.isInfoEnabled()) { - val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null) - if (anyHost.nonEmpty) { - logInfo(s"Submitted ${anyHost.size} unlocalized container requests.") - } - localized.foreach { request => - logInfo(s"Submitted container request for host ${hostStr(request)}.") - } - } - } else if (numPendingAllocate > 0 && missing < 0) { - val numToCancel = math.min(numPendingAllocate, -missing) - logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " + - s"total $targetNumExecutors executors.") - - val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource) - if (!matchingRequests.isEmpty) { - matchingRequests.iterator().next().asScala - .take(numToCancel).foreach(amClient.removeContainerRequest) - } else { - logWarning("Expected to find pending requests, but found none.") - } - } - } - - private def hostStr(request: ContainerRequest): String = { - Option(request.getNodes) match { - case Some(nodes) => nodes.asScala.mkString(",") - case None => "Any" - } - } - - /** - * Creates a container request, handling the reflection required to use YARN features that were - * added in recent versions. 
- */ - private def createContainerRequest( - resource: Resource, - nodes: Array[String], - racks: Array[String]): ContainerRequest = { - nodeLabelConstructor.map { constructor => - constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean, - labelExpression.orNull) - }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY)) - } - - /** - * Handle containers granted by the RM by launching executors on them. - * - * Due to the way the YARN allocation protocol works, certain healthy race conditions can result - * in YARN granting containers that we no longer need. In this case, we release them. - * - * Visible for testing. - */ - def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = { - val containersToUse = new ArrayBuffer[Container](allocatedContainers.size) - - // Match incoming requests by host - val remainingAfterHostMatches = new ArrayBuffer[Container] - for (allocatedContainer <- allocatedContainers) { - matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost, - containersToUse, remainingAfterHostMatches) - } - - // Match remaining by rack - val remainingAfterRackMatches = new ArrayBuffer[Container] - for (allocatedContainer <- remainingAfterHostMatches) { - val rack = RackResolver.resolve(conf, allocatedContainer.getNodeId.getHost).getNetworkLocation - matchContainerToRequest(allocatedContainer, rack, containersToUse, - remainingAfterRackMatches) - } - - // Assign remaining that are neither node-local nor rack-local - val remainingAfterOffRackMatches = new ArrayBuffer[Container] - for (allocatedContainer <- remainingAfterRackMatches) { - matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse, - remainingAfterOffRackMatches) - } - - if (!remainingAfterOffRackMatches.isEmpty) { - logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " + - s"allocated to us") - for (container <- remainingAfterOffRackMatches) { - internalReleaseContainer(container) - } - } - - runAllocatedContainers(containersToUse) - - logInfo("Received %d containers from YARN, launching executors on %d of them." - .format(allocatedContainers.size, containersToUse.size)) - } - - /** - * Looks for requests for the given location that match the given container allocation. If it - * finds one, removes the request so that it won't be submitted again. Places the container into - * containersToUse or remaining. - * - * @param allocatedContainer container that was given to us by YARN - * @param location resource name, either a node, rack, or * - * @param containersToUse list of containers that will be used - * @param remaining list of containers that will not be used - */ - private def matchContainerToRequest( - allocatedContainer: Container, - location: String, - containersToUse: ArrayBuffer[Container], - remaining: ArrayBuffer[Container]): Unit = { - // SPARK-6050: certain Yarn configurations return a virtual core count that doesn't match the - // request; for example, capacity scheduler + DefaultResourceCalculator. So match on requested - // memory, but use the asked vcore count for matching, effectively disabling matching on vcore - // count. 
- val matchingResource = Resource.newInstance(allocatedContainer.getResource.getMemory, - resource.getVirtualCores) - val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location, - matchingResource) - - // Match the allocation to a request - if (!matchingRequests.isEmpty) { - val containerRequest = matchingRequests.get(0).iterator.next - amClient.removeContainerRequest(containerRequest) - containersToUse += allocatedContainer - } else { - remaining += allocatedContainer - } - } - - /** - * Launches executors in the allocated containers. - */ - private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = { - for (container <- containersToUse) { - executorIdCounter += 1 - val executorHostname = container.getNodeId.getHost - val containerId = container.getId - val executorId = executorIdCounter.toString - assert(container.getResource.getMemory >= resource.getMemory) - logInfo(s"Launching container $containerId on host $executorHostname") - - def updateInternalState(): Unit = synchronized { - numExecutorsRunning += 1 - executorIdToContainer(executorId) = container - containerIdToExecutorId(container.getId) = executorId - - val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, - new HashSet[ContainerId]) - containerSet += containerId - allocatedContainerToHostMap.put(containerId, executorHostname) - } - - if (numExecutorsRunning < targetNumExecutors) { - if (launchContainers) { - launcherPool.execute(new Runnable { - override def run(): Unit = { - try { - new ExecutorRunnable( - Some(container), - conf, - sparkConf, - driverUrl, - executorId, - executorHostname, - executorMemory, - executorCores, - appAttemptId.getApplicationId.toString, - securityMgr, - localResources - ).run() - updateInternalState() - } catch { - case NonFatal(e) => - logError(s"Failed to launch executor $executorId on container $containerId", e) - // Assigned container should be released immediately to avoid unnecessary resource - // occupation. - amClient.releaseAssignedContainer(containerId) - } - } - }) - } else { - // For test only - updateInternalState() - } - } else { - logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " + - "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors)) - } - } - } - - // Visible for testing. - private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = { - for (completedContainer <- completedContainers) { - val containerId = completedContainer.getContainerId - val alreadyReleased = releasedContainers.remove(containerId) - val hostOpt = allocatedContainerToHostMap.get(containerId) - val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("") - val exitReason = if (!alreadyReleased) { - // Decrement the number of executors running. The next iteration of - // the ApplicationMaster's reporting thread will take care of allocating. - numExecutorsRunning -= 1 - logInfo("Completed container %s%s (state: %s, exit status: %s)".format( - containerId, - onHostStr, - completedContainer.getState, - completedContainer.getExitStatus)) - // Hadoop 2.2.X added a ContainerExitStatus we should switch to use - // there are some exit status' we shouldn't necessarily count against us, but for - // now I think its ok as none of the containers are expected to exit. 
- val exitStatus = completedContainer.getExitStatus - val (exitCausedByApp, containerExitReason) = exitStatus match { - case ContainerExitStatus.SUCCESS => - (false, s"Executor for container $containerId exited because of a YARN event (e.g., " + - "pre-emption) and not because of an error in the running job.") - case ContainerExitStatus.PREEMPTED => - // Preemption is not the fault of the running tasks, since YARN preempts containers - // merely to do resource sharing, and tasks that fail due to preempted executors could - // just as easily finish on any other executor. See SPARK-8167. - (false, s"Container ${containerId}${onHostStr} was preempted.") - // Should probably still count memory exceeded exit codes towards task failures - case VMEM_EXCEEDED_EXIT_CODE => - (true, memLimitExceededLogMessage( - completedContainer.getDiagnostics, - VMEM_EXCEEDED_PATTERN)) - case PMEM_EXCEEDED_EXIT_CODE => - (true, memLimitExceededLogMessage( - completedContainer.getDiagnostics, - PMEM_EXCEEDED_PATTERN)) - case _ => - // Enqueue the timestamp of failed executor - failedExecutorsTimeStamps.enqueue(clock.getTimeMillis()) - (true, "Container marked as failed: " + containerId + onHostStr + - ". Exit status: " + completedContainer.getExitStatus + - ". Diagnostics: " + completedContainer.getDiagnostics) - - } - if (exitCausedByApp) { - logWarning(containerExitReason) - } else { - logInfo(containerExitReason) - } - ExecutorExited(exitStatus, exitCausedByApp, containerExitReason) - } else { - // If we have already released this container, then it must mean - // that the driver has explicitly requested it to be killed - ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false, - s"Container $containerId exited from explicit termination request.") - } - - for { - host <- hostOpt - containerSet <- allocatedHostToContainersMap.get(host) - } { - containerSet.remove(containerId) - if (containerSet.isEmpty) { - allocatedHostToContainersMap.remove(host) - } else { - allocatedHostToContainersMap.update(host, containerSet) - } - - allocatedContainerToHostMap.remove(containerId) - } - - containerIdToExecutorId.remove(containerId).foreach { eid => - executorIdToContainer.remove(eid) - pendingLossReasonRequests.remove(eid) match { - case Some(pendingRequests) => - // Notify application of executor loss reasons so it can decide whether it should abort - pendingRequests.foreach(_.reply(exitReason)) - - case None => - // We cannot find executor for pending reasons. This is because completed container - // is processed before querying pending result. We should store it for later query. - // This is usually happened when explicitly killing a container, the result will be - // returned in one AM-RM communication. So query RPC will be later than this completed - // container process. - releasedExecutorLossReasons.put(eid, exitReason) - } - if (!alreadyReleased) { - // The executor could have gone away (like no route to host, node failure, etc) - // Notify backend about the failure of the executor - numUnexpectedContainerRelease += 1 - driverRef.send(RemoveExecutor(eid, exitReason)) - } - } - } - } - - /** - * Register that some RpcCallContext has asked the AM why the executor was lost. Note that - * we can only find the loss reason to send back in the next call to allocateResources(). 
- */ - private[yarn] def enqueueGetLossReasonRequest( - eid: String, - context: RpcCallContext): Unit = synchronized { - if (executorIdToContainer.contains(eid)) { - pendingLossReasonRequests - .getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context - } else if (releasedExecutorLossReasons.contains(eid)) { - // Executor is already released explicitly before getting the loss reason, so directly send - // the pre-stored lost reason - context.reply(releasedExecutorLossReasons.remove(eid).get) - } else { - logWarning(s"Tried to get the loss reason for non-existent executor $eid") - context.sendFailure( - new SparkException(s"Fail to find loss reason for non-existent executor $eid")) - } - } - - private def internalReleaseContainer(container: Container): Unit = { - releasedContainers.add(container.getId()) - amClient.releaseAssignedContainer(container.getId()) - } - - private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease - - private[yarn] def getNumPendingLossReasonRequests: Int = synchronized { - pendingLossReasonRequests.size - } - - /** - * Split the pending container requests into 3 groups based on current localities of pending - * tasks. - * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as - * container placement hint. - * @param pendingAllocations A sequence of pending allocation container request. - * @return A tuple of 3 sequences, first is a sequence of locality matched container - * requests, second is a sequence of locality unmatched container requests, and third is a - * sequence of locality free container requests. - */ - private def splitPendingAllocationsByLocality( - hostToLocalTaskCount: Map[String, Int], - pendingAllocations: Seq[ContainerRequest] - ): (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) = { - val localityMatched = ArrayBuffer[ContainerRequest]() - val localityUnMatched = ArrayBuffer[ContainerRequest]() - val localityFree = ArrayBuffer[ContainerRequest]() - - val preferredHosts = hostToLocalTaskCount.keySet - pendingAllocations.foreach { cr => - val nodes = cr.getNodes - if (nodes == null) { - localityFree += cr - } else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) { - localityMatched += cr - } else { - localityUnMatched += cr - } - } - - (localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq) - } - -} - -private object YarnAllocator { - val MEM_REGEX = "[0-9.]+ [KMG]B" - val PMEM_EXCEEDED_PATTERN = - Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used") - val VMEM_EXCEEDED_PATTERN = - Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used") - val VMEM_EXCEEDED_EXIT_CODE = -103 - val PMEM_EXCEEDED_EXIT_CODE = -104 - - def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = { - val matcher = pattern.matcher(diagnostics) - val diag = if (matcher.find()) " " + matcher.group() + "." else "" - ("Container killed by YARN for exceeding memory limits." 
+ diag - + " Consider boosting spark.yarn.executor.memoryOverhead.") - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala deleted file mode 100644 index 53df11e..0000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.util.{List => JList} - -import scala.collection.JavaConverters._ -import scala.util.Try - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.webapp.util.WebAppUtils - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging -import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.util.Utils - -/** - * Handles registering and unregistering the application with the YARN ResourceManager. - */ -private[spark] class YarnRMClient extends Logging { - - private var amClient: AMRMClient[ContainerRequest] = _ - private var uiHistoryAddress: String = _ - private var registered: Boolean = false - - /** - * Registers the application master with the RM. - * - * @param conf The Yarn configuration. - * @param sparkConf The Spark configuration. - * @param uiAddress Address of the SparkUI. - * @param uiHistoryAddress Address of the application on the History Server. - * @param securityMgr The security manager. - * @param localResources Map with information about files distributed via YARN's cache. - */ - def register( - driverUrl: String, - driverRef: RpcEndpointRef, - conf: YarnConfiguration, - sparkConf: SparkConf, - uiAddress: String, - uiHistoryAddress: String, - securityMgr: SecurityManager, - localResources: Map[String, LocalResource] - ): YarnAllocator = { - amClient = AMRMClient.createAMRMClient() - amClient.init(conf) - amClient.start() - this.uiHistoryAddress = uiHistoryAddress - - logInfo("Registering the ApplicationMaster") - synchronized { - amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) - registered = true - } - new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr, - localResources) - } - - /** - * Unregister the AM. Guaranteed to only be called once. 
- * - * @param status The final status of the AM. - * @param diagnostics Diagnostics message to include in the final status. - */ - def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit = synchronized { - if (registered) { - amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress) - } - } - - /** Returns the attempt ID. */ - def getAttemptId(): ApplicationAttemptId = { - YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId() - } - - /** Returns the configuration for the AmIpFilter to add to the Spark UI. */ - def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = { - // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2, - // so not all stable releases have it. - val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration]) - .invoke(null, conf).asInstanceOf[String]).getOrElse("http://") - - // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses. - try { - val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter", - classOf[Configuration]) - val proxies = method.invoke(null, conf).asInstanceOf[JList[String]] - val hosts = proxies.asScala.map { proxy => proxy.split(":")(0) } - val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase } - Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) - } catch { - case e: NoSuchMethodException => - val proxy = WebAppUtils.getProxyHostAndPort(conf) - val parts = proxy.split(":") - val uriBase = prefix + proxy + proxyBase - Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase) - } - } - - /** Returns the maximum number of attempts to register the AM. */ - def getMaxRegAttempts(sparkConf: SparkConf, yarnConf: YarnConfiguration): Int = { - val sparkMaxAttempts = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt) - val yarnMaxAttempts = yarnConf.getInt( - YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS) - val retval: Int = sparkMaxAttempts match { - case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts - case None => yarnMaxAttempts - } - - retval - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala deleted file mode 100644 index cc53b1b..0000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.io.File -import java.nio.charset.StandardCharsets.UTF_8 -import java.util.regex.Matcher -import java.util.regex.Pattern - -import scala.collection.mutable.{HashMap, ListBuffer} -import scala.util.Try - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.Text -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.security.Credentials -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.yarn.api.ApplicationConstants -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority} -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.ConverterUtils - -import org.apache.spark.{SecurityManager, SparkConf, SparkException} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.security.{ConfigurableCredentialManager, CredentialUpdater} -import org.apache.spark.internal.config._ -import org.apache.spark.launcher.YarnCommandBuilderUtils -import org.apache.spark.util.Utils - -/** - * Contains util methods to interact with Hadoop from spark. - */ -class YarnSparkHadoopUtil extends SparkHadoopUtil { - - private var credentialUpdater: CredentialUpdater = _ - - override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { - dest.addCredentials(source.getCredentials()) - } - - // Note that all params which start with SPARK are propagated all the way through, so if in yarn - // mode, this MUST be set to true. - override def isYarnMode(): Boolean = { true } - - // Return an appropriate (subclass) of Configuration. Creating a config initializes some Hadoop - // subsystems. Always create a new config, don't reuse yarnConf. 
- override def newConfiguration(conf: SparkConf): Configuration = - new YarnConfiguration(super.newConfiguration(conf)) - - // Add any user credentials to the job conf which are necessary for running on a secure Hadoop - // cluster - override def addCredentials(conf: JobConf) { - val jobCreds = conf.getCredentials() - jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) - } - - override def getCurrentUserCredentials(): Credentials = { - UserGroupInformation.getCurrentUser().getCredentials() - } - - override def addCurrentUserCredentials(creds: Credentials) { - UserGroupInformation.getCurrentUser().addCredentials(creds) - } - - override def addSecretKeyToUserCredentials(key: String, secret: String) { - val creds = new Credentials() - creds.addSecretKey(new Text(key), secret.getBytes(UTF_8)) - addCurrentUserCredentials(creds) - } - - override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { - val credentials = getCurrentUserCredentials() - if (credentials != null) credentials.getSecretKey(new Text(key)) else null - } - - private[spark] override def startCredentialUpdater(sparkConf: SparkConf): Unit = { - credentialUpdater = - new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater() - credentialUpdater.start() - } - - private[spark] override def stopCredentialUpdater(): Unit = { - if (credentialUpdater != null) { - credentialUpdater.stop() - credentialUpdater = null - } - } - - private[spark] def getContainerId: ContainerId = { - val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) - ConverterUtils.toContainerId(containerIdString) - } -} - -object YarnSparkHadoopUtil { - // Additional memory overhead - // 10% was arrived at experimentally. In the interest of minimizing memory waste while covering - // the common cases. Memory overhead tends to grow with container size. - - val MEMORY_OVERHEAD_FACTOR = 0.10 - val MEMORY_OVERHEAD_MIN = 384L - - val ANY_HOST = "*" - - val DEFAULT_NUMBER_EXECUTORS = 2 - - // All RM requests are issued with same priority : we do not (yet) have any distinction between - // request types (like map/reduce in hadoop for example) - val RM_REQUEST_PRIORITY = Priority.newInstance(1) - - def get: YarnSparkHadoopUtil = { - val yarnMode = java.lang.Boolean.parseBoolean( - System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) - if (!yarnMode) { - throw new SparkException("YarnSparkHadoopUtil is not available in non-YARN mode!") - } - SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil] - } - /** - * Add a path variable to the given environment map. - * If the map already contains this key, append the value to the existing value instead. - */ - def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = { - val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator + value } else value - env.put(key, newValue) - } - - /** - * Set zero or more environment variables specified by the given input string. - * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3". 
- */ - def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = { - if (inputString != null && inputString.length() > 0) { - val childEnvs = inputString.split(",") - val p = Pattern.compile(environmentVariableRegex) - for (cEnv <- childEnvs) { - val parts = cEnv.split("=") // split on '=' - val m = p.matcher(parts(1)) - val sb = new StringBuffer - while (m.find()) { - val variable = m.group(1) - var replace = "" - if (env.get(variable) != None) { - replace = env.get(variable).get - } else { - // if this key is not configured for the child .. get it from the env - replace = System.getenv(variable) - if (replace == null) { - // the env key is note present anywhere .. simply set it - replace = "" - } - } - m.appendReplacement(sb, Matcher.quoteReplacement(replace)) - } - m.appendTail(sb) - // This treats the environment variable as path variable delimited by `File.pathSeparator` - // This is kept for backward compatibility and consistency with Hadoop's behavior - addPathToEnvironment(env, parts(0), sb.toString) - } - } - } - - private val environmentVariableRegex: String = { - if (Utils.isWindows) { - "%([A-Za-z_][A-Za-z0-9_]*?)%" - } else { - "\\$([A-Za-z_][A-Za-z0-9_]*)" - } - } - - /** - * Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. - * Not killing the task leaves various aspects of the executor and (to some extent) the jvm in - * an inconsistent state. - * TODO: If the OOM is not recoverable by rescheduling it on different node, then do - * 'something' to fail job ... akin to blacklisting trackers in mapred ? - * - * The handler if an OOM Exception is thrown by the JVM must be configured on Windows - * differently: the 'taskkill' command should be used, whereas Unix-based systems use 'kill'. - * - * As the JVM interprets both %p and %%p as the same, we can use either of them. However, - * some tests on Windows computers suggest, that the JVM only accepts '%%p'. - * - * Furthermore, the behavior of the character '%' on the Windows command line differs from - * the behavior of '%' in a .cmd file: it gets interpreted as an incomplete environment - * variable. Windows .cmd files escape a '%' by '%%'. Thus, the correct way of writing - * '%%p' in an escaped way is '%%%%p'. - */ - private[yarn] def addOutOfMemoryErrorArgument(javaOpts: ListBuffer[String]): Unit = { - if (!javaOpts.exists(_.contains("-XX:OnOutOfMemoryError"))) { - if (Utils.isWindows) { - javaOpts += escapeForShell("-XX:OnOutOfMemoryError=taskkill /F /PID %%%%p") - } else { - javaOpts += "-XX:OnOutOfMemoryError='kill %p'" - } - } - } - - /** - * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands - * using either - * - * (Unix-based) `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. - * The argument is enclosed in single quotes and some key characters are escaped. - * - * (Windows-based) part of a .cmd file in which case windows escaping for each argument must be - * applied. Windows is quite lenient, however it is usually Java that causes trouble, needing to - * distinguish between arguments starting with '-' and class names. If arguments are surrounded - * by ' java takes the following string as is, hence an argument is mistakenly taken as a class - * name which happens to start with a '-'. The way to avoid this, is to surround nothing with - * a ', but instead with a ". - * - * @param arg A single argument. - * @return Argument quoted for execution via Yarn's generated shell script. 
- */ - def escapeForShell(arg: String): String = { - if (arg != null) { - if (Utils.isWindows) { - YarnCommandBuilderUtils.quoteForBatchScript(arg) - } else { - val escaped = new StringBuilder("'") - for (i <- 0 to arg.length() - 1) { - arg.charAt(i) match { - case '$' => escaped.append("\\$") - case '"' => escaped.append("\\\"") - case '\'' => escaped.append("'\\''") - case c => escaped.append(c) - } - } - escaped.append("'").toString() - } - } else { - arg - } - } - - // YARN/Hadoop acls are specified as user1,user2 group1,group2 - // Users and groups are separated by a space and hence we need to pass the acls in same format - def getApplicationAclsForYarn(securityMgr: SecurityManager) - : Map[ApplicationAccessType, String] = { - Map[ApplicationAccessType, String] ( - ApplicationAccessType.VIEW_APP -> (securityMgr.getViewAcls + " " + - securityMgr.getViewAclsGroups), - ApplicationAccessType.MODIFY_APP -> (securityMgr.getModifyAcls + " " + - securityMgr.getModifyAclsGroups) - ) - } - - /** - * Expand environment variable using Yarn API. - * If environment.$$() is implemented, return the result of it. - * Otherwise, return the result of environment.$() - * Note: $$() is added in Hadoop 2.4. - */ - private lazy val expandMethod = - Try(classOf[Environment].getMethod("$$")) - .getOrElse(classOf[Environment].getMethod("$")) - - def expandEnvironment(environment: Environment): String = - expandMethod.invoke(environment).asInstanceOf[String] - - /** - * Get class path separator using Yarn API. - * If ApplicationConstants.CLASS_PATH_SEPARATOR is implemented, return it. - * Otherwise, return File.pathSeparator - * Note: CLASS_PATH_SEPARATOR is added in Hadoop 2.4. - */ - private lazy val classPathSeparatorField = - Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR")) - .getOrElse(classOf[File].getField("pathSeparator")) - - def getClassPathSeparator(): String = { - classPathSeparatorField.get(null).asInstanceOf[String] - } - - /** - * Getting the initial target number of executors depends on whether dynamic allocation is - * enabled. - * If not using dynamic allocation it gets the number of executors requested by the user. - */ - def getInitialTargetExecutorNumber( - conf: SparkConf, - numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = { - if (Utils.isDynamicAllocationEnabled(conf)) { - val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS) - val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf) - val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS) - require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, - s"initial executor number $initialNumExecutors must between min executor number " + - s"$minNumExecutors and max executor number $maxNumExecutors") - - initialNumExecutors - } else { - val targetNumExecutors = - sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors) - // System property can override environment variable. 
- conf.get(EXECUTOR_INSTANCES).getOrElse(targetNumExecutors) - } - } -} - http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala deleted file mode 100644 index 666cb45..0000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.util.concurrent.TimeUnit - -import org.apache.spark.internal.config.ConfigBuilder -import org.apache.spark.network.util.ByteUnit - -package object config { - - /* Common app configuration. */ - - private[spark] val APPLICATION_TAGS = ConfigBuilder("spark.yarn.tags") - .doc("Comma-separated list of strings to pass through as YARN application tags appearing " + - "in YARN Application Reports, which can be used for filtering when querying YARN.") - .stringConf - .toSequence - .createOptional - - private[spark] val AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS = - ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval") - .doc("Interval after which AM failures will be considered independent and " + - "not accumulate towards the attempt count.") - .timeConf(TimeUnit.MILLISECONDS) - .createOptional - - private[spark] val AM_PORT = - ConfigBuilder("spark.yarn.am.port") - .intConf - .createWithDefault(0) - - private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS = - ConfigBuilder("spark.yarn.executor.failuresValidityInterval") - .doc("Interval after which Executor failures will be considered independent and not " + - "accumulate towards the attempt count.") - .timeConf(TimeUnit.MILLISECONDS) - .createOptional - - private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts") - .doc("Maximum number of AM attempts before failing the app.") - .intConf - .createOptional - - private[spark] val USER_CLASS_PATH_FIRST = ConfigBuilder("spark.yarn.user.classpath.first") - .doc("Whether to place user jars in front of Spark's classpath.") - .booleanConf - .createWithDefault(false) - - private[spark] val GATEWAY_ROOT_PATH = ConfigBuilder("spark.yarn.config.gatewayPath") - .doc("Root of configuration paths that is present on gateway nodes, and will be replaced " + - "with the corresponding path in cluster machines.") - .stringConf - .createWithDefault(null) - - private[spark] val REPLACEMENT_ROOT_PATH = ConfigBuilder("spark.yarn.config.replacementPath") - .doc(s"Path to use as a replacement for ${GATEWAY_ROOT_PATH.key} when launching processes " + - "in the YARN cluster.") - .stringConf - 
.createWithDefault(null) - - private[spark] val QUEUE_NAME = ConfigBuilder("spark.yarn.queue") - .stringConf - .createWithDefault("default") - - private[spark] val HISTORY_SERVER_ADDRESS = ConfigBuilder("spark.yarn.historyServer.address") - .stringConf - .createOptional - - /* File distribution. */ - - private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive") - .doc("Location of archive containing jars files with Spark classes.") - .stringConf - .createOptional - - private[spark] val SPARK_JARS = ConfigBuilder("spark.yarn.jars") - .doc("Location of jars containing Spark classes.") - .stringConf - .toSequence - .createOptional - - private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives") - .stringConf - .toSequence - .createWithDefault(Nil) - - private[spark] val FILES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.files") - .stringConf - .toSequence - .createWithDefault(Nil) - - private[spark] val JARS_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.jars") - .stringConf - .toSequence - .createWithDefault(Nil) - - private[spark] val PRESERVE_STAGING_FILES = ConfigBuilder("spark.yarn.preserve.staging.files") - .doc("Whether to preserve temporary files created by the job in HDFS.") - .booleanConf - .createWithDefault(false) - - private[spark] val STAGING_FILE_REPLICATION = ConfigBuilder("spark.yarn.submit.file.replication") - .doc("Replication factor for files uploaded by Spark to HDFS.") - .intConf - .createOptional - - private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") - .doc("Staging directory used while submitting applications.") - .stringConf - .createOptional - - /* Cluster-mode launcher configuration. */ - - private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion") - .doc("In cluster mode, whether to wait for the application to finish before exiting the " + - "launcher process.") - .booleanConf - .createWithDefault(true) - - private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval") - .doc("Interval between reports of the current app status in cluster mode.") - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefaultString("1s") - - /* Shared Client-mode AM / Driver configuration. 
*/ - - private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime") - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefaultString("100s") - - private[spark] val AM_NODE_LABEL_EXPRESSION = ConfigBuilder("spark.yarn.am.nodeLabelExpression") - .doc("Node label expression for the AM.") - .stringConf - .createOptional - - private[spark] val CONTAINER_LAUNCH_MAX_THREADS = - ConfigBuilder("spark.yarn.containerLauncherMaxThreads") - .intConf - .createWithDefault(25) - - private[spark] val MAX_EXECUTOR_FAILURES = ConfigBuilder("spark.yarn.max.executor.failures") - .intConf - .createOptional - - private[spark] val MAX_REPORTER_THREAD_FAILURES = - ConfigBuilder("spark.yarn.scheduler.reporterThread.maxFailures") - .intConf - .createWithDefault(5) - - private[spark] val RM_HEARTBEAT_INTERVAL = - ConfigBuilder("spark.yarn.scheduler.heartbeat.interval-ms") - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefaultString("3s") - - private[spark] val INITIAL_HEARTBEAT_INTERVAL = - ConfigBuilder("spark.yarn.scheduler.initial-allocation.interval") - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefaultString("200ms") - - private[spark] val SCHEDULER_SERVICES = ConfigBuilder("spark.yarn.services") - .doc("A comma-separated list of class names of services to add to the scheduler.") - .stringConf - .toSequence - .createWithDefault(Nil) - - /* Client-mode AM configuration. */ - - private[spark] val AM_CORES = ConfigBuilder("spark.yarn.am.cores") - .intConf - .createWithDefault(1) - - private[spark] val AM_JAVA_OPTIONS = ConfigBuilder("spark.yarn.am.extraJavaOptions") - .doc("Extra Java options for the client-mode AM.") - .stringConf - .createOptional - - private[spark] val AM_LIBRARY_PATH = ConfigBuilder("spark.yarn.am.extraLibraryPath") - .doc("Extra native library path for the client-mode AM.") - .stringConf - .createOptional - - private[spark] val AM_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.am.memoryOverhead") - .bytesConf(ByteUnit.MiB) - .createOptional - - private[spark] val AM_MEMORY = ConfigBuilder("spark.yarn.am.memory") - .bytesConf(ByteUnit.MiB) - .createWithDefaultString("512m") - - /* Driver configuration. */ - - private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores") - .intConf - .createWithDefault(1) - - private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.driver.memoryOverhead") - .bytesConf(ByteUnit.MiB) - .createOptional - - /* Executor configuration. */ - - private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores") - .intConf - .createWithDefault(1) - - private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead") - .bytesConf(ByteUnit.MiB) - .createOptional - - private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION = - ConfigBuilder("spark.yarn.executor.nodeLabelExpression") - .doc("Node label expression for executors.") - .stringConf - .createOptional - - /* Security configuration. */ - - private[spark] val CREDENTIAL_FILE_MAX_COUNT = - ConfigBuilder("spark.yarn.credentials.file.retention.count") - .intConf - .createWithDefault(5) - - private[spark] val CREDENTIALS_FILE_MAX_RETENTION = - ConfigBuilder("spark.yarn.credentials.file.retention.days") - .intConf - .createWithDefault(5) - - private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes") - .doc("Extra NameNode URLs for which to request delegation tokens. 
The NameNode that hosts " + - "fs.defaultFS does not need to be listed here.") - .stringConf - .toSequence - .createWithDefault(Nil) - - /* Rolled log aggregation configuration. */ - - private[spark] val ROLLED_LOG_INCLUDE_PATTERN = - ConfigBuilder("spark.yarn.rolledLog.includePattern") - .doc("Java Regex to filter the log files which match the defined include pattern and those " + - "log files will be aggregated in a rolling fashion.") - .stringConf - .createOptional - - private[spark] val ROLLED_LOG_EXCLUDE_PATTERN = - ConfigBuilder("spark.yarn.rolledLog.excludePattern") - .doc("Java Regex to filter the log files which match the defined exclude pattern and those " + - "log files will not be aggregated in a rolling fashion.") - .stringConf - .createOptional - - /* Private configs. */ - - private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file") - .internal() - .stringConf - .createWithDefault(null) - - // Internal config to propagate the location of the user's jar to the driver/executors - private[spark] val APP_JAR = ConfigBuilder("spark.yarn.user.jar") - .internal() - .stringConf - .createOptional - - // Internal config to propagate the locations of any extra jars to add to the classpath - // of the executors - private[spark] val SECONDARY_JARS = ConfigBuilder("spark.yarn.secondary.jars") - .internal() - .stringConf - .toSequence - .createOptional - - /* Configuration and cached file propagation. */ - - private[spark] val CACHED_FILES = ConfigBuilder("spark.yarn.cache.filenames") - .internal() - .stringConf - .toSequence - .createWithDefault(Nil) - - private[spark] val CACHED_FILES_SIZES = ConfigBuilder("spark.yarn.cache.sizes") - .internal() - .longConf - .toSequence - .createWithDefault(Nil) - - private[spark] val CACHED_FILES_TIMESTAMPS = ConfigBuilder("spark.yarn.cache.timestamps") - .internal() - .longConf - .toSequence - .createWithDefault(Nil) - - private[spark] val CACHED_FILES_VISIBILITIES = ConfigBuilder("spark.yarn.cache.visibilities") - .internal() - .stringConf - .toSequence - .createWithDefault(Nil) - - // Either "file" or "archive", for each file. - private[spark] val CACHED_FILES_TYPES = ConfigBuilder("spark.yarn.cache.types") - .internal() - .stringConf - .toSequence - .createWithDefault(Nil) - - // The location of the conf archive in HDFS. - private[spark] val CACHED_CONF_ARCHIVE = ConfigBuilder("spark.yarn.cache.confArchive") - .internal() - .stringConf - .createOptional - - private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime") - .internal() - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(Long.MaxValue) - - private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.yarn.credentials.updateTime") - .internal() - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(Long.MaxValue) - - // The list of cache-related config entries. This is used by Client and the AM to clean - // up the environment so that these settings do not appear on the web UI. 
- private[yarn] val CACHE_CONFIGS = Seq( - CACHED_FILES, - CACHED_FILES_SIZES, - CACHED_FILES_TIMESTAMPS, - CACHED_FILES_VISIBILITIES, - CACHED_FILES_TYPES, - CACHED_CONF_ARCHIVE) - -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala deleted file mode 100644 index 7e76f40..0000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.yarn.security - -import java.security.PrivilegedExceptionAction -import java.util.concurrent.{Executors, TimeUnit} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.security.UserGroupInformation - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ -import org.apache.spark.util.ThreadUtils - -/** - * The following methods are primarily meant to make sure long-running apps like Spark - * Streaming apps can run without interruption while accessing secured services. The - * scheduleLoginFromKeytab method is called on the AM to get the new credentials. - * This method wakes up a thread that logs into the KDC - * once 75% of the renewal interval of the original credentials used for the container - * has elapsed. It then obtains new credentials and writes them to HDFS in a - * pre-specified location - the prefix of which is specified in the sparkConf by - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc. - * - each update goes to a new file, with a monotonically increasing suffix), also the - * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater. - * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed. - * - * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is - * called once 80% of the validity of the original credentials has elapsed. At that time the - * executor finds the credentials file with the latest timestamp and checks if it has read those - * credentials before (by keeping track of the suffix of the last file it read). 
If a new file has - * appeared, it will read the credentials and update the currently running UGI with it. This - * process happens again once 80% of the validity of this has expired. - */ -private[yarn] class AMCredentialRenewer( - sparkConf: SparkConf, - hadoopConf: Configuration, - credentialManager: ConfigurableCredentialManager) extends Logging { - - private var lastCredentialsFileSuffix = 0 - - private val credentialRenewer = - Executors.newSingleThreadScheduledExecutor( - ThreadUtils.namedThreadFactory("Credential Refresh Thread")) - - private val hadoopUtil = YarnSparkHadoopUtil.get - - private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) - private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION) - private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT) - private val freshHadoopConf = - hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme) - - @volatile private var timeOfNextRenewal = sparkConf.get(CREDENTIALS_RENEWAL_TIME) - - /** - * Schedule a login from the keytab and principal set using the --principal and --keytab - * arguments to spark-submit. This login happens only when the credentials of the current user - * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from - * SparkConf to do the login. This method is a no-op in non-YARN mode. - * - */ - private[spark] def scheduleLoginFromKeytab(): Unit = { - val principal = sparkConf.get(PRINCIPAL).get - val keytab = sparkConf.get(KEYTAB).get - - /** - * Schedule re-login and creation of new credentials. If credentials have already expired, this - * method will synchronously create new ones. - */ - def scheduleRenewal(runnable: Runnable): Unit = { - // Run now! - val remainingTime = timeOfNextRenewal - System.currentTimeMillis() - if (remainingTime <= 0) { - logInfo("Credentials have expired, creating new ones now.") - runnable.run() - } else { - logInfo(s"Scheduling login from keytab in $remainingTime millis.") - credentialRenewer.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) - } - } - - // This thread periodically runs on the AM to update the credentials on HDFS. - val credentialRenewerRunnable = - new Runnable { - override def run(): Unit = { - try { - writeNewCredentialsToHDFS(principal, keytab) - cleanupOldFiles() - } catch { - case e: Exception => - // Log the error and try to write new tokens back in an hour - logWarning("Failed to write out new credentials to HDFS, will try again in an " + - "hour! If this happens too often tasks will fail.", e) - credentialRenewer.schedule(this, 1, TimeUnit.HOURS) - return - } - scheduleRenewal(this) - } - } - // Schedule update of credentials. This handles the case of updating the credentials right now - // as well, since the renewal interval will be 0, and the thread will get scheduled - // immediately. - scheduleRenewal(credentialRenewerRunnable) - } - - // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. 
At - // least numFilesToKeep files are kept for safety - private def cleanupOldFiles(): Unit = { - import scala.concurrent.duration._ - try { - val remoteFs = FileSystem.get(freshHadoopConf) - val credentialsPath = new Path(credentialsFile) - val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis - hadoopUtil.listFilesSorted( - remoteFs, credentialsPath.getParent, - credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) - .dropRight(numFilesToKeep) - .takeWhile(_.getModificationTime < thresholdTime) - .foreach(x => remoteFs.delete(x.getPath, true)) - } catch { - // Such errors are not fatal, so don't throw. Make sure they are logged though - case e: Exception => - logWarning("Error while attempting to cleanup old credentials. If you are seeing many " + - "such warnings there may be an issue with your HDFS cluster.", e) - } - } - - private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = { - // Keytab is copied by YARN to the working directory of the AM, so full path is - // not needed. - - // HACK: - // HDFS will not issue new delegation tokens, if the Credentials object - // passed in already has tokens for that FS even if the tokens are expired (it really only - // checks if there are tokens for the service, and not if they are valid). So the only real - // way to get new tokens is to make sure a different Credentials object is used each time to - // get new tokens and then the new tokens are copied over the current user's Credentials. - // So: - // - we login as a different user and get the UGI - // - use that UGI to get the tokens (see doAs block below) - // - copy the tokens over to the current user's credentials (this will overwrite the tokens - // in the current user's Credentials object for this FS). - // The login to KDC happens each time new tokens are required, but this is rare enough to not - // have to worry about (like once every day or so). This makes this code clearer than having - // to login and then relogin every time (the HDFS API may not relogin since we don't use this - // UGI directly for HDFS communication. - logInfo(s"Attempting to login to KDC using principal: $principal") - val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) - logInfo("Successfully logged into KDC.") - val tempCreds = keytabLoggedInUGI.getCredentials - val credentialsPath = new Path(credentialsFile) - val dst = credentialsPath.getParent - var nearestNextRenewalTime = Long.MaxValue - keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] { - // Get a copy of the credentials - override def run(): Void = { - nearestNextRenewalTime = credentialManager.obtainCredentials(freshHadoopConf, tempCreds) - null - } - }) - - val currTime = System.currentTimeMillis() - val timeOfNextUpdate = if (nearestNextRenewalTime <= currTime) { - // If next renewal time is earlier than current time, we set next renewal time to current - // time, this will trigger next renewal immediately. Also set next update time to current - // time. There still has a gap between token renewal and update will potentially introduce - // issue. 
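- // (That is, even when renewal is forced immediately, a short window remains between the - // tokens being renewed and executors reading the updated file, and failures in that - // window are still possible.)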
- logWarning(s"Next credential renewal time ($nearestNextRenewalTime) is earlier than " + - s"current time ($currTime), which is unexpected, please check your credential renewal " + - "related configurations in the target services.") - timeOfNextRenewal = currTime - currTime - } else { - // Next valid renewal time is about 75% of credential renewal time, and update time is - // slightly later than valid renewal time (80% of renewal time). - timeOfNextRenewal = ((nearestNextRenewalTime - currTime) * 0.75 + currTime).toLong - ((nearestNextRenewalTime - currTime) * 0.8 + currTime).toLong - } - - // Add the temp credentials back to the original ones. - UserGroupInformation.getCurrentUser.addCredentials(tempCreds) - val remoteFs = FileSystem.get(freshHadoopConf) - // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM - // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file - // and update the lastCredentialsFileSuffix. - if (lastCredentialsFileSuffix == 0) { - hadoopUtil.listFilesSorted( - remoteFs, credentialsPath.getParent, - credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) - .lastOption.foreach { status => - lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath) - } - } - val nextSuffix = lastCredentialsFileSuffix + 1 - - val tokenPathStr = - credentialsFile + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + - timeOfNextUpdate.toLong.toString + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + - nextSuffix - val tokenPath = new Path(tokenPathStr) - val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION) - - logInfo("Writing out delegation tokens to " + tempTokenPath.toString) - val credentials = UserGroupInformation.getCurrentUser.getCredentials - credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf) - logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr") - remoteFs.rename(tempTokenPath, tokenPath) - logInfo("Delegation token file rename complete.") - lastCredentialsFileSuffix = nextSuffix - } - - def stop(): Unit = { - credentialRenewer.shutdown() - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
