FS preemption changes WiP - Initial set of classes and structure for preemption - Rename a few files and change starvedApps data structure - Removed a bunch of preemption code. All tests except TestFairSchedulerPreemption pass. - Pass checkStarvation down to FSLeafQueue - Identify starved applications - FSLeafQueue changes - Identify starved apps - FSAppAttempt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e4eec258 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e4eec258 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e4eec258 Branch: refs/heads/fs-preemption Commit: e4eec2585333d8bcd77e8b39e2f792358be92831 Parents: d781c25 Author: Karthik Kambatla <ka...@apache.org> Authored: Sat Mar 5 09:31:28 2016 -0800 Committer: Karthik Kambatla <ka...@apache.org> Committed: Tue May 31 00:49:56 2016 -0700 ---------------------------------------------------------------------- .../scheduler/fair/FSAppAttempt.java | 64 ++++- .../scheduler/fair/FSContext.java | 92 +++++++ .../scheduler/fair/FSLeafQueue.java | 169 +++++++------ .../scheduler/fair/FSParentQueue.java | 11 +- .../scheduler/fair/FSPreemptionThread.java | 152 ++++++++++++ .../resourcemanager/scheduler/fair/FSQueue.java | 15 +- .../scheduler/fair/FairScheduler.java | 244 +++++-------------- .../scheduler/fair/QueueManager.java | 19 +- .../scheduler/fair/TestFSLeafQueue.java | 19 +- .../scheduler/fair/TestFSParentQueue.java | 6 +- .../fair/TestFairSchedulerPreemption.java | 39 +-- .../fair/TestMaxRunningAppsEnforcer.java | 7 +- .../scheduler/fair/TestQueueManager.java | 6 +- .../webapp/dao/TestFairSchedulerQueueInfo.java | 2 +- 14 files changed, 521 insertions(+), 324 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 5b83c9a..5065881 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -77,10 +77,14 @@ public class FSAppAttempt extends SchedulerApplicationAttempt private ResourceWeights resourceWeights; private Resource demand = Resources.createResource(0); private FairScheduler scheduler; + private FSQueue fsQueue; private Resource fairShare = Resources.createResource(0, 0); - private Resource preemptedResources = Resources.createResource(0); private RMContainerComparator comparator = new RMContainerComparator(); - private final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>(); + + // Preemption related variables + private Resource preemptedResources = Resources.createResource(0); + private final Set<RMContainer> containersToPreempt = new HashSet<>(); + private long lastTimeAtFairShare; // Used to record node reservation by an app. // Key = RackName, Value = Set of Nodes reserved by app on rack @@ -106,7 +110,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt super(applicationAttemptId, user, queue, activeUsersManager, rmContext); this.scheduler = scheduler; + this.fsQueue = queue; this.startTime = scheduler.getClock().getTime(); + this.lastTimeAtFairShare = this.startTime; this.priority = Priority.newInstance(1); this.resourceWeights = new ResourceWeights(); } @@ -145,6 +151,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Remove from the list of containers liveContainers.remove(rmContainer.getContainerId()); + containersToPreempt.remove(rmContainer); Resource containerResource = rmContainer.getContainer().getResource(); RMAuditLogger.logSuccess(getUser(), @@ -155,9 +162,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt queue.getMetrics().releaseResources(getUser(), 1, containerResource); this.attemptResourceUsage.decUsed(containerResource); - // remove from preemption map if it is completed - preemptionMap.remove(rmContainer); - // Clear resource utilization metrics cache. lastMemoryAggregateAllocationUpdateTime = -1; } @@ -423,18 +427,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } // related methods - public void addPreemption(RMContainer container, long time) { - assert preemptionMap.get(container) == null; - preemptionMap.put(container, time); + public void addPreemption(RMContainer container) { + containersToPreempt.add(container); Resources.addTo(preemptedResources, container.getAllocatedResource()); } - public Long getContainerPreemptionTime(RMContainer container) { - return preemptionMap.get(container); - } - public Set<RMContainer> getPreemptionContainers() { - return preemptionMap.keySet(); + return containersToPreempt; } @Override @@ -479,6 +478,14 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } /** + * Reserve a spot on this node for a ResourceRequest that would fit in the + * containerSize provided. + */ + public boolean reserve(FSSchedulerNode node, Resource containerSize) { + return false; + } + + /** * Reserve a spot for {@code container} on this {@code node}. If * the container is {@code alreadyReserved} on the node, simply * update relevant bookeeping. This dispatches ro relevant handlers @@ -859,6 +866,37 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } } + /** + * Helper method that computes the extent of fairshare starvation. + */ + Resource fairShareStarvation() { + Resource threshold = Resources.multiply( + getFairShare(), fsQueue.getFairSharePreemptionThreshold()); + Resource starvation = Resources.subtractFrom(threshold, getResourceUsage()); + + long now = scheduler.getClock().getTime(); + boolean starved = Resources.greaterThan( + fsQueue.getPolicy().getResourceCalculator(), + scheduler.getClusterResource(), starvation, Resources.none()); + + if (!starved) { + lastTimeAtFairShare = now; + } + + if (starved && + (now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) { + // Queue is starved for longer than preemption-timeout + return starvation; + } else { + return Resources.none(); + } + } + + public ResourceRequest getNextResourceRequest() { + // TODO (KK): Return highest priority resource request + return null; + } + /* Schedulable methods implementation */ @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java new file mode 100644 index 0000000..eccbd2d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Clock; + +import java.util.concurrent.PriorityBlockingQueue; + +/** + * Helper class that holds basic information to be passed around + * FairScheduler classes. + */ +public class FSContext { + private FairScheduler scheduler; + private QueueManager queueManager; + private Clock clock; + + // Preemption-related info + private boolean preemptionEnabled = false; + private float preemptionUtilizationThreshold; + private PriorityBlockingQueue<FSAppAttempt> starvedApps; + + public FairScheduler getScheduler() { + return scheduler; + } + + public void setScheduler( + FairScheduler scheduler) { + this.scheduler = scheduler; + } + + public Resource getClusterResource() { + return scheduler.getClusterResource(); + } + + public QueueManager getQueueManager() { + return queueManager; + } + + public void setQueueManager( + QueueManager queueManager) { + this.queueManager = queueManager; + } + + public Clock getClock() { + return clock; + } + + public void setClock(Clock clock) { + this.clock = clock; + } + + public boolean isPreemptionEnabled() { + return preemptionEnabled; + } + + public void setPreemptionEnabled() { + this.preemptionEnabled = true; + if (starvedApps == null) { + starvedApps = new PriorityBlockingQueue<>(); + } + } + + public float getPreemptionUtilizationThreshold() { + return preemptionUtilizationThreshold; + } + + public void setPreemptionUtilizationThreshold( + float preemptionUtilizationThreshold) { + this.preemptionUtilizationThreshold = preemptionUtilizationThreshold; + } + + public PriorityBlockingQueue<FSAppAttempt> getStarvedApps() { + return starvedApps; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index a398906..bc2a7c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -52,10 +52,9 @@ public class FSLeafQueue extends FSQueue { private static final Log LOG = LogFactory.getLog( FSLeafQueue.class.getName()); - private final List<FSAppAttempt> runnableApps = // apps that are runnable - new ArrayList<FSAppAttempt>(); - private final List<FSAppAttempt> nonRunnableApps = - new ArrayList<FSAppAttempt>(); + // apps that are runnable + private final List<FSAppAttempt> runnableApps = new ArrayList<>(); + private final List<FSAppAttempt> nonRunnableApps = new ArrayList<>(); // get a lock with fair distribution for app list updates private final ReadWriteLock rwl = new ReentrantReadWriteLock(true); private final Lock readLock = rwl.readLock(); @@ -65,19 +64,16 @@ public class FSLeafQueue extends FSQueue { // Variables used for preemption private long lastTimeAtMinShare; - private long lastTimeAtFairShareThreshold; - + // Track the AM resource usage for this queue private Resource amResourceUsage; private final ActiveUsersManager activeUsersManager; public static final List<FSQueue> EMPTY_LIST = Collections.emptyList(); - public FSLeafQueue(String name, FairScheduler scheduler, - FSParentQueue parent) { - super(name, scheduler, parent); - this.lastTimeAtMinShare = scheduler.getClock().getTime(); - this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime(); + public FSLeafQueue(FSContext context, FSParentQueue parent, String name) { + super(context, parent, name); + this.lastTimeAtMinShare = context.getClock().getTime(); activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); } @@ -224,17 +220,70 @@ public class FSLeafQueue extends FSQueue { } super.policy = policy; } - + @Override - public void recomputeShares() { + public void updateInternal(boolean checkStarvation) { readLock.lock(); try { policy.computeShares(runnableApps, getFairShare()); + if (checkStarvation) { + identifyStarvedApplications(); + } } finally { readLock.unlock(); } } + /** + * Helper method to identify starved applications. This needs to be called + * ONLY from {@link #updateInternal}, after the application shares + * are updated. + * + * A queue can be starving due to fairshare or minshare. + * + * Minshare is defined only on the queue and not the applications. + * Fairshare is defined for both the queue and the applications. + * + * If this queue is starved due to minshare, we need to identify the most + * deserving apps if they themselves are not starved due to fairshare. + * + * If this queue is starving due to fairshare, there must be at least + * one application that is starved. And, even if the queue is not + * starved due to fairshare, there might still be starved applications. + */ + private void identifyStarvedApplications() { + // First identify starved applications and track total amount of + // starvation (in resources) + Resource fairShareStarvation = Resources.clone(Resources.none()); + TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand(); + for (FSAppAttempt app : appsWithDemand) { + Resource appStarvation = app.fairShareStarvation(); + if (Resources.equals(Resources.none(), appStarvation)) { + break; + } else { + context.getStarvedApps().add(app); + Resources.addTo(fairShareStarvation, appStarvation); + } + } + + // Compute extent of minshare starvation + Resource minShareStarvation = minShareStarvation(); + + // Compute minshare starvation that is not subsumed by fairshare starvation + Resources.subtractFrom(minShareStarvation, fairShareStarvation); + + // Keep adding apps to the starved list until the unmet demand goes over + // the remaining minshare + for (FSAppAttempt app : appsWithDemand) { + if (Resources.greaterThan(policy.getResourceCalculator(), + context.getClusterResource(), minShareStarvation, Resources.none())) { + context.getStarvedApps().add(app); + Resources.subtractFrom(minShareStarvation, + Resources.subtract(app.getDemand(), app.getResourceUsage())); + } + } + } + @Override public Resource getDemand() { return demand; @@ -317,21 +366,7 @@ public class FSLeafQueue extends FSQueue { return assigned; } - // Apps that have resource demands. - TreeSet<FSAppAttempt> pendingForResourceApps = - new TreeSet<FSAppAttempt>(policy.getComparator()); - readLock.lock(); - try { - for (FSAppAttempt app : runnableApps) { - Resource pending = app.getAppAttemptResourceUsage().getPending(); - if (!pending.equals(Resources.none())) { - pendingForResourceApps.add(app); - } - } - } finally { - readLock.unlock(); - } - for (FSAppAttempt sched : pendingForResourceApps) { + for (FSAppAttempt sched : fetchAppsWithDemand()) { if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { continue; } @@ -347,6 +382,23 @@ public class FSLeafQueue extends FSQueue { return assigned; } + private TreeSet<FSAppAttempt> fetchAppsWithDemand() { + TreeSet<FSAppAttempt> pendingForResourceApps = + new TreeSet<>(policy.getComparator()); + readLock.lock(); + try { + for (FSAppAttempt app : runnableApps) { + Resource pending = app.getAppAttemptResourceUsage().getPending(); + if (!pending.equals(Resources.none())) { + pendingForResourceApps.add(app); + } + } + } finally { + readLock.unlock(); + } + return pendingForResourceApps; + } + @Override public RMContainer preemptContainer() { RMContainer toBePreempted = null; @@ -412,15 +464,6 @@ public class FSLeafQueue extends FSQueue { this.lastTimeAtMinShare = lastTimeAtMinShare; } - public long getLastTimeAtFairShareThreshold() { - return lastTimeAtFairShareThreshold; - } - - private void setLastTimeAtFairShareThreshold( - long lastTimeAtFairShareThreshold) { - this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold; - } - @Override public int getNumRunnableApps() { readLock.lock(); @@ -525,20 +568,6 @@ public class FSLeafQueue extends FSQueue { // TODO Auto-generated method stub } - /** - * Update the preemption fields for the queue, i.e. the times since last was - * at its guaranteed share and over its fair share threshold. - */ - public void updateStarvationStats() { - long now = scheduler.getClock().getTime(); - if (!isStarvedForMinShare()) { - setLastTimeAtMinShare(now); - } - if (!isStarvedForFairShare()) { - setLastTimeAtFairShareThreshold(now); - } - } - /** Allows setting weight for a dynamically created queue * Currently only used for reservation based queues * @param weight queue weight @@ -558,28 +587,24 @@ public class FSLeafQueue extends FSQueue { getFairShare()); } - /** - * Is a queue being starved for its min share. - */ - @VisibleForTesting - boolean isStarvedForMinShare() { - return isStarved(getMinShare()); - } + private Resource minShareStarvation() { + Resource desiredShare = Resources.min(policy.getResourceCalculator(), + scheduler.getClusterResource(), getMinShare(), getDemand()); - /** - * Is a queue being starved for its fair share threshold. - */ - @VisibleForTesting - boolean isStarvedForFairShare() { - return isStarved( - Resources.multiply(getFairShare(), getFairSharePreemptionThreshold())); - } + Resource starvation = Resources.subtract(desiredShare, getResourceUsage()); + boolean starved = Resources.greaterThan(policy.getResourceCalculator(), + scheduler.getClusterResource(), starvation, Resources.none()); - private boolean isStarved(Resource share) { - Resource desiredShare = Resources.min(policy.getResourceCalculator(), - scheduler.getClusterResource(), share, getDemand()); - Resource resourceUsage = getResourceUsage(); - return Resources.lessThan(policy.getResourceCalculator(), - scheduler.getClusterResource(), resourceUsage, desiredShare); + long now = context.getClock().getTime(); + if (!starved) { + setLastTimeAtMinShare(now); + } + + if (starved && + (now - lastTimeAtMinShare > getMinSharePreemptionTimeout())) { + return starvation; + } else { + return Resources.none(); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 035c60c..79c6e1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -56,9 +56,8 @@ public class FSParentQueue extends FSQueue { private Lock readLock = rwLock.readLock(); private Lock writeLock = rwLock.writeLock(); - public FSParentQueue(String name, FairScheduler scheduler, - FSParentQueue parent) { - super(name, scheduler, parent); + public FSParentQueue(FSContext context, FSParentQueue parent, String name) { + super(context, parent, name); } public void addChildQueue(FSQueue child) { @@ -80,13 +79,13 @@ public class FSParentQueue extends FSQueue { } @Override - public void recomputeShares() { + public void updateInternal(boolean checkStarvation) { readLock.lock(); try { policy.computeShares(childQueues, getFairShare()); for (FSQueue childQueue : childQueues) { childQueue.getMetrics().setFairShare(childQueue.getFairShare()); - childQueue.recomputeShares(); + childQueue.updateInternal(checkStarvation); } } finally { readLock.unlock(); @@ -304,7 +303,7 @@ public class FSParentQueue extends FSQueue { } super.policy = policy; } - + public void incrementRunnableApps() { writeLock.lock(); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java new file mode 100644 index 0000000..0e99b64 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; + +/** + * Thread that handles FairScheduler preemption + */ +public class FSPreemptionThread extends Thread { + private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class); + private final FSContext context; + private final FairScheduler scheduler; + private final long warnTimeBeforeKill; + private final Timer preemptionTimer; + + public FSPreemptionThread(FSContext context) { + this.context = context; + this.scheduler = context.getScheduler(); + FairSchedulerConfiguration fsConf = scheduler.getConf(); + context.setPreemptionEnabled(); + context.setPreemptionUtilizationThreshold( + fsConf.getPreemptionUtilizationThreshold()); + warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill(); + preemptionTimer = new Timer("Preemption Timer", true); + + setDaemon(true); + setName("FSPreemptionThread"); + } + + public void run() { + while (!Thread.interrupted()) { + FSAppAttempt starvedApp; + try{ + starvedApp = context.getStarvedApps().take(); + } catch (InterruptedException e) { + LOG.info("Preemption thread interrupted! Exiting."); + return; + } + List<RMContainer> containers = identifyContainersToPreempt(starvedApp); + if (containers != null) { + preemptContainers(containers); + } + } + } + + /** + * Returns a non-null PremptionContext if it finds a node that can + * accommodate a request from this app. Also, reserves the node for this app. + */ + private List<RMContainer> identifyContainersToPreempt(FSAppAttempt starvedApp) { + List<RMContainer> containers = new ArrayList<>(); + ResourceRequest request = starvedApp.getNextResourceRequest(); + Resource requestCapability = request.getCapability(); + List<FSSchedulerNode> nodes = + scheduler.getNodeTracker().getNodes(request.getResourceName()); + FSSchedulerNode targetNode = null; + Resource potential = Resources.clone(Resources.none()); + for (FSSchedulerNode node : nodes) { + containers.clear(); + potential = Resources.clone(Resources.none()); + for (RMContainer container : node.getCopiedListOfRunningContainers()) { + Resource containerResource = container.getAllocatedResource(); + FSAppAttempt app = + scheduler.getSchedulerApp(container.getApplicationAttemptId()); + if (Resources.fitsIn(containerResource, + Resources.subtract(app.getResourceUsage(), app.getFairShare()))) { + Resources.addTo(potential, containerResource); + } + if (Resources.fitsIn(requestCapability, potential)) { + break; + } + } + if (Resources.fitsIn(requestCapability, potential)) { + targetNode = node; + break; + } + } + + if (Resources.fitsIn(requestCapability, potential)) { + starvedApp.reserve(targetNode, requestCapability); + return containers; + } else { + return null; + } + } + + public void preemptContainers(List<RMContainer> containers) { + // Warn application about containers to be killed + for (RMContainer container : containers) { + ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); + FSAppAttempt app = context.getScheduler().getSchedulerApp(appAttemptId); + FSLeafQueue queue = app.getQueue(); + LOG.info("Preempting container " + container + + " from queue " + queue.getName()); + app.addPreemption(container); + } + + // Schedule timer task to kill containers + preemptionTimer.schedule( + new PreemptContainersTask(containers), warnTimeBeforeKill); + } + + private class PreemptContainersTask extends TimerTask { + private List<RMContainer> containers; + + PreemptContainersTask(List<RMContainer> containers) { + this.containers = containers; + } + + @Override + public void run() { + for (RMContainer container : containers) { + ContainerStatus status = SchedulerUtils.createPreemptedContainerStatus( + container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); + + LOG.info("Killing container " + container); + context.getScheduler().completedContainer( + container, status, RMContainerEventType.KILL); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index f50c358..32184fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -50,6 +50,7 @@ public abstract class FSQueue implements Queue, Schedulable { private Resource fairShare = Resources.createResource(0, 0); private Resource steadyFairShare = Resources.createResource(0, 0); private final String name; + protected final FSContext context; protected final FairScheduler scheduler; private final FSQueueMetrics metrics; @@ -64,9 +65,10 @@ public abstract class FSQueue implements Queue, Schedulable { private float fairSharePreemptionThreshold = 0.5f; private boolean preemptable = true; - public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { + public FSQueue(FSContext context, FSParentQueue parent, String name) { this.name = name; - this.scheduler = scheduler; + this.context = context; + this.scheduler = context.getScheduler(); this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf()); metrics.setMinShare(getMinShare()); metrics.setMaxShare(getMaxShare()); @@ -246,9 +248,14 @@ public abstract class FSQueue implements Queue, Schedulable { /** * Recomputes the shares for all child queues and applications based on this - * queue's current share + * queue's current share, and checks for starvation. */ - public abstract void recomputeShares(); + public abstract void updateInternal(boolean checkStarvation); + + public void update(Resource fairShare, boolean checkStarvation) { + setFairShare(fairShare); + updateInternal(checkStarvation); + } /** * Update the min/fair share preemption timeouts, threshold and preemption http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index c8e8406..c3684ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -127,9 +127,9 @@ public class FairScheduler extends AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> { private FairSchedulerConfiguration conf; + private FSContext context; private Resource incrAllocation; private QueueManager queueMgr; - private volatile Clock clock; private boolean usePortForNodeName; private static final Log LOG = LogFactory.getLog(FairScheduler.class); @@ -155,6 +155,9 @@ public class FairScheduler extends @VisibleForTesting Thread schedulingThread; + + Thread preemptionThread; + // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; @@ -162,25 +165,6 @@ public class FairScheduler extends FSQueueMetrics rootMetrics; FSOpDurations fsOpDurations; - // Time when we last updated preemption vars - protected long lastPreemptionUpdateTime; - // Time we last ran preemptTasksIfNecessary - private long lastPreemptCheckTime; - - // Preemption related variables - protected boolean preemptionEnabled; - protected float preemptionUtilizationThreshold; - - // How often tasks are preempted - protected long preemptionInterval; - - // ms to wait before force killing stuff (must be longer than a couple - // of heartbeats to give task-kill commands a chance to act). - protected long waitTimeBeforeKill; - - // Containers whose AMs have been warned that they will be preempted soon. - private List<RMContainer> warnedContainers = new ArrayList<RMContainer>(); - private float reservableNodesRatio; // percentage of available nodes // an app can be reserved on @@ -214,12 +198,24 @@ public class FairScheduler extends public FairScheduler() { super(FairScheduler.class.getName()); - clock = SystemClock.getInstance(); + + context = new FSContext(); + context.setScheduler(this); + + context.setClock(SystemClock.getInstance()); allocsLoader = new AllocationFileLoaderService(); - queueMgr = new QueueManager(this); + + queueMgr = new QueueManager(context); + context.setQueueManager(queueMgr); + maxRunningEnforcer = new MaxRunningAppsEnforcer(this); } + @VisibleForTesting + public FSContext getContext() { + return context; + } + public boolean isAtLeastReservationThreshold( ResourceCalculator resourceCalculator, Resource resource) { return Resources.greaterThanOrEqual(resourceCalculator, @@ -300,7 +296,6 @@ public class FairScheduler extends } long start = getClock().getTime(); update(); - preemptTasksIfNecessary(); long duration = getClock().getTime() - start; fsOpDurations.addUpdateThreadRunDuration(duration); } catch (InterruptedException ie) { @@ -340,24 +335,22 @@ public class FairScheduler extends */ protected synchronized void update() { long start = getClock().getTime(); - updateStarvationStats(); // Determine if any queues merit preemption FSQueue rootQueue = queueMgr.getRootQueue(); // Recursively update demands for all queues rootQueue.updateDemand(); - Resource clusterResource = getClusterResource(); - rootQueue.setFairShare(clusterResource); - // Recursively compute fair shares for all queues - // and update metrics - rootQueue.recomputeShares(); + // Update fairshares and starvation stats. + rootQueue.update(getClusterResource(), shouldAttemptPreemption()); + + // Update metrics updateRootQueueMetrics(); if (LOG.isDebugEnabled()) { if (--updatesToSkipForDebug < 0) { updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; - LOG.debug("Cluster Capacity: " + clusterResource + + LOG.debug("Cluster Capacity: " + getClusterResource() + " Allocations: " + rootMetrics.getAllocatedResources() + " Availability: " + Resource.newInstance( rootMetrics.getAvailableMB(), @@ -371,144 +364,6 @@ public class FairScheduler extends } /** - * Update the preemption fields for all QueueScheduables, i.e. the times since - * each queue last was at its guaranteed share and over its fair share - * threshold for each type of task. - */ - private void updateStarvationStats() { - lastPreemptionUpdateTime = clock.getTime(); - for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - sched.updateStarvationStats(); - } - } - - /** - * Check for queues that need tasks preempted, either because they have been - * below their guaranteed share for minSharePreemptionTimeout or they have - * been below their fair share threshold for the fairSharePreemptionTimeout. If - * such queues exist, compute how many tasks of each type need to be preempted - * and then select the right ones using preemptTasks. - */ - protected synchronized void preemptTasksIfNecessary() { - if (!shouldAttemptPreemption()) { - return; - } - - long curTime = getClock().getTime(); - if (curTime - lastPreemptCheckTime < preemptionInterval) { - return; - } - lastPreemptCheckTime = curTime; - - Resource resToPreempt = Resources.clone(Resources.none()); - for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - Resources.addTo(resToPreempt, resourceDeficit(sched, curTime)); - } - if (isResourceGreaterThanNone(resToPreempt)) { - preemptResources(resToPreempt); - } - } - - /** - * Preempt a quantity of resources. Each round, we start from the root queue, - * level-by-level, until choosing a candidate application. - * The policy for prioritizing preemption for each queue depends on its - * SchedulingPolicy: (1) fairshare/DRF, choose the ChildSchedulable that is - * most over its fair share; (2) FIFO, choose the childSchedulable that is - * latest launched. - * Inside each application, we further prioritize preemption by choosing - * containers with lowest priority to preempt. - * We make sure that no queue is placed below its fair share in the process. - */ - protected void preemptResources(Resource toPreempt) { - long start = getClock().getTime(); - if (Resources.equals(toPreempt, Resources.none())) { - return; - } - - // Scan down the list of containers we've already warned and kill them - // if we need to. Remove any containers from the list that we don't need - // or that are no longer running. - Iterator<RMContainer> warnedIter = warnedContainers.iterator(); - while (warnedIter.hasNext()) { - RMContainer container = warnedIter.next(); - if ((container.getState() == RMContainerState.RUNNING || - container.getState() == RMContainerState.ALLOCATED) && - isResourceGreaterThanNone(toPreempt)) { - warnOrKillContainer(container); - Resources.subtractFrom(toPreempt, container.getContainer().getResource()); - } else { - warnedIter.remove(); - } - } - - try { - // Reset preemptedResource for each app - for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { - queue.resetPreemptedResources(); - } - - while (isResourceGreaterThanNone(toPreempt)) { - RMContainer container = - getQueueManager().getRootQueue().preemptContainer(); - if (container == null) { - break; - } else { - warnOrKillContainer(container); - warnedContainers.add(container); - Resources.subtractFrom( - toPreempt, container.getContainer().getResource()); - } - } - } finally { - // Clear preemptedResources for each app - for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { - queue.clearPreemptedResources(); - } - } - - long duration = getClock().getTime() - start; - fsOpDurations.addPreemptCallDuration(duration); - } - - private boolean isResourceGreaterThanNone(Resource toPreempt) { - return (toPreempt.getMemorySize() > 0) || (toPreempt.getVirtualCores() > 0); - } - - protected void warnOrKillContainer(RMContainer container) { - ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); - FSAppAttempt app = getSchedulerApp(appAttemptId); - FSLeafQueue queue = app.getQueue(); - LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + - "res=" + container.getContainer().getResource() + - ") from queue " + queue.getName()); - - Long time = app.getContainerPreemptionTime(container); - - if (time != null) { - // if we asked for preemption more than maxWaitTimeBeforeKill ms ago, - // proceed with kill - if (time + waitTimeBeforeKill < getClock().getTime()) { - ContainerStatus status = - SchedulerUtils.createPreemptedContainerStatus( - container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); - - // TODO: Not sure if this ever actually adds this to the list of cleanup - // containers on the RMNode (see SchedulerNode.releaseContainer()). - super.completedContainer(container, status, RMContainerEventType.KILL); - if (LOG.isDebugEnabled()) { - LOG.debug("Killing container" + container + - " (after waiting for preemption for " + - (getClock().getTime() - time) + "ms)"); - } - } - } else { - // track the request in the FSAppAttempt itself - app.addPreemption(container, getClock().getTime()); - } - } - - /** * Return the resource amount that this queue is allowed to preempt, if any. * If the queue has been below its min share for at least its preemption * timeout, it should preempt the difference between its current share and @@ -531,12 +386,12 @@ public class FairScheduler extends resDueToMinShare = Resources.max(calc, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } - if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { - Resource target = Resources.componentwiseMin( - sched.getFairShare(), sched.getDemand()); - resDueToFairShare = Resources.max(calc, clusterResource, - Resources.none(), Resources.subtract(target, sched.getResourceUsage())); - } +// if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { +// Resource target = Resources.componentwiseMin( +// sched.getFairShare(), sched.getDemand()); +// resDueToFairShare = Resources.max(calc, clusterResource, +// Resources.none(), Resources.subtract(target, sched.getResourceUsage())); +// } Resource deficit = Resources.max(calc, clusterResource, resDueToMinShare, resDueToFairShare); if (Resources.greaterThan(calc, clusterResource, @@ -604,12 +459,13 @@ public class FairScheduler extends } public Clock getClock() { - return clock; + return context.getClock(); } @VisibleForTesting + @Deprecated // use FSContext.setClock void setClock(Clock clock) { - this.clock = clock; + context.setClock(clock); } public FairSchedulerEventLog getEventLog() { @@ -1208,15 +1064,22 @@ public class FairScheduler extends * Check if preemption is enabled and the utilization threshold for * preemption is met. * + * TODO (KK): Should we handle the case where usage is less than preemption + * threshold, but there are applications requesting resources on nodes that + * are otherwise occupied by long running applications over their + * fairshare? What if they are occupied by applications not over their + * fairshare? Does this mean YARN should not allocate all resources on a + * node to long-running services? + * * @return true if preemption should be attempted, false otherwise. */ private boolean shouldAttemptPreemption() { - if (preemptionEnabled) { - Resource clusterResource = getClusterResource(); - return (preemptionUtilizationThreshold < Math.max( - (float) rootMetrics.getAllocatedMB() / clusterResource.getMemorySize(), + if (context.isPreemptionEnabled()) { + return (context.getPreemptionUtilizationThreshold() < Math.max( + (float) rootMetrics.getAllocatedMB() / + getClusterResource().getMemorySize(), (float) rootMetrics.getAllocatedVirtualCores() / - clusterResource.getVirtualCores())); + getClusterResource().getVirtualCores())); } return false; } @@ -1400,15 +1263,10 @@ public class FairScheduler extends rackLocalityThreshold = this.conf.getLocalityThresholdRack(); nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); - preemptionEnabled = this.conf.getPreemptionEnabled(); - preemptionUtilizationThreshold = - this.conf.getPreemptionUtilizationThreshold(); assignMultiple = this.conf.getAssignMultiple(); maxAssignDynamic = this.conf.isMaxAssignDynamic(); maxAssign = this.conf.getMaxAssign(); sizeBasedWeight = this.conf.getSizeBasedWeight(); - preemptionInterval = this.conf.getPreemptionInterval(); - waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); usePortForNodeName = this.conf.getUsePortForNodeName(); reservableNodesRatio = this.conf.getReservableNodes(); @@ -1425,8 +1283,7 @@ public class FairScheduler extends fsOpDurations = FSOpDurations.getInstance(true); // This stores per-application scheduling information - this.applications = new ConcurrentHashMap< - ApplicationId, SchedulerApplication<FSAppAttempt>>(); + this.applications = new ConcurrentHashMap<>(); this.eventLog = new FairSchedulerEventLog(); eventLog.init(this.conf); @@ -1447,6 +1304,10 @@ public class FairScheduler extends schedulingThread.setName("FairSchedulerContinuousScheduling"); schedulingThread.setDaemon(true); } + + if (this.conf.getPreemptionEnabled()) { + preemptionThread = new FSPreemptionThread(context); + } } allocsLoader.init(conf); @@ -1477,6 +1338,9 @@ public class FairScheduler extends Preconditions.checkNotNull(schedulingThread, "schedulingThread is null"); schedulingThread.start(); } + if (preemptionThread != null) { + preemptionThread.start(); + } allocsLoader.start(); } @@ -1505,6 +1369,10 @@ public class FairScheduler extends schedulingThread.join(THREAD_JOIN_TIMEOUT_MS); } } + if (preemptionThread != null) { + preemptionThread.interrupt(); + preemptionThread.join(THREAD_JOIN_TIMEOUT_MS); + } if (allocsLoader != null) { allocsLoader.stop(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index aeadcf6..c7d368c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -50,16 +50,16 @@ public class QueueManager { QueueManager.class.getName()); public static final String ROOT_QUEUE = "root"; - - private final FairScheduler scheduler; + + private final FSContext context; private final Collection<FSLeafQueue> leafQueues = new CopyOnWriteArrayList<FSLeafQueue>(); private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>(); private FSParentQueue rootQueue; - public QueueManager(FairScheduler scheduler) { - this.scheduler = scheduler; + public QueueManager(FSContext context) { + this.context = context; } public FSParentQueue getRootQueue() { @@ -68,7 +68,7 @@ public class QueueManager { public void initialize(Configuration conf) throws IOException, SAXException, AllocationConfigurationException, ParserConfigurationException { - rootQueue = new FSParentQueue("root", scheduler, null); + rootQueue = new FSParentQueue(context, null, "root"); queues.put(rootQueue.getName(), rootQueue); // Create the default queue @@ -215,12 +215,13 @@ public class QueueManager { // queue to create. // Now that we know everything worked out, make all the queues // and add them to the map. - AllocationConfiguration queueConf = scheduler.getAllocationConfiguration(); + AllocationConfiguration queueConf = + context.getScheduler().getAllocationConfiguration(); FSLeafQueue leafQueue = null; for (int i = newQueueNames.size()-1; i >= 0; i--) { String queueName = newQueueNames.get(i); if (i == 0 && queueType != FSQueueType.PARENT) { - leafQueue = new FSLeafQueue(name, scheduler, parent); + leafQueue = new FSLeafQueue(context, parent, name); try { leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy()); } catch (AllocationConfigurationException ex) { @@ -233,7 +234,7 @@ public class QueueManager { leafQueue.updatePreemptionVariables(); return leafQueue; } else { - FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent); + FSParentQueue newParent = new FSParentQueue(context, parent, queueName); try { newParent.setPolicy(queueConf.getDefaultSchedulingPolicy()); } catch (AllocationConfigurationException ex) { @@ -433,7 +434,7 @@ public class QueueManager { // Set scheduling policies and update queue metrics try { SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName()); - policy.initialize(scheduler.getClusterResource()); + policy.initialize(context.getClusterResource()); queue.setPolicy(policy); queueMetrics.setMaxApps(queueConf.getQueueMaxApps(queue.getName())); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index ad4e2e4..b8378f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -89,7 +89,8 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { thenReturn(Integer.MAX_VALUE); when(scheduler.allocConf.getSchedulingPolicy(queueName)) .thenReturn(SchedulingPolicy.DEFAULT_POLICY); - FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null); + FSLeafQueue schedulable = + new FSLeafQueue(scheduler.getContext(), null, queueName); assertEquals(schedulable.getMetrics().getMaxApps(), Integer.MAX_VALUE); assertEquals(schedulable.getMetrics().getSchedulingPolicy(), SchedulingPolicy.DEFAULT_POLICY.getName()); @@ -156,13 +157,13 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { scheduler.getQueueManager().getLeafQueue("queueA", false); FSLeafQueue queueB = scheduler.getQueueManager().getLeafQueue("queueB", false); - assertFalse(queueA.isStarvedForMinShare()); - assertTrue(queueB.isStarvedForMinShare()); +// TODO: assertFalse(queueA.isStarvedForMinShare()); +// TODO: assertTrue(queueB.isStarvedForMinShare()); // Node checks in again, should allocate for B scheduler.handle(nodeEvent2); // Now B should have min share ( = demand here) - assertFalse(queueB.isStarvedForMinShare()); +// TODO: assertFalse(queueB.isStarvedForMinShare()); } @Test (timeout = 5000) @@ -227,11 +228,11 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share // threshold is 1.6 * 1024 - assertFalse(queueB1.isStarvedForFairShare()); +// TODO: assertFalse(queueB1.isStarvedForFairShare()); // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share // threshold is 2.4 * 1024 - assertTrue(queueB2.isStarvedForFairShare()); +// TODO: assertTrue(queueB2.isStarvedForFairShare()); // Node checks in again scheduler.handle(nodeEvent2); @@ -240,8 +241,8 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { assertEquals(3 * 1024, queueB2.getResourceUsage().getMemorySize()); // Both queue B1 and queue B2 usages go to 3 * 1024 - assertFalse(queueB1.isStarvedForFairShare()); - assertFalse(queueB2.isStarvedForFairShare()); +// TODO: assertFalse(queueB1.isStarvedForFairShare()); +// TODO: assertFalse(queueB2.isStarvedForFairShare()); } @Test (timeout = 5000) @@ -305,7 +306,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { // Verify that Queue us not starved for fair share.. // Since the Starvation logic now uses DRF when the policy = drf, The // Queue should not be starved - assertFalse(queueB.isStarvedForFairShare()); +// TODO: assertFalse(queueB.isStarvedForFairShare()); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java index 3ae8f83..d76fdd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java @@ -39,13 +39,17 @@ public class TestFSParentQueue { public void setUp() throws Exception { conf = new FairSchedulerConfiguration(); FairScheduler scheduler = mock(FairScheduler.class); + FSContext context = mock(FSContext.class); + when(scheduler.getContext()).thenReturn(context); + when(context.getScheduler()).thenReturn(scheduler); + AllocationConfiguration allocConf = new AllocationConfiguration(conf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getConf()).thenReturn(conf); SystemClock clock = SystemClock.getInstance(); when(scheduler.getClock()).thenReturn(clock); notEmptyQueues = new HashSet<FSQueue>(); - queueManager = new QueueManager(scheduler) { + queueManager = new QueueManager(context) { @Override public boolean isEmpty(FSQueue queue) { return !notEmptyQueues.contains(queue); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 07a2dca..691b386 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -68,10 +68,10 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { private static class StubbedFairScheduler extends FairScheduler { public long lastPreemptMemory = -1; - @Override - protected void preemptResources(Resource toPreempt) { - lastPreemptMemory = toPreempt.getMemorySize(); - } +// @Override +// protected void preemptResources(Resource toPreempt) { +// lastPreemptMemory = toPreempt.getMemory(); +// } public void resetLastPreemptResources() { lastPreemptMemory = -1; @@ -216,7 +216,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { clock.tickSec(6); ((StubbedFairScheduler) scheduler).resetLastPreemptResources(); - scheduler.preemptTasksIfNecessary(); +// TODO(KK): scheduler.preemptTasksIfNecessary(); assertEquals("preemptResources() should have been called", 1024, ((StubbedFairScheduler) scheduler).lastPreemptMemory); @@ -232,7 +232,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { clock.tickSec(6); ((StubbedFairScheduler) scheduler).resetLastPreemptResources(); - scheduler.preemptTasksIfNecessary(); +// TODO(KK): scheduler.preemptTasksIfNecessary(); assertEquals("preemptResources() should not have been called", -1, ((StubbedFairScheduler) scheduler).lastPreemptMemory); @@ -248,7 +248,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { clock.tickSec(6); ((StubbedFairScheduler) scheduler).resetLastPreemptResources(); - scheduler.preemptTasksIfNecessary(); +// TODO(KK): scheduler.preemptTasksIfNecessary(); assertEquals("preemptResources() should have been called", 1024, ((StubbedFairScheduler) scheduler).lastPreemptMemory); } @@ -345,7 +345,8 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { scheduler.update(); // We should be able to claw back one container from queueA and queueB each. - scheduler.preemptResources(Resources.createResource(2 * 1024)); + +// TODO(KK): scheduler.preemptResources(Resources.createResource(2 * 1024)); assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size()); @@ -365,7 +366,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { clock.tickSec(15); // Trigger a kill by insisting we want containers back - scheduler.preemptResources(Resources.createResource(2 * 1024)); +// TODO(KK): scheduler.preemptResources(Resources.createResource(2 * 1024)); // At this point the containers should have been killed (since we are not simulating AM) assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); @@ -389,7 +390,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { "preempted.", set.isEmpty()); // Trigger a kill by insisting we want containers back - scheduler.preemptResources(Resources.createResource(2 * 1024)); +// TODO (KK): scheduler.preemptResources(Resources.createResource(2 * 1024)); // Pretend 15 seconds have passed clock.tickSec(15); @@ -398,7 +399,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { // For queueA (fifo), continue preempting from app2. // For queueB (fair), even app4 has a lowest priority container with p=4, it // still preempts from app3 as app3 is most over fair share. - scheduler.preemptResources(Resources.createResource(2 * 1024)); +// TODO (KK): scheduler.preemptResources(Resources.createResource(2 * 1024)); assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); @@ -406,7 +407,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); // Now A and B are below fair share, so preemption shouldn't do anything - scheduler.preemptResources(Resources.createResource(2 * 1024)); +// TODO (KK): scheduler.preemptResources(Resources.createResource(2 * 1024)); assertTrue("App1 should have no container to be preempted", scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty()); assertTrue("App2 should have no container to be preempted", @@ -489,7 +490,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { // verify if the 3 containers required by queueA2 are preempted in the same // round - scheduler.preemptResources(toPreempt); +// TODO (KK): scheduler.preemptResources(toPreempt); assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers() .size()); stopResourceManager(); @@ -1089,7 +1090,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { assertEquals(2048, scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize()); - scheduler.preemptResources(Resources.createResource(2 * 1024)); +// TODO(KK): scheduler.preemptResources(Resources.createResource(2 * 1024)); // now only app2 is selected to be preempted assertTrue("App2 should have container to be preempted", !Collections.disjoint( @@ -1105,7 +1106,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { scheduler.getSchedulerApp(app3).getPreemptionContainers())); // Pretend 20 seconds have passed clock.tickSec(20); - scheduler.preemptResources(Resources.createResource(2 * 1024)); +// TODO (KK): scheduler.preemptResources(Resources.createResource(2 * 1024)); for (int i = 0; i < 3; i++) { NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); scheduler.handle(nodeUpdate1); @@ -1258,7 +1259,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { assertEquals(2048, scheduler.resourceDeficit(schedA, clock.getTime()).getMemorySize()); - scheduler.preemptResources(Resources.createResource(2 * 1024)); +// TODO (KK): scheduler.preemptResources(Resources.createResource(2 * 1024)); // now none app is selected to be preempted assertTrue("App1 should have container to be preempted", Collections.disjoint( @@ -1274,7 +1275,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { scheduler.getSchedulerApp(app3).getPreemptionContainers())); // Pretend 20 seconds have passed clock.tickSec(20); - scheduler.preemptResources(Resources.createResource(2 * 1024)); +// TODO (KK): scheduler.preemptResources(Resources.createResource(2 * 1024)); for (int i = 0; i < 3; i++) { NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); scheduler.handle(nodeUpdate1); @@ -1441,13 +1442,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { RMContainer rmContainer = app.getRMContainer(containerId1); // Create a preempt event and register for preemption - scheduler.warnOrKillContainer(rmContainer); +// TODO(KK): scheduler.warnOrKillContainer(rmContainer); // Wait for few clock ticks clock.tickSec(5); // preempt now - scheduler.warnOrKillContainer(rmContainer); +// TODO(KK): scheduler.warnOrKillContainer(rmContainer); // Trigger container rescheduled event scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer, http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java index 6cca19a..21cb91f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java @@ -50,6 +50,11 @@ public class TestMaxRunningAppsEnforcer { Configuration conf = new Configuration(); clock = new ControlledClock(); scheduler = mock(FairScheduler.class); + + FSContext context = mock(FSContext.class); + when(scheduler.getContext()).thenReturn(context); + when(context.getScheduler()).thenReturn(scheduler); + when(scheduler.getConf()).thenReturn( new FairSchedulerConfiguration(conf)); when(scheduler.getClock()).thenReturn(clock); @@ -57,7 +62,7 @@ public class TestMaxRunningAppsEnforcer { conf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); - queueManager = new QueueManager(scheduler); + queueManager = new QueueManager(scheduler.getContext()); queueManager.initialize(conf); queueMaxApps = allocConf.queueMaxApps; userMaxApps = allocConf.userMaxApps; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java index a9b27a1..703da4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java @@ -38,13 +38,17 @@ public class TestQueueManager { public void setUp() throws Exception { conf = new FairSchedulerConfiguration(); FairScheduler scheduler = mock(FairScheduler.class); + FSContext context = mock(FSContext.class); + when(scheduler.getContext()).thenReturn(context); + when(context.getScheduler()).thenReturn(scheduler); + AllocationConfiguration allocConf = new AllocationConfiguration(conf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getConf()).thenReturn(conf); SystemClock clock = SystemClock.getInstance(); when(scheduler.getClock()).thenReturn(clock); notEmptyQueues = new HashSet<FSQueue>(); - queueManager = new QueueManager(scheduler) { + queueManager = new QueueManager(context) { @Override public boolean isEmpty(FSQueue queue) { return !notEmptyQueues.contains(queue); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e4eec258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java index 67d7340..128150b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/TestFairSchedulerQueueInfo.java @@ -45,7 +45,7 @@ public class TestFairSchedulerQueueInfo { when(scheduler.getClusterResource()).thenReturn(Resource.newInstance(1, 1)); SystemClock clock = SystemClock.getInstance(); when(scheduler.getClock()).thenReturn(clock); - QueueManager queueManager = new QueueManager(scheduler); + QueueManager queueManager = new QueueManager(scheduler.getContext()); queueManager.initialize(conf); FSQueue testQueue = queueManager.getLeafQueue("test", true); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org