Repository: hadoop Updated Branches: refs/heads/branch-2 0c2d996c2 -> 7adffad2b
YARN-2881. [YARN-2574] Implement PlanFollower for FairScheduler. (Anubhav Dhoot via kasha) (cherry picked from commit 0c4b11267717eb451fa6ed4c586317f2db32fbd5) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7adffad2 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7adffad2 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7adffad2 Branch: refs/heads/branch-2 Commit: 7adffad2bb17338c266fe1d3320f59b0c89fce4a Parents: 0c2d996 Author: Karthik Kambatla <ka...@apache.org> Authored: Tue Jan 6 04:41:45 2015 +0530 Committer: Karthik Kambatla <ka...@apache.org> Committed: Tue Jan 6 04:42:55 2015 +0530 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 2 + .../reservation/AbstractReservationSystem.java | 2 + .../AbstractSchedulerPlanFollower.java | 3 +- .../reservation/FairSchedulerPlanFollower.java | 141 +++++++++++++ .../reservation/ReservationConstants.java | 28 +++ .../scheduler/capacity/CapacityScheduler.java | 6 +- .../scheduler/capacity/PlanQueue.java | 2 - .../scheduler/fair/AllocationConfiguration.java | 14 ++ .../scheduler/fair/FSLeafQueue.java | 10 + .../scheduler/fair/FairScheduler.java | 104 +++++++++- .../scheduler/fair/QueueManager.java | 13 +- .../fair/ReservationQueueConfiguration.java | 11 + .../reservation/ReservationSystemTestUtil.java | 26 ++- .../TestCapacitySchedulerPlanFollower.java | 3 +- .../reservation/TestFairReservationSystem.java | 55 ++--- .../TestFairSchedulerPlanFollower.java | 203 +++++++++++++++++++ .../TestCapacitySchedulerDynamicBehavior.java | 3 +- .../scheduler/fair/FairSchedulerTestBase.java | 4 +- .../fair/TestFairSchedulerPreemption.java | 3 +- 19 files changed, 574 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c737a9a..8139c72 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -38,6 +38,8 @@ Release 2.7.0 - UNRELEASED YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler. (Anubhav Dhoot via kasha) + YARN-2881. [YARN-2574] Implement PlanFollower for FairScheduler. + (Anubhav Dhoot via kasha) IMPROVEMENTS http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index fa0835a..8a15ac6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -204,6 +204,8 @@ public abstract class AbstractReservationSystem extends AbstractService // currently only capacity scheduler is supported if (scheduler instanceof CapacityScheduler) { return CapacitySchedulerPlanFollower.class.getName(); + } else if (scheduler instanceof FairScheduler) { + return FairSchedulerPlanFollower.class.getName(); } return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java index 0de4dcf..ea7f27d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java @@ -26,7 +26,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.util.Clock; @@ -99,7 +98,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { // create the default reservation queue if it doesnt exist String defReservationId = getReservationIdFromQueueName(planQueueName) + - PlanQueue.DEFAULT_QUEUE_SUFFIX; + ReservationConstants.DEFAULT_QUEUE_SUFFIX; String defReservationQueue = getReservationQueueName(planQueueName, defReservationId); createDefaultReservationQueue(planQueueName, planQueue, http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java new file mode 100644 index 0000000..7ca03c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FairSchedulerPlanFollower extends AbstractSchedulerPlanFollower { + private static final Logger LOG = LoggerFactory + .getLogger(FairSchedulerPlanFollower.class); + + private FairScheduler fs; + + @Override + public void init(Clock clock, ResourceScheduler sched, + Collection<Plan> plans) { + super.init(clock, sched, plans); + fs = (FairScheduler)sched; + LOG.info("Initializing Plan Follower Policy:" + + this.getClass().getCanonicalName()); + } + + @Override + protected Queue getPlanQueue(String planQueueName) { + Queue planQueue = fs.getQueueManager().getParentQueue(planQueueName, false); + if (planQueue == null) { + LOG.error("The queue " + planQueueName + " cannot be found or is not a " + + "ParentQueue"); + } + return planQueue; + } + + @Override + protected float calculateReservationToPlanRatio(Resource clusterResources, + Resource planResources, Resource capToAssign) { + return Resources.divide(fs.getResourceCalculator(), + clusterResources, capToAssign, planResources); + } + + @Override + protected boolean arePlanResourcesLessThanReservations(Resource + clusterResources, Resource planResources, Resource reservedResources) { + return Resources.greaterThan(fs.getResourceCalculator(), + clusterResources, reservedResources, planResources); + } + + @Override + protected List<? extends Queue> getChildReservationQueues(Queue queue) { + FSQueue planQueue = (FSQueue)queue; + List<FSQueue> childQueues = planQueue.getChildQueues(); + return childQueues; + } + + + @Override + protected void addReservationQueue(String planQueueName, Queue queue, + String currResId) { + String leafQueueName = getReservationQueueName(planQueueName, currResId); + fs.getQueueManager().getLeafQueue(leafQueueName, true); + } + + @Override + protected void createDefaultReservationQueue(String planQueueName, + Queue queue, String defReservationId) { + String defReservationQueueName = getReservationQueueName(planQueueName, + defReservationId); + if (!fs.getQueueManager().exists(defReservationQueueName)) { + fs.getQueueManager().getLeafQueue(defReservationQueueName, true); + } + } + + @Override + protected Resource getPlanResources(Plan plan, Queue queue, + Resource clusterResources) { + FSParentQueue planQueue = (FSParentQueue)queue; + Resource planResources = planQueue.getSteadyFairShare(); + return planResources; + } + + @Override + protected Resource getReservationQueueResourceIfExists(Plan plan, + ReservationId reservationId) { + String reservationQueueName = getReservationQueueName(plan.getQueueName(), + reservationId.toString()); + FSLeafQueue reservationQueue = + fs.getQueueManager().getLeafQueue(reservationQueueName, false); + Resource reservationResource = null; + if (reservationQueue != null) { + reservationResource = reservationQueue.getSteadyFairShare(); + } + return reservationResource; + } + + @Override + protected String getReservationQueueName(String planQueueName, + String reservationQueueName) { + String planQueueNameFullPath = fs.getQueueManager().getQueue + (planQueueName).getName(); + + if (!reservationQueueName.startsWith(planQueueNameFullPath)) { + // If name is not a path we need full path for FairScheduler. See + // YARN-2773 for the root cause + return planQueueNameFullPath + "." + reservationQueueName; + } + return reservationQueueName; + } + + @Override + protected String getReservationIdFromQueueName(String resQueueName) { + return resQueueName.substring(resQueueName.lastIndexOf(".") + 1); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationConstants.java new file mode 100644 index 0000000..1ff941f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationConstants.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +public interface ReservationConstants { + + /** + * The suffix used for a queue under a reservable queue that will be used + * as a default queue whenever no reservation is used + */ + String DEFAULT_QUEUE_SUFFIX = "-default"; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 3648c54..5e6a352 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.EnumSet; @@ -66,6 +65,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -1419,7 +1419,7 @@ public class CapacityScheduler extends queueName = resQName; } else { // use the default child queue of the plan for unreserved apps - queueName = queueName + PlanQueue.DEFAULT_QUEUE_SUFFIX; + queueName = queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; } return queueName; } @@ -1583,7 +1583,7 @@ public class CapacityScheduler extends CSQueue dest = getQueue(targetQueueName); if (dest != null && dest instanceof PlanQueue) { // use the default child reservation queue of the plan - targetQueueName = targetQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX; + targetQueueName = targetQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; } return targetQueueName; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 0725959..f8b11eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -37,8 +37,6 @@ import org.slf4j.LoggerFactory; */ public class PlanQueue extends ParentQueue { - public static final String DEFAULT_QUEUE_SUFFIX = "-default"; - private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class); private int maxAppsForReservation; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index fd99d65..0ea7314 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -207,6 +207,10 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { ResourceWeights weight = queueWeights.get(queue); return (weight == null) ? ResourceWeights.NEUTRAL : weight; } + + public void setQueueWeight(String queue, ResourceWeights weight) { + queueWeights.put(queue, weight); + } public int getUserMaxApps(String user) { Integer maxApps = userMaxApps.get(user); @@ -323,4 +327,14 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { public long getEnforcementWindow(String queue) { return globalReservationQueueConfig.getEnforcementWindowMsec(); } + + @VisibleForTesting + public void setReservationWindow(long window) { + globalReservationQueueConfig.setReservationWindow(window); + } + + @VisibleForTesting + public void setAverageCapacity(int avgCapacity) { + globalReservationQueueConfig.setAverageCapacity(avgCapacity); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 91bea11..3c97535 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; @@ -516,6 +517,15 @@ public class FSLeafQueue extends FSQueue { } } + /** Allows setting weight for a dynamically created queue + * Currently only used for reservation based queues + * @param weight queue weight + */ + public void setWeights(float weight) { + scheduler.getAllocationConfiguration().setQueueWeight(getName(), + new ResourceWeights(weight)); + } + /** * Helper method to check if the queue should preempt containers * http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 57b41af..124da99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -68,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; @@ -1163,9 +1166,15 @@ public class FairScheduler extends throw new RuntimeException("Unexpected event type: " + event); } AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; - addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser(), - appAddedEvent.getIsAppRecovering()); + String queueName = + resolveReservationQueueName(appAddedEvent.getQueue(), + appAddedEvent.getApplicationId(), + appAddedEvent.getReservationID()); + if (queueName != null) { + addApplication(appAddedEvent.getApplicationId(), + queueName, appAddedEvent.getUser(), + appAddedEvent.getIsAppRecovering()); + } break; case APP_REMOVED: if (!(event instanceof AppRemovedSchedulerEvent)) { @@ -1223,6 +1232,51 @@ public class FairScheduler extends } } + private String resolveReservationQueueName(String queueName, + ApplicationId applicationId, ReservationId reservationID) { + FSQueue queue = queueMgr.getQueue(queueName); + if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) { + return queueName; + } + // Use fully specified name from now on (including root. prefix) + queueName = queue.getQueueName(); + if (reservationID != null) { + String resQName = queueName + "." + reservationID.toString(); + queue = queueMgr.getQueue(resQName); + if (queue == null) { + String message = + "Application " + + applicationId + + " submitted to a reservation which is not yet currently active: " + + resQName; + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return null; + } + if (!queue.getParent().getQueueName().equals(queueName)) { + String message = + "Application: " + applicationId + " submitted to a reservation " + + resQName + " which does not belong to the specified queue: " + + queueName; + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return null; + } + // use the reservation queue to run the app + queueName = resQName; + } else { + // use the default child queue of the plan for unreserved apps + queueName = getDefaultQueueForPlanQueue(queueName); + } + return queueName; + } + + private String getDefaultQueueForPlanQueue(String queueName) { + String planName = queueName.substring(queueName.lastIndexOf(".") + 1); + queueName = queueName + "." + planName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; + return queueName; + } + @Override public void recover(RMState state) throws Exception { // NOT IMPLEMENTED @@ -1441,7 +1495,8 @@ public class FairScheduler extends // To serialize with FairScheduler#allocate, synchronize on app attempt synchronized (attempt) { FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); - FSLeafQueue targetQueue = queueMgr.getLeafQueue(queueName, false); + String destQueueName = handleMoveToPlanQueue(queueName); + FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false); if (targetQueue == null) { throw new YarnException("Target queue " + queueName + " not found or is not a leaf queue."); @@ -1577,4 +1632,45 @@ public class FairScheduler extends } return planQueues; } + + @Override + public void setEntitlement(String queueName, + QueueEntitlement entitlement) throws YarnException { + + FSLeafQueue reservationQueue = queueMgr.getLeafQueue(queueName, false); + if (reservationQueue == null) { + throw new YarnException("Target queue " + queueName + + " not found or is not a leaf queue."); + } + + reservationQueue.setWeights(entitlement.getCapacity()); + + // TODO Does MaxCapacity need to be set for fairScheduler ? + } + + /** + * Only supports removing empty leaf queues + * @param queueName name of queue to remove + * @throws YarnException if queue to remove is either not a leaf or if its + * not empty + */ + @Override + public void removeQueue(String queueName) throws YarnException { + FSLeafQueue reservationQueue = queueMgr.getLeafQueue(queueName, false); + if (reservationQueue != null) { + if (!queueMgr.removeLeafQueue(queueName)) { + throw new YarnException("Could not remove queue " + queueName + " as " + + "its either not a leaf queue or its not empty"); + } + } + } + + private String handleMoveToPlanQueue(String targetQueueName) { + FSQueue dest = queueMgr.getQueue(targetQueueName); + if (dest != null && allocConf.isReservable(dest.getQueueName())) { + // use the default child reservation queue of the plan + targetQueueName = getDefaultQueueForPlanQueue(targetQueueName); + } + return targetQueueName; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 0e625d7..27e571e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -91,7 +91,18 @@ public class QueueManager { } return (FSLeafQueue) queue; } - + + /** + * Remove a leaf queue if empty + * @param name name of the queue + * @return true if queue was removed or false otherwise + */ + public boolean removeLeafQueue(String name) { + name = ensureRootPrefix(name); + return removeEmptyIncompatibleQueues(name, FSQueueType.PARENT); + } + + /** * Get a parent queue by name, creating it if the create param is true and is necessary. * If the queue is not or can not be a parent queue, i.e. it already exists as a http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java index 747a4c2..cf7f84e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; @@ -102,4 +103,14 @@ public class ReservationQueueConfiguration { public void setReservationAgent(String reservationAgent) { this.reservationAgent = reservationAgent; } + + @VisibleForTesting + public void setReservationWindow(long reservationWindow) { + this.reservationWindow = reservationWindow; + } + + @VisibleForTesting + public void setAverageCapacity(int averageCapacity) { + this.avgOverTimeMultiplier = averageCapacity; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 12c2583..bfaf06b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -29,7 +29,6 @@ import java.io.PrintWriter; import java.util.Collections; import java.util.Map; import java.util.Random; -import java.util.Set; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; @@ -42,12 +41,16 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -160,6 +163,27 @@ public class ReservationSystemTestUtil { out.close(); } + public static FairScheduler setupFairScheduler( + ReservationSystemTestUtil testUtil, + RMContext rmContext, Configuration conf, int numContainers) throws + IOException { + FairScheduler scheduler = new FairScheduler(); + scheduler.setRMContext(rmContext); + + when(rmContext.getScheduler()).thenReturn(scheduler); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, rmContext); + + + Resource resource = testUtil.calculateClusterResource(numContainers); + RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + return scheduler; + } + @SuppressWarnings("unchecked") public CapacityScheduler mockCapacityScheduler(int numContainers) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java index c603f5b..c7513ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java @@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -149,7 +148,7 @@ public class TestCapacitySchedulerPlanFollower extends TestSchedulerPlanFollower @Override protected Queue getDefaultQueue() { - return cs.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX); + return cs.getQueue("dedicated" + ReservationConstants.DEFAULT_QUEUE_SUFFIX); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java index 82ba731..f294eaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java @@ -18,14 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase; @@ -38,15 +35,16 @@ import org.junit.Test; import java.io.File; import java.io.IOException; -import static org.mockito.Mockito.when; - -public class TestFairReservationSystem extends FairSchedulerTestBase { - private final static String ALLOC_FILE = new File(TEST_DIR, +public class TestFairReservationSystem { + private final static String ALLOC_FILE = new File(FairSchedulerTestBase. + TEST_DIR, TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath(); + private Configuration conf; + private FairScheduler scheduler; + private FairSchedulerTestBase testHelper = new FairSchedulerTestBase(); - @Override protected Configuration createConfiguration() { - Configuration conf = super.createConfiguration(); + Configuration conf = testHelper.createConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, ResourceScheduler.class); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); @@ -60,10 +58,6 @@ public class TestFairReservationSystem extends FairSchedulerTestBase { @After public void teardown() { - if (resourceManager != null) { - resourceManager.stop(); - resourceManager = null; - } conf = null; } @@ -75,7 +69,8 @@ public class TestFairReservationSystem extends FairSchedulerTestBase { // Setup RMContext mockRMContext = testUtil.createRMContext(conf); - setupFairScheduler(testUtil, mockRMContext); + scheduler = ReservationSystemTestUtil.setupFairScheduler(testUtil, + mockRMContext, conf, 10); FairReservationSystem reservationSystem = new FairReservationSystem(); reservationSystem.setRMContext(mockRMContext); @@ -97,14 +92,15 @@ public class TestFairReservationSystem extends FairSchedulerTestBase { ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); // Setup - RMContext mockContext = testUtil.createRMContext(conf); - setupFairScheduler(testUtil, mockContext); + RMContext mockRMContext = testUtil.createRMContext(conf); + scheduler = ReservationSystemTestUtil.setupFairScheduler(testUtil, + mockRMContext, conf, 10); FairReservationSystem reservationSystem = new FairReservationSystem(); - reservationSystem.setRMContext(mockContext); + reservationSystem.setRMContext(mockRMContext); try { - reservationSystem.reinitialize(scheduler.getConf(), mockContext); + reservationSystem.reinitialize(scheduler.getConf(), mockRMContext); } catch (YarnException e) { Assert.fail(e.getMessage()); } @@ -116,10 +112,10 @@ public class TestFairReservationSystem extends FairSchedulerTestBase { // Dynamically add a plan ReservationSystemTestUtil.updateFSAllocationFile(ALLOC_FILE); - scheduler.reinitialize(conf, mockContext); + scheduler.reinitialize(conf, mockRMContext); try { - reservationSystem.reinitialize(conf, mockContext); + reservationSystem.reinitialize(conf, mockRMContext); } catch (YarnException e) { Assert.fail(e.getMessage()); } @@ -129,23 +125,4 @@ public class TestFairReservationSystem extends FairSchedulerTestBase { (reservationSystem, newQueue); } - private void setupFairScheduler(ReservationSystemTestUtil testUtil, - RMContext rmContext) throws - IOException { - - scheduler = new FairScheduler(); - scheduler.setRMContext(rmContext); - - int numContainers = 10; - when(rmContext.getScheduler()).thenReturn(scheduler); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, rmContext); - - Resource resource = testUtil.calculateClusterResource(numContainers); - RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java new file mode 100644 index 0000000..e9a4f50 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java @@ -0,0 +1,203 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.Clock; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.mockito.Matchers; +import org.mockito.Mockito; + +public class TestFairSchedulerPlanFollower extends + TestSchedulerPlanFollowerBase { + private final static String ALLOC_FILE = new File(FairSchedulerTestBase. + TEST_DIR, + TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath(); + private RMContext rmContext; + private RMContext spyRMContext; + private FairScheduler fs; + private Configuration conf; + private FairSchedulerTestBase testHelper = new FairSchedulerTestBase(); + + @Rule + public TestName name = new TestName(); + + protected Configuration createConfiguration() { + Configuration conf = testHelper.createConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + return conf; + } + + @Before + public void setUp() throws Exception { + conf = createConfiguration(); + ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE); + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + + // Setup + rmContext = TestUtils.getMockRMContext(); + spyRMContext = spy(rmContext); + fs = ReservationSystemTestUtil.setupFairScheduler(testUtil, + spyRMContext, conf, 125); + scheduler = fs; + + ConcurrentMap<ApplicationId, RMApp> spyApps = + spy(new ConcurrentHashMap<ApplicationId, RMApp>()); + RMApp rmApp = mock(RMApp.class); + when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any())) + .thenReturn(null); + Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any()); + when(spyRMContext.getRMApps()).thenReturn(spyApps); + + ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE); + setupPlanFollower(); + } + + private void setupPlanFollower() throws Exception { + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + mClock = mock(Clock.class); + mAgent = mock(ReservationAgent.class); + + String reservationQ = testUtil.getFullReservationQueueName(); + AllocationConfiguration allocConf = fs.getAllocationConfiguration(); + allocConf.setReservationWindow(20L); + allocConf.setAverageCapacity(20); + policy.init(reservationQ, allocConf); + } + + @Test + public void testWithMoveOnExpiry() throws PlanningException, + InterruptedException, AccessControlException { + // invoke plan follower test with move + testPlanFollower(true); + } + + @Test + public void testWithKillOnExpiry() throws PlanningException, + InterruptedException, AccessControlException { + // invoke plan follower test with kill + testPlanFollower(false); + } + + @Override + protected void verifyCapacity(Queue defQ) { + assertTrue(((FSQueue) defQ).getWeights().getWeight(ResourceType.MEMORY) > + 0.9); + } + + @Override + protected Queue getDefaultQueue() { + return getReservationQueue("dedicated" + + ReservationConstants.DEFAULT_QUEUE_SUFFIX); + } + + @Override + protected int getNumberOfApplications(Queue queue) { + int numberOfApplications = fs.getAppsInQueue(queue.getQueueName()).size(); + return numberOfApplications; + } + + @Override + protected AbstractSchedulerPlanFollower createPlanFollower() { + FairSchedulerPlanFollower planFollower = + new FairSchedulerPlanFollower(); + planFollower.init(mClock, scheduler, Collections.singletonList(plan)); + return planFollower; + } + + @Override + protected void assertReservationQueueExists(ReservationId r) { + Queue q = getReservationQueue(r.toString()); + assertNotNull(q); + } + + @Override + protected void assertReservationQueueExists(ReservationId r, + double expectedCapacity, double expectedMaxCapacity) { + FSLeafQueue q = fs.getQueueManager().getLeafQueue(plan.getQueueName() + "" + + "." + + r, false); + assertNotNull(q); + // For now we are setting both to same weight + Assert.assertEquals(expectedCapacity, q.getWeights().getWeight + (ResourceType.MEMORY), 0.01); + } + + @Override + protected void assertReservationQueueDoesNotExist(ReservationId r) { + Queue q = getReservationQueue(r.toString()); + assertNull(q); + } + + @Override + protected Queue getReservationQueue(String r) { + return fs.getQueueManager().getLeafQueue(plan.getQueueName() + "" + + "." + + r, false); + } + + public static ApplicationACLsManager mockAppACLsManager() { + Configuration conf = new Configuration(); + return new ApplicationACLsManager(conf); + } + + @After + public void tearDown() throws Exception { + if (scheduler != null) { + fs.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java index 73d8a55..ce3382f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -217,7 +218,7 @@ public class TestCapacitySchedulerDynamicBehavior { assertEquals(1, appsInRoot.size()); // create the default reservation queue - String defQName = "a" + PlanQueue.DEFAULT_QUEUE_SUFFIX; + String defQName = "a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX; ReservationQueue defQ = new ReservationQueue(scheduler, defQName, (PlanQueue) scheduler.getQueue("a")); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 7b6aaf3..8656175 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -60,7 +60,7 @@ public class FairSchedulerTestBase { } } - protected final static String TEST_DIR = + public final static String TEST_DIR = new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); private static RecordFactory @@ -74,7 +74,7 @@ public class FairSchedulerTestBase { protected ResourceManager resourceManager; // Helper methods - protected Configuration createConfiguration() { + public Configuration createConfiguration() { Configuration conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, ResourceScheduler.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 903c7af..458b06d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -58,8 +58,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { } } - @Override - protected Configuration createConfiguration() { + public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class, ResourceScheduler.class);