WIP. Prototype finished. Adding tests
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fe5bf79d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fe5bf79d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fe5bf79d Branch: refs/heads/fs-preemption Commit: fe5bf79db895974b0088f92ec71b623e217dfcf5 Parents: 450f956 Author: Karthik Kambatla <ka...@apache.org> Authored: Wed Jun 15 19:07:03 2016 -0700 Committer: Karthik Kambatla <ka...@apache.org> Committed: Wed Jun 15 19:07:03 2016 -0700 ---------------------------------------------------------------------- .../markdown/release/1.3.0/CHANGES.1.3.0.md | 2 +- .../scheduler/AppSchedulingInfo.java | 18 +- .../scheduler/fair/FSAppAttempt.java | 36 ++- .../scheduler/fair/FSContext.java | 12 +- .../scheduler/fair/FSLeafQueue.java | 31 ++- .../scheduler/fair/FSPreemptionThread.java | 18 +- .../scheduler/fair/FSStarvedApps.java | 69 ++++++ .../scheduler/fair/FairSchedulerTestBase.java | 2 +- .../fair/TestFairSchedulerPreemption.java | 235 ++++++++++--------- 9 files changed, 277 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-common-project/hadoop-common/src/site/markdown/release/1.3.0/CHANGES.1.3.0.md ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/release/1.3.0/CHANGES.1.3.0.md b/hadoop-common-project/hadoop-common/src/site/markdown/release/1.3.0/CHANGES.1.3.0.md index 1a12646..4b3d17e 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/release/1.3.0/CHANGES.1.3.0.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/release/1.3.0/CHANGES.1.3.0.md @@ -106,7 +106,7 @@ | [MAPREDUCE-5968](https://issues.apache.org/jira/browse/MAPREDUCE-5968) | Work directory is not deleted when downloadCacheObject throws IOException | Major | mrv1 | zhihai xu | zhihai 
xu | | [MAPREDUCE-5966](https://issues.apache.org/jira/browse/MAPREDUCE-5966) | MR1 FairScheduler use of custom weight adjuster is not thread safe for comparisons | Major | scheduler | Anubhav Dhoot | Anubhav Dhoot | | [MAPREDUCE-5877](https://issues.apache.org/jira/browse/MAPREDUCE-5877) | Inconsistency between JT/TT for tasks taking a long time to launch | Critical | jobtracker, tasktracker | Karthik Kambatla | Karthik Kambatla | -| [MAPREDUCE-5822](https://issues.apache.org/jira/browse/MAPREDUCE-5822) | FairScheduler does not preempt due to fairshare-starvation when fairshare is 1 | Major | scheduler | Anubhav Dhoot | Anubhav Dhoot | +| [MAPREDUCE-5822](https://issues.apache.org/jira/browse/MAPREDUCE-5822) | FairScheduler does not preempt due to fairshare-fairshareStarvation when fairshare is 1 | Major | scheduler | Anubhav Dhoot | Anubhav Dhoot | | [MAPREDUCE-5808](https://issues.apache.org/jira/browse/MAPREDUCE-5808) | Port output replication factor configurable for terasort to Hadoop 1.x | Minor | examples | Chuan Liu | Chuan Liu | | [MAPREDUCE-5710](https://issues.apache.org/jira/browse/MAPREDUCE-5710) | Backport MAPREDUCE-1305 to branch-1 | Major | . 
| Yongjun Zhang | Yongjun Zhang | | [MAPREDUCE-5702](https://issues.apache.org/jira/browse/MAPREDUCE-5702) | TaskLogServlet#printTaskLog has spurious HTML closing tags | Trivial | task | Karthik Kambatla | Robert Kanter | http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 463bebd..94e1c68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -51,6 +51,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.util.resource.Resources; +import static javax.swing.UIManager.get; + /** * This class keeps track of all the consumption of an application. This also * keeps track of current running/completed containers for the application. 
@@ -78,7 +80,7 @@ public class AppSchedulingInfo { private Set<String> userBlacklist = new HashSet<>(); private Set<String> requestedPartitions = new HashSet<>(); - final Set<Priority> priorities = new TreeSet<>(COMPARATOR); + final TreeSet<Priority> priorities = new TreeSet<>(COMPARATOR); final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap = new ConcurrentHashMap<>(); final Map<NodeId, Map<Priority, Map<ContainerId, @@ -516,6 +518,20 @@ public class AppSchedulingInfo { return (nodeRequests == null) ? null : nodeRequests.get(resourceName); } + /** + * Method to return the next resource request to be serviced. + * In the initial implementation, we just pick any ResourceRequest + * corresponding to the highest priority. + */ + @Unstable + public synchronized ResourceRequest getNextResourceRequest() { + for (ResourceRequest rr: + resourceRequestMap.get(priorities.first()).values()) { + return rr; + } + return null; + } + public synchronized Resource getResource(Priority priority) { ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY); return (request == null) ? 
null : request.getCapability(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 5065881..7090206 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -82,6 +82,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt private RMContainerComparator comparator = new RMContainerComparator(); // Preemption related variables + private Resource fairshareStarvation = Resources.none(); + private Resource minshareStarvation = Resources.none(); private Resource preemptedResources = Resources.createResource(0); private final Set<RMContainer> containersToPreempt = new HashSet<>(); private long lastTimeAtFairShare; @@ -426,7 +428,24 @@ public class FSAppAttempt extends SchedulerApplicationAttempt allowedLocalityLevel.put(priority, level); } - // related methods + @Override + public FSLeafQueue getQueue() { + return (FSLeafQueue)super.getQueue(); + } + + // Preemption related methods + public Resource getStarvation() { + return Resources.add(fairshareStarvation, minshareStarvation); + } + + public void 
setMinshareStarvation(Resource starvation) { + this.minshareStarvation = starvation; + } + + public void resetMinshareStarvation() { + this.minshareStarvation = Resources.none(); + } + public void addPreemption(RMContainer container) { containersToPreempt.add(container); Resources.addTo(preemptedResources, container.getAllocatedResource()); @@ -436,10 +455,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt return containersToPreempt; } - @Override - public FSLeafQueue getQueue() { - return (FSLeafQueue)super.getQueue(); - } public Resource getPreemptedResources() { return preemptedResources; @@ -867,7 +882,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } /** - * Helper method that computes the extent of fairshare starvation. + * Helper method that computes the extent of fairshare fairshareStarvation. */ Resource fairShareStarvation() { Resource threshold = Resources.multiply( @@ -885,16 +900,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt if (starved && (now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) { - // Queue is starved for longer than preemption-timeout - return starvation; + this.fairshareStarvation = starvation; } else { - return Resources.none(); + this.fairshareStarvation = Resources.none(); } + return this.fairshareStarvation; } public ResourceRequest getNextResourceRequest() { - // TODO (KK): Return highest priority resource request - return null; + return appSchedulingInfo.getNextResourceRequest(); } /* Schedulable methods implementation */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java index eccbd2d..5222a15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSContext.java @@ -34,7 +34,7 @@ public class FSContext { // Preemption-related info private boolean preemptionEnabled = false; private float preemptionUtilizationThreshold; - private PriorityBlockingQueue<FSAppAttempt> starvedApps; + private FSStarvedApps starvedApps; public FairScheduler getScheduler() { return scheduler; @@ -73,10 +73,14 @@ public class FSContext { public void setPreemptionEnabled() { this.preemptionEnabled = true; if (starvedApps == null) { - starvedApps = new PriorityBlockingQueue<>(); + starvedApps = new FSStarvedApps(); } } + public FSStarvedApps getStarvedApps() { + return starvedApps; + } + public float getPreemptionUtilizationThreshold() { return preemptionUtilizationThreshold; } @@ -85,8 +89,4 @@ public class FSContext { float preemptionUtilizationThreshold) { this.preemptionUtilizationThreshold = preemptionUtilizationThreshold; } - - public PriorityBlockingQueue<FSAppAttempt> getStarvedApps() { - return starvedApps; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index bc2a7c1..28e8683 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -28,7 +28,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.TreeSet; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -46,6 +45,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.util.resource.Resources; +import static org.apache.hadoop.yarn.util.resource.Resources.none; + @Private @Unstable public class FSLeafQueue extends FSQueue { @@ -254,14 +255,14 @@ public class FSLeafQueue extends FSQueue { private void identifyStarvedApplications() { // First identify starved applications and track total amount of // starvation (in resources) - Resource fairShareStarvation = Resources.clone(Resources.none()); + Resource fairShareStarvation = Resources.clone(none()); TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand(); for 
(FSAppAttempt app : appsWithDemand) { Resource appStarvation = app.fairShareStarvation(); if (Resources.equals(Resources.none(), appStarvation)) { break; } else { - context.getStarvedApps().add(app); + context.getStarvedApps().addStarvedApp(app); Resources.addTo(fairShareStarvation, appStarvation); } } @@ -276,10 +277,16 @@ public class FSLeafQueue extends FSQueue { // the remaining minshare for (FSAppAttempt app : appsWithDemand) { if (Resources.greaterThan(policy.getResourceCalculator(), - context.getClusterResource(), minShareStarvation, Resources.none())) { - context.getStarvedApps().add(app); - Resources.subtractFrom(minShareStarvation, - Resources.subtract(app.getDemand(), app.getResourceUsage())); + context.getClusterResource(), minShareStarvation, none())) { + Resource appPendingDemand = + Resources.subtract(app.getDemand(), app.getResourceUsage()); + Resources.subtractFrom(minShareStarvation, appPendingDemand); + app.setMinshareStarvation(appPendingDemand); + context.getStarvedApps().addStarvedApp(app); + } else { + // Reset minshare starvation in case we had set it in a previous + // iteration + app.resetMinshareStarvation(); } } } @@ -356,7 +363,7 @@ public class FSLeafQueue extends FSQueue { @Override public Resource assignContainer(FSSchedulerNode node) { - Resource assigned = Resources.none(); + Resource assigned = none(); if (LOG.isDebugEnabled()) { LOG.debug("Node " + node.getNodeName() + " offered to queue: " + getName() + " fairShare: " + getFairShare()); @@ -371,7 +378,7 @@ public class FSLeafQueue extends FSQueue { continue; } assigned = sched.assignContainer(node); - if (!assigned.equals(Resources.none())) { + if (!assigned.equals(none())) { if (LOG.isDebugEnabled()) { LOG.debug("Assigned container in queue:" + getName() + " " + "container:" + assigned); @@ -389,7 +396,7 @@ public class FSLeafQueue extends FSQueue { try { for (FSAppAttempt app : runnableApps) { Resource pending = app.getAppAttemptResourceUsage().getPending(); - if 
(!pending.equals(Resources.none())) { + if (!pending.equals(none())) { pendingForResourceApps.add(app); } } @@ -593,7 +600,7 @@ public class FSLeafQueue extends FSQueue { Resource starvation = Resources.subtract(desiredShare, getResourceUsage()); boolean starved = Resources.greaterThan(policy.getResourceCalculator(), - scheduler.getClusterResource(), starvation, Resources.none()); + scheduler.getClusterResource(), starvation, none()); long now = context.getClock().getTime(); if (!starved) { @@ -604,7 +611,7 @@ public class FSLeafQueue extends FSQueue { (now - lastTimeAtMinShare > getMinSharePreemptionTimeout())) { return starvation; } else { - return Resources.none(); + return Resources.clone(Resources.none()); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index 766fd5a..c4cd950 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.util.resource.Resources; import 
java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Timer; import java.util.TimerTask; @@ -62,6 +63,9 @@ public class FSPreemptionThread extends Thread { FSAppAttempt starvedApp; try{ starvedApp = context.getStarvedApps().take(); + if (Resources.none().equals(starvedApp.getStarvation())) { + continue; + } } catch (InterruptedException e) { LOG.info("Preemption thread interrupted! Exiting."); return; @@ -97,8 +101,17 @@ public class FSPreemptionThread extends Thread { // from apps over their fairshare FSSchedulerNode targetNode = null; for (FSSchedulerNode node : potentialNodes) { + FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable(); + if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) { + // This node is already reserved by another app. Let us not consider + // this for preemption. + continue; + + // TODO (KK): If the nodeReservedApp is over its fairshare, may be it + // is okay to unreserve it if we find enough resources. 
+ } containers.clear(); - Resource potential = Resources.clone(Resources.none()); + Resource potential = Resources.clone(node.getUnallocatedResource()); for (RMContainer container : node.getCopiedListOfRunningContainers()) { Resource containerResource = container.getAllocatedResource(); FSAppAttempt app = @@ -111,9 +124,6 @@ public class FSPreemptionThread extends Thread { Resources.addTo(potential, containerResource); } - // TODO (KK): Should we go through other app reservations if the - // containers alone are not enough to meet the starvedApp's requirements - // Check if we have already identified enough containers if (Resources.fitsIn(requestCapability, potential)) { break; http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java new file mode 100644 index 0000000..5091e08 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSStarvedApps.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.Comparator; +import java.util.concurrent.PriorityBlockingQueue; + +/** + * Helper class to track starved apps. + * + * Initially, this uses a blocking queue. We could use other data structures + * in the future. This class also has some methods to simplify testing. + */ +public class FSStarvedApps { + private int numAppsAddedSoFar; + private PriorityBlockingQueue<FSAppAttempt> apps; + + public FSStarvedApps() { + apps = new PriorityBlockingQueue<>(10, new StarvationComparator()); + } + + public void addStarvedApp(FSAppAttempt app) { + if (!apps.contains(app)) { + apps.add(app); + numAppsAddedSoFar++; + } + } + + public FSAppAttempt take() throws InterruptedException { + return apps.take(); + } + + private static class StarvationComparator implements + Comparator<FSAppAttempt> { + @Override + public int compare(FSAppAttempt app1, FSAppAttempt app2) { + return Resources.fitsIn(app1.getStarvation(), app2.getStarvation()) + ? 
-1 : 1; + } + } + + @VisibleForTesting + public int getNumAppsAddedSoFar() { + return numAppsAddedSoFar; + } + + @VisibleForTesting + public int numStarvedApps() { + return apps.size(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index ec0e6aa..f111165 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -72,7 +72,7 @@ public class FairSchedulerTestBase { // Helper methods public Configuration createConfiguration() { - Configuration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, ResourceScheduler.class); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5bf79d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 691b386..7ac1ff9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -65,100 +65,49 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { private ControlledClock clock; - private static class StubbedFairScheduler extends FairScheduler { - public long lastPreemptMemory = -1; - -// @Override -// protected void preemptResources(Resource toPreempt) { -// lastPreemptMemory = toPreempt.getMemory(); -// } - - public void resetLastPreemptResources() { - lastPreemptMemory = -1; - } - } - - public Configuration createConfiguration() { - Configuration conf = super.createConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class, - ResourceScheduler.class); - conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - return conf; - } - @Before - public void setup() throws IOException { + public void setUp() throws IOException { conf = createConfiguration(); + writeBasicAllocFile(); + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); clock = new 
ControlledClock(); + scheduler.setClock(clock); } @After - public void teardown() { + public void tearDown() { if (resourceManager != null) { resourceManager.stop(); resourceManager = null; } - conf = null; - } - - private void startResourceManagerWithStubbedFairScheduler(float utilizationThreshold) { - conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, - utilizationThreshold); - resourceManager = new MockRM(conf); - resourceManager.start(); - - assertTrue( - resourceManager.getResourceScheduler() instanceof StubbedFairScheduler); - scheduler = (FairScheduler)resourceManager.getResourceScheduler(); - - scheduler.setClock(clock); - scheduler.updateInterval = 60 * 1000; } - // YARN-4648: The starting code for ResourceManager mock is originated from - // TestFairScheduler. It should be keep as it was to guarantee no changing - // behaviour of ResourceManager preemption. - private void startResourceManagerWithRealFairScheduler() { - scheduler = new FairScheduler(); - conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, - ResourceScheduler.class); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); - conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, - 1024); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240); - conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false); + @Override + public Configuration createConfiguration() { + super.createConfiguration(); + conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true); conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); - conf.setFloat( - FairSchedulerConfiguration - .RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE, - TEST_RESERVATION_THRESHOLD); - - resourceManager = new MockRM(conf); - - // TODO: This test should really be using MockRM. For now starting stuff - // that is needed at a bare minimum. 
- ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); - resourceManager.getRMContext().getStateStore().start(); - - // to initialize the master key - resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey(); - - scheduler.setRMContext(resourceManager.getRMContext()); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + return conf; } - private void stopResourceManager() { - if (scheduler != null) { - scheduler.stop(); - scheduler = null; - } - if (resourceManager != null) { - resourceManager.stop(); - resourceManager = null; - } - QueueMetrics.clearQueueMetrics(); - DefaultMetricsSystem.shutdown(); + private void writeBasicAllocFile() throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queueMaxAMShareDefault>-1.0</queueMaxAMShareDefault>"); + out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>"); + out.println("<queue name=\"root\">"); + out.println(" <schedulingPolicy>drf</schedulingPolicy>"); + out.println(" <weight>1.0</weight>"); + out.println(" <fairSharePreemptionTimeout>2</fairSharePreemptionTimeout>"); + out.println(" <minSharePreemptionTimeout>1</minSharePreemptionTimeout>"); + out.println(" <fairSharePreemptionThreshold>1</fairSharePreemptionThreshold>"); + out.println("</queue>"); + out.println("</allocations>"); + out.close(); } private void registerNodeAndSubmitApp( @@ -176,7 +125,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { createSchedulingRequest(appMemory, "queueA", "user1", appContainers); scheduler.update(); // Sufficient node check-ins to fully schedule containers - for (int i = 0; i < 3; i++) { + for (int i = 0; i < appContainers + 1; i++) { NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); scheduler.handle(nodeUpdate1); } @@ -186,6 +135,71 @@ public class 
TestFairSchedulerPreemption extends FairSchedulerTestBase { } @Test + public void testPreemptionEnabled() { + assertTrue(scheduler.getContext().isPreemptionEnabled()); + } + + @Test + public void testIdentifyMinshareStarvation() throws Exception { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>1</weight>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>1</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("<defaultMinSharePreemptionTimeout>1</defaultMinSharePreemptionTimeout>"); + out.println("<fairSharePreemptionTimeout>50</fairSharePreemptionTimeout>"); + out.println("</allocations>"); + out.close(); + clock.tickSec(5); + Thread.sleep(100); + + // Create node with 4GB memory and 4 vcores + registerNodeAndSubmitApp(4 * 1024, 4, 4, 1024); + scheduler.update(); + + // Verify submitting another request triggers preemption + createSchedulingRequest(1024, "queueB", "user1", 1, 1); + clock.tickSec(5); + scheduler.update(); + + assertEquals(1, scheduler.getContext().getStarvedApps().getNumAppsAddedSoFar()); + } + + @Test + public void testIdentifyFairshareStarvation() throws Exception { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>1</weight>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>1</weight>"); + out.println("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>"); + out.println("</queue>"); + out.println("</allocations>"); + out.close(); + clock.tickSec(5); + Thread.sleep(100); + + // Create node with 4GB memory and 4 vcores + registerNodeAndSubmitApp(4 * 1024, 4, 4, 1024); + 
scheduler.update(); + + // Verify submitting another request triggers preemption + createSchedulingRequest(1024, "queueB", "user1", 1, 1); + clock.tickSec(2); + scheduler.update(); + + assertEquals(1, scheduler.getContext().getStarvedApps().getNumAppsAddedSoFar()); + } + + @Test public void testPreemptionWithFreeResources() throws Exception { PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println("<?xml version=\"1.0\"?>"); @@ -206,7 +220,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { out.println("</allocations>"); out.close(); - startResourceManagerWithStubbedFairScheduler(0f); +// startResourceManagerWithStubbedFairScheduler(0f); // Create node with 4GB memory and 4 vcores registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024); @@ -215,14 +229,14 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { scheduler.update(); clock.tickSec(6); - ((StubbedFairScheduler) scheduler).resetLastPreemptResources(); +// ((StubbedFairScheduler) scheduler).resetLastPreemptResources(); // TODO(KK): scheduler.preemptTasksIfNecessary(); - assertEquals("preemptResources() should have been called", 1024, - ((StubbedFairScheduler) scheduler).lastPreemptMemory); +// assertEquals("preemptResources() should have been called", 1024, +// ((StubbedFairScheduler) scheduler).lastPreemptMemory); resourceManager.stop(); - startResourceManagerWithStubbedFairScheduler(0.8f); +// startResourceManagerWithStubbedFairScheduler(0.8f); // Create node with 4GB memory and 4 vcores registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024); @@ -231,14 +245,14 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { scheduler.update(); clock.tickSec(6); - ((StubbedFairScheduler) scheduler).resetLastPreemptResources(); +// ((StubbedFairScheduler) scheduler).resetLastPreemptResources(); // TODO(KK): scheduler.preemptTasksIfNecessary(); - assertEquals("preemptResources() should not have been called", -1, - ((StubbedFairScheduler) 
scheduler).lastPreemptMemory); +// assertEquals("preemptResources() should not have been called", -1, +// ((StubbedFairScheduler) scheduler).lastPreemptMemory); resourceManager.stop(); - startResourceManagerWithStubbedFairScheduler(0.7f); +// startResourceManagerWithStubbedFairScheduler(0.7f); // Create node with 4GB memory and 4 vcores registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024); @@ -247,18 +261,19 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { scheduler.update(); clock.tickSec(6); - ((StubbedFairScheduler) scheduler).resetLastPreemptResources(); +// ((StubbedFairScheduler) scheduler).resetLastPreemptResources(); // TODO(KK): scheduler.preemptTasksIfNecessary(); - assertEquals("preemptResources() should have been called", 1024, - ((StubbedFairScheduler) scheduler).lastPreemptMemory); +// assertEquals("preemptResources() should have been called", 1024, +// ((StubbedFairScheduler) scheduler).lastPreemptMemory); } + @Test (timeout = 5000) /** * Make sure containers are chosen to be preempted in the correct order. 
*/ public void testChoiceOfPreemptedContainers() throws Exception { - startResourceManagerWithRealFairScheduler(); + //startResourceManagerWithRealFairScheduler(); conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE); @@ -416,12 +431,12 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty()); assertTrue("App4 should have no container to be preempted", scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty()); - stopResourceManager(); +// stopResourceManager(); } @Test public void testPreemptionIsNotDelayedToNextRound() throws Exception { - startResourceManagerWithRealFairScheduler(); +// startResourceManagerWithRealFairScheduler(); conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); @@ -493,7 +508,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { // TODO (KK): scheduler.preemptResources(toPreempt); assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers() .size()); - stopResourceManager(); +// stopResourceManager(); } @Test (timeout = 5000) @@ -501,7 +516,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { * Tests the timing of decision to preempt tasks. 
*/ public void testPreemptionDecision() throws Exception { - startResourceManagerWithRealFairScheduler(); +// startResourceManagerWithRealFairScheduler(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); ControlledClock clock = new ControlledClock(); @@ -629,7 +644,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { 1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemorySize()); assertEquals( 1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize()); - stopResourceManager(); +// stopResourceManager(); } @Test @@ -637,7 +652,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { * Tests the timing of decision to preempt tasks. */ public void testPreemptionDecisionWithDRF() throws Exception { - startResourceManagerWithRealFairScheduler(); +// startResourceManagerWithRealFairScheduler(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); ControlledClock clock = new ControlledClock(); @@ -780,7 +795,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { assertEquals(1536, res.getMemorySize()); // Demand = 6, but fair share = 3 assertEquals(3, res.getVirtualCores()); - stopResourceManager(); +// stopResourceManager(); } @Test @@ -788,7 +803,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { * Tests the various timing of decision to preempt tasks. 
*/ public void testPreemptionDecisionWithVariousTimeout() throws Exception { - startResourceManagerWithRealFairScheduler(); +// startResourceManagerWithRealFairScheduler(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); ControlledClock clock = new ControlledClock(); scheduler.setClock(clock); @@ -963,7 +978,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize()); assertEquals( 1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize()); - stopResourceManager(); +// stopResourceManager(); } @Test @@ -981,7 +996,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { * 5, Only preemptable queue(queueB) would be preempted. */ public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception { - startResourceManagerWithRealFairScheduler(); +// startResourceManagerWithRealFairScheduler(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); ControlledClock clock = new ControlledClock(); scheduler.setClock(clock); @@ -1125,7 +1140,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size()); assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - stopResourceManager(); +// stopResourceManager(); } @Test @@ -1147,7 +1162,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { */ public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues() throws Exception { - startResourceManagerWithRealFairScheduler(); +// startResourceManagerWithRealFairScheduler(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); ControlledClock clock = new ControlledClock(); scheduler.setClock(clock); @@ -1294,12 +1309,12 @@ public class TestFairSchedulerPreemption extends 
FairSchedulerTestBase { assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - stopResourceManager(); +// stopResourceManager(); } @Test public void testBackwardsCompatiblePreemptionConfiguration() throws Exception { - startResourceManagerWithRealFairScheduler(); +// startResourceManagerWithRealFairScheduler(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); @@ -1386,12 +1401,12 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { assertEquals(25000, queueMgr.getQueue("root") .getFairSharePreemptionTimeout()); - stopResourceManager(); +// stopResourceManager(); } @Test(timeout = 5000) public void testRecoverRequestAfterPreemption() throws Exception { - startResourceManagerWithRealFairScheduler(); +// startResourceManagerWithRealFairScheduler(); conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10); ControlledClock clock = new ControlledClock(); @@ -1473,6 +1488,6 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { // Now with updated ResourceRequest, a container is allocated for AM. Assert.assertTrue(containers.size() == 1); - stopResourceManager(); +// stopResourceManager(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org