Repository: hadoop Updated Branches: refs/heads/branch-2 cde5bfe3e -> 798ab5128
YARN-2998. Abstract out scheduler independent PlanFollower components. (Anubhav Dhoot via kasha) (cherry picked from commit e7257acd8a7adb74d81cd1d009d4a99f023ed844) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/798ab512 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/798ab512 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/798ab512 Branch: refs/heads/branch-2 Commit: 798ab51289e1552778433379b9157d1d4082fffc Parents: cde5bfe Author: Karthik Kambatla <ka...@apache.org> Authored: Tue Dec 30 19:52:43 2014 -0800 Committer: Karthik Kambatla <ka...@apache.org> Committed: Tue Dec 30 19:55:50 2014 -0800 ---------------------------------------------------------------------- .../sls/scheduler/ResourceSchedulerWrapper.java | 6 + hadoop-yarn-project/CHANGES.txt | 5 + .../AbstractSchedulerPlanFollower.java | 412 +++++++++++++++++++ .../CapacitySchedulerPlanFollower.java | 361 ++++------------ .../scheduler/YarnScheduler.java | 5 + .../scheduler/fair/FSLeafQueue.java | 4 +- .../scheduler/fair/FairScheduler.java | 3 +- .../scheduler/fifo/FifoScheduler.java | 5 + .../reservation/ReservationSystemTestUtil.java | 4 +- .../TestCapacitySchedulerPlanFollower.java | 209 +++------- .../TestSchedulerPlanFollowerBase.java | 191 +++++++++ 11 files changed, 755 insertions(+), 450 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 3ac3382..e729363 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoSchedule import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.web.SLSWebApp; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; @@ -868,6 +869,11 @@ public class ResourceSchedulerWrapper } @Override + public ResourceCalculator getResourceCalculator() { + return scheduler.getResourceCalculator(); + } + + @Override public int getNumClusterNodes() { return scheduler.getNumClusterNodes(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6c280cb..a135f1e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -38,6 +38,7 @@ Release 2.7.0 - UNRELEASED YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler. (Anubhav Dhoot via kasha) + IMPROVEMENTS YARN-2950. Change message to mandate, not suggest JS requirement on UI. @@ -125,6 +126,10 @@ Release 2.7.0 - UNRELEASED YARN-2943. Added node-labels page on RM web UI. (Wangda Tan via jianhe) + YARN-2998. Abstract out scheduler independent PlanFollower components. + (Anubhav Dhoot via kasha) + + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java new file mode 100644 index 0000000..0de4dcf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java @@ -0,0 +1,412 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; + +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractSchedulerPlanFollower implements PlanFollower { + private static final Logger LOG = LoggerFactory + .getLogger(CapacitySchedulerPlanFollower.class); + + protected Collection<Plan> plans = new ArrayList<Plan>(); + protected YarnScheduler scheduler; + protected Clock clock; + + @Override + public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) { + this.clock = clock; + this.scheduler = sched; + this.plans.addAll(plans); + } + + @Override + public synchronized void run() { + for (Plan plan : plans) { + synchronizePlan(plan); + } + } + + @Override + public synchronized void setPlans(Collection<Plan> plans) { + this.plans.clear(); + this.plans.addAll(plans); + } + + @Override + public synchronized void synchronizePlan(Plan plan) { + String planQueueName = plan.getQueueName(); + if (LOG.isDebugEnabled()) { + LOG.debug("Running plan follower edit policy for plan: " + planQueueName); + } + // align with plan step + long step = plan.getStep(); + long now = clock.getTime(); + if (now % step != 0) { + now += step - (now % step); + } + Queue planQueue = getPlanQueue(planQueueName); + if (planQueue == null) return; + + // first we publish to the plan the current availability of resources + Resource clusterResources = scheduler.getClusterResource(); + Resource planResources = getPlanResources(plan, planQueue, + clusterResources); + + Set<ReservationAllocation> currentReservations = + plan.getReservationsAtTime(now); + Set<String> curReservationNames = new HashSet<String>(); + Resource reservedResources = Resource.newInstance(0, 0); + int numRes = getReservedResources(now, currentReservations, + curReservationNames, reservedResources); + + // create the default reservation queue if it doesnt exist + String defReservationId = getReservationIdFromQueueName(planQueueName) + + PlanQueue.DEFAULT_QUEUE_SUFFIX; + String defReservationQueue = getReservationQueueName(planQueueName, + defReservationId); + createDefaultReservationQueue(planQueueName, planQueue, + defReservationId); + curReservationNames.add(defReservationId); + + // if the resources dedicated to this plan has shrunk invoke replanner + if (arePlanResourcesLessThanReservations(clusterResources, planResources, + reservedResources)) { + try { + plan.getReplanner().plan(plan, null); + } catch (PlanningException e) { + LOG.warn("Exception while trying to replan: {}", planQueueName, e); + } + } + // identify the reservations that have expired and new reservations that + // have to be activated + List<? extends Queue> resQueues = getChildReservationQueues(planQueue); + Set<String> expired = new HashSet<String>(); + for (Queue resQueue : resQueues) { + String resQueueName = resQueue.getQueueName(); + String reservationId = getReservationIdFromQueueName(resQueueName); + if (curReservationNames.contains(reservationId)) { + // it is already existing reservation, so needed not create new + // reservation queue + curReservationNames.remove(reservationId); + } else { + // the reservation has termination, mark for cleanup + expired.add(reservationId); + } + } + // garbage collect expired reservations + cleanupExpiredQueues(planQueueName, plan.getMoveOnExpiry(), expired, + defReservationQueue); + + // Add new reservations and update existing ones + float totalAssignedCapacity = 0f; + if (currentReservations != null) { + // first release all excess capacity in default queue + try { + setQueueEntitlement(planQueueName, defReservationQueue, 0f, 1.0f); + } catch (YarnException e) { + LOG.warn( + "Exception while trying to release default queue capacity for plan: {}", + planQueueName, e); + } + // sort allocations from the one giving up the most resources, to the + // one asking for the most + // avoid order-of-operation errors that temporarily violate 100% + // capacity bound + List<ReservationAllocation> sortedAllocations = + sortByDelta( + new ArrayList<ReservationAllocation>(currentReservations), now, + plan); + for (ReservationAllocation res : sortedAllocations) { + String currResId = res.getReservationId().toString(); + if (curReservationNames.contains(currResId)) { + addReservationQueue(planQueueName, planQueue, currResId); + } + Resource capToAssign = res.getResourcesAtTime(now); + float targetCapacity = 0f; + if (planResources.getMemory() > 0 + && planResources.getVirtualCores() > 0) { + targetCapacity = + calculateReservationToPlanRatio(clusterResources, + planResources, + capToAssign); + } + if (LOG.isDebugEnabled()) { + LOG.debug( + "Assigning capacity of {} to queue {} with target capacity {}", + capToAssign, currResId, targetCapacity); + } + // set maxCapacity to 100% unless the job requires gang, in which + // case we stick to capacity (as running early/before is likely a + // waste of resources) + float maxCapacity = 1.0f; + if (res.containsGangs()) { + maxCapacity = targetCapacity; + } + try { + setQueueEntitlement(planQueueName, currResId, targetCapacity, maxCapacity); + } catch (YarnException e) { + LOG.warn("Exception while trying to size reservation for plan: {}", + currResId, planQueueName, e); + } + totalAssignedCapacity += targetCapacity; + } + } + // compute the default queue capacity + float defQCap = 1.0f - totalAssignedCapacity; + if (LOG.isDebugEnabled()) { + LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} " + + "currReservation: {} default-queue capacity: {}", planResources, + numRes, defQCap); + } + // set the default queue to eat-up all remaining capacity + try { + setQueueEntitlement(planQueueName, defReservationQueue, defQCap, 1.0f); + } catch (YarnException e) { + LOG.warn( + "Exception while trying to reclaim default queue capacity for plan: {}", + planQueueName, e); + } + // garbage collect finished reservations from plan + try { + plan.archiveCompletedReservations(now); + } catch (PlanningException e) { + LOG.error("Exception in archiving completed reservations: ", e); + } + LOG.info("Finished iteration of plan follower edit policy for plan: " + + planQueueName); + + // Extension: update plan with app states, + // useful to support smart replanning + } + + protected String getReservationIdFromQueueName(String resQueueName) { + return resQueueName; + } + + protected void setQueueEntitlement(String planQueueName, String currResId, + float targetCapacity, + float maxCapacity) throws YarnException { + String reservationQueueName = getReservationQueueName(planQueueName, + currResId); + scheduler.setEntitlement(reservationQueueName, new QueueEntitlement( + targetCapacity, maxCapacity)); + } + + // Schedulers have different ways of naming queues. See YARN-2773 + protected String getReservationQueueName(String planQueueName, + String reservationId) { + return reservationId; + } + + /** + * First sets entitlement of queues to zero to prevent new app submission. + * Then move all apps in the set of queues to the parent plan queue's default + * reservation queue if move is enabled. Finally cleanups the queue by killing + * any apps (if move is disabled or move failed) and removing the queue + */ + protected void cleanupExpiredQueues(String planQueueName, + boolean shouldMove, Set<String> toRemove, String defReservationQueue) { + for (String expiredReservationId : toRemove) { + try { + // reduce entitlement to 0 + String expiredReservation = getReservationQueueName(planQueueName, + expiredReservationId); + setQueueEntitlement(planQueueName, expiredReservation, 0.0f, 0.0f); + if (shouldMove) { + moveAppsInQueueSync(expiredReservation, defReservationQueue); + } + if (scheduler.getAppsInQueue(expiredReservation).size() > 0) { + scheduler.killAllAppsInQueue(expiredReservation); + LOG.info("Killing applications in queue: {}", expiredReservation); + } else { + scheduler.removeQueue(expiredReservation); + LOG.info("Queue: " + expiredReservation + " removed"); + } + } catch (YarnException e) { + LOG.warn("Exception while trying to expire reservation: {}", + expiredReservationId, e); + } + } + } + + /** + * Move all apps in the set of queues to the parent plan queue's default + * reservation queue in a synchronous fashion + */ + private void moveAppsInQueueSync(String expiredReservation, + String defReservationQueue) { + List<ApplicationAttemptId> activeApps = + scheduler.getAppsInQueue(expiredReservation); + if (activeApps.isEmpty()) { + return; + } + for (ApplicationAttemptId app : activeApps) { + // fallback to parent's default queue + try { + scheduler.moveApplication(app.getApplicationId(), defReservationQueue); + } catch (YarnException e) { + LOG.warn( + "Encountered unexpected error during migration of application: {}" + + " from reservation: {}", + app, expiredReservation, e); + } + } + } + + protected int getReservedResources(long now, Set<ReservationAllocation> + currentReservations, Set<String> curReservationNames, + Resource reservedResources) { + int numRes = 0; + if (currentReservations != null) { + numRes = currentReservations.size(); + for (ReservationAllocation reservation : currentReservations) { + curReservationNames.add(reservation.getReservationId().toString()); + Resources.addTo(reservedResources, reservation.getResourcesAtTime(now)); + } + } + return numRes; + } + + /** + * Sort in the order from the least new amount of resources asked (likely + * negative) to the highest. This prevents "order-of-operation" errors related + * to exceeding 100% capacity temporarily. + */ + protected List<ReservationAllocation> sortByDelta( + List<ReservationAllocation> currentReservations, long now, Plan plan) { + Collections.sort(currentReservations, new ReservationAllocationComparator( + now, this, plan)); + return currentReservations; + } + + /** + * Get queue associated with reservable queue named + * @param planQueueName Name of the reservable queue + * @return queue associated with the reservable queue + */ + protected abstract Queue getPlanQueue(String planQueueName); + + /** + * Calculates ratio of reservationResources to planResources + */ + protected abstract float calculateReservationToPlanRatio( + Resource clusterResources, Resource planResources, + Resource reservationResources); + + /** + * Check if plan resources are less than expected reservation resources + */ + protected abstract boolean arePlanResourcesLessThanReservations( + Resource clusterResources, Resource planResources, + Resource reservedResources); + + /** + * Get a list of reservation queues for this planQueue + */ + protected abstract List<? extends Queue> getChildReservationQueues( + Queue planQueue); + + /** + * Add a new reservation queue for reservation currResId for this planQueue + */ + protected abstract void addReservationQueue( + String planQueueName, Queue queue, String currResId); + + /** + * Creates the default reservation queue for use when no reservation is + * used for applications submitted to this planQueue + */ + protected abstract void createDefaultReservationQueue( + String planQueueName, Queue queue, String defReservationQueue); + + /** + * Get plan resources for this planQueue + */ + protected abstract Resource getPlanResources( + Plan plan, Queue queue, Resource clusterResources); + + /** + * Get reservation queue resources if it exists otherwise return null + */ + protected abstract Resource getReservationQueueResourceIfExists(Plan plan, + ReservationId reservationId); + + private static class ReservationAllocationComparator implements + Comparator<ReservationAllocation> { + AbstractSchedulerPlanFollower planFollower; + long now; + Plan plan; + + ReservationAllocationComparator(long now, + AbstractSchedulerPlanFollower planFollower, Plan plan) { + this.now = now; + this.planFollower = planFollower; + this.plan = plan; + } + + private Resource getUnallocatedReservedResources( + ReservationAllocation reservation) { + Resource resResource; + Resource reservationResource = planFollower + .getReservationQueueResourceIfExists + (plan, reservation.getReservationId()); + if (reservationResource != null) { + resResource = + Resources.subtract( + reservation.getResourcesAtTime(now), + reservationResource); + } else { + resResource = reservation.getResourcesAtTime(now); + } + return resResource; + } + + @Override + public int compare(ReservationAllocation lhs, ReservationAllocation rhs) { + // compute delta between current and previous reservation, and compare + // based on that + Resource lhsRes = getUnallocatedReservedResources(lhs); + Resource rhsRes = getUnallocatedReservedResources(rhs); + return lhsRes.compareTo(rhsRes); + } + } +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java index 126560a..61772c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java @@ -19,26 +19,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; @@ -58,319 +51,119 @@ import org.slf4j.LoggerFactory; * differences among existing queues). This makes it resilient to frequency of * synchronization, and RM restart issues (no "catch up" is necessary). */ -public class CapacitySchedulerPlanFollower implements PlanFollower { +public class CapacitySchedulerPlanFollower extends AbstractSchedulerPlanFollower { private static final Logger LOG = LoggerFactory .getLogger(CapacitySchedulerPlanFollower.class); - private Collection<Plan> plans = new ArrayList<Plan>(); - - private Clock clock; - private CapacityScheduler scheduler; + private CapacityScheduler cs; @Override public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) { + super.init(clock, sched, plans); LOG.info("Initializing Plan Follower Policy:" + this.getClass().getCanonicalName()); if (!(sched instanceof CapacityScheduler)) { throw new YarnRuntimeException( "CapacitySchedulerPlanFollower can only work with CapacityScheduler"); } - this.clock = clock; - this.scheduler = (CapacityScheduler) sched; - this.plans.addAll(plans); + this.cs = (CapacityScheduler) sched; } @Override - public synchronized void run() { - for (Plan plan : plans) { - synchronizePlan(plan); + protected Queue getPlanQueue(String planQueueName) { + CSQueue queue = cs.getQueue(planQueueName); + if (!(queue instanceof PlanQueue)) { + LOG.error("The Plan is not an PlanQueue!"); + return null; } + return queue; } @Override - public synchronized void synchronizePlan(Plan plan) { - String planQueueName = plan.getQueueName(); - if (LOG.isDebugEnabled()) { - LOG.debug("Running plan follower edit policy for plan: " + planQueueName); - } - // align with plan step - long step = plan.getStep(); - long now = clock.getTime(); - if (now % step != 0) { - now += step - (now % step); - } - CSQueue queue = scheduler.getQueue(planQueueName); - if (!(queue instanceof PlanQueue)) { - LOG.error("The Plan is not an PlanQueue!"); - return; - } - PlanQueue planQueue = (PlanQueue) queue; - // first we publish to the plan the current availability of resources - Resource clusterResources = scheduler.getClusterResource(); - float planAbsCap = planQueue.getAbsoluteCapacity(); - Resource planResources = Resources.multiply(clusterResources, planAbsCap); - plan.setTotalCapacity(planResources); + protected float calculateReservationToPlanRatio( + Resource clusterResources, Resource planResources, + Resource reservationResources) { + return Resources.divide(cs.getResourceCalculator(), + clusterResources, reservationResources, planResources); + } - Set<ReservationAllocation> currentReservations = - plan.getReservationsAtTime(now); - Set<String> curReservationNames = new HashSet<String>(); - Resource reservedResources = Resource.newInstance(0, 0); - int numRes = 0; - if (currentReservations != null) { - numRes = currentReservations.size(); - for (ReservationAllocation reservation : currentReservations) { - curReservationNames.add(reservation.getReservationId().toString()); - Resources.addTo(reservedResources, reservation.getResourcesAtTime(now)); - } + @Override + protected boolean arePlanResourcesLessThanReservations( + Resource clusterResources, Resource planResources, + Resource reservedResources) { + return Resources.greaterThan(cs.getResourceCalculator(), + clusterResources, reservedResources, planResources); + } + + @Override + protected List<? extends Queue> getChildReservationQueues(Queue queue) { + PlanQueue planQueue = (PlanQueue)queue; + List<CSQueue> childQueues = planQueue.getChildQueues(); + return childQueues; + } + + @Override + protected void addReservationQueue( + String planQueueName, Queue queue, String currResId) { + PlanQueue planQueue = (PlanQueue)queue; + try { + ReservationQueue resQueue = + new ReservationQueue(cs, currResId, planQueue); + cs.addQueue(resQueue); + } catch (SchedulerDynamicEditException e) { + LOG.warn( + "Exception while trying to activate reservation: {} for plan: {}", + currResId, planQueueName, e); + } catch (IOException e) { + LOG.warn( + "Exception while trying to activate reservation: {} for plan: {}", + currResId, planQueueName, e); } - // create the default reservation queue if it doesnt exist - String defReservationQueue = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX; - if (scheduler.getQueue(defReservationQueue) == null) { + } + + @Override + protected void createDefaultReservationQueue( + String planQueueName, Queue queue, String defReservationId) { + PlanQueue planQueue = (PlanQueue)queue; + if (cs.getQueue(defReservationId) == null) { try { ReservationQueue defQueue = - new ReservationQueue(scheduler, defReservationQueue, planQueue); - scheduler.addQueue(defQueue); + new ReservationQueue(cs, defReservationId, planQueue); + cs.addQueue(defQueue); } catch (SchedulerDynamicEditException e) { LOG.warn( "Exception while trying to create default reservation queue for plan: {}", planQueueName, e); } catch (IOException e) { LOG.warn( - "Exception while trying to create default reservation queue for plan: {}", - planQueueName, e); - } - } - curReservationNames.add(defReservationQueue); - // if the resources dedicated to this plan has shrunk invoke replanner - if (Resources.greaterThan(scheduler.getResourceCalculator(), - clusterResources, reservedResources, planResources)) { - try { - plan.getReplanner().plan(plan, null); - } catch (PlanningException e) { - LOG.warn("Exception while trying to replan: {}", planQueueName, e); - } - } - // identify the reservations that have expired and new reservations that - // have to be activated - List<CSQueue> resQueues = planQueue.getChildQueues(); - Set<String> expired = new HashSet<String>(); - for (CSQueue resQueue : resQueues) { - String resQueueName = resQueue.getQueueName(); - if (curReservationNames.contains(resQueueName)) { - // it is already existing reservation, so needed not create new - // reservation queue - curReservationNames.remove(resQueueName); - } else { - // the reservation has termination, mark for cleanup - expired.add(resQueueName); - } - } - // garbage collect expired reservations - cleanupExpiredQueues(plan.getMoveOnExpiry(), expired, defReservationQueue); - - // Add new reservations and update existing ones - float totalAssignedCapacity = 0f; - if (currentReservations != null) { - // first release all excess capacity in default queue - try { - scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(0f, - 1.0f)); - } catch (YarnException e) { - LOG.warn( - "Exception while trying to release default queue capacity for plan: {}", + "Exception while trying to create default reservation queue for " + + "plan: {}", planQueueName, e); } - // sort allocations from the one giving up the most resources, to the - // one asking for the most - // avoid order-of-operation errors that temporarily violate 100% - // capacity bound - List<ReservationAllocation> sortedAllocations = - sortByDelta( - new ArrayList<ReservationAllocation>(currentReservations), now); - for (ReservationAllocation res : sortedAllocations) { - String currResId = res.getReservationId().toString(); - if (curReservationNames.contains(currResId)) { - try { - ReservationQueue resQueue = - new ReservationQueue(scheduler, currResId, planQueue); - scheduler.addQueue(resQueue); - } catch (SchedulerDynamicEditException e) { - LOG.warn( - "Exception while trying to activate reservation: {} for plan: {}", - currResId, planQueueName, e); - } catch (IOException e) { - LOG.warn( - "Exception while trying to activate reservation: {} for plan: {}", - currResId, planQueueName, e); - } - } - Resource capToAssign = res.getResourcesAtTime(now); - float targetCapacity = 0f; - if (planResources.getMemory() > 0 - && planResources.getVirtualCores() > 0) { - targetCapacity = - Resources.divide(scheduler.getResourceCalculator(), - clusterResources, capToAssign, planResources); - } - if (LOG.isDebugEnabled()) { - LOG.debug( - "Assigning capacity of {} to queue {} with target capacity {}", - capToAssign, currResId, targetCapacity); - } - // set maxCapacity to 100% unless the job requires gang, in which - // case we stick to capacity (as running early/before is likely a - // waste of resources) - float maxCapacity = 1.0f; - if (res.containsGangs()) { - maxCapacity = targetCapacity; - } - try { - scheduler.setEntitlement(currResId, new QueueEntitlement( - targetCapacity, maxCapacity)); - } catch (YarnException e) { - LOG.warn("Exception while trying to size reservation for plan: {}", - currResId, planQueueName, e); - } - totalAssignedCapacity += targetCapacity; - } - } - // compute the default queue capacity - float defQCap = 1.0f - totalAssignedCapacity; - if (LOG.isDebugEnabled()) { - LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} " - + "currReservation: {} default-queue capacity: {}", planResources, - numRes, defQCap); - } - // set the default queue to eat-up all remaining capacity - try { - scheduler.setEntitlement(defReservationQueue, new QueueEntitlement( - defQCap, 1.0f)); - } catch (YarnException e) { - LOG.warn( - "Exception while trying to reclaim default queue capacity for plan: {}", - planQueueName, e); - } - // garbage collect finished reservations from plan - try { - plan.archiveCompletedReservations(now); - } catch (PlanningException e) { - LOG.error("Exception in archiving completed reservations: ", e); - } - LOG.info("Finished iteration of plan follower edit policy for plan: " - + planQueueName); - - // Extension: update plan with app states, - // useful to support smart replanning - } - - /** - * Move all apps in the set of queues to the parent plan queue's default - * reservation queue in a synchronous fashion - */ - private void moveAppsInQueueSync(String expiredReservation, - String defReservationQueue) { - List<ApplicationAttemptId> activeApps = - scheduler.getAppsInQueue(expiredReservation); - if (activeApps.isEmpty()) { - return; - } - for (ApplicationAttemptId app : activeApps) { - // fallback to parent's default queue - try { - scheduler.moveApplication(app.getApplicationId(), defReservationQueue); - } catch (YarnException e) { - LOG.warn( - "Encountered unexpected error during migration of application: {} from reservation: {}", - app, expiredReservation, e); - } - } - } - - /** - * First sets entitlement of queues to zero to prevent new app submission. - * Then move all apps in the set of queues to the parent plan queue's default - * reservation queue if move is enabled. Finally cleanups the queue by killing - * any apps (if move is disabled or move failed) and removing the queue - */ - private void cleanupExpiredQueues(boolean shouldMove, Set<String> toRemove, - String defReservationQueue) { - for (String expiredReservation : toRemove) { - try { - // reduce entitlement to 0 - scheduler.setEntitlement(expiredReservation, new QueueEntitlement(0.0f, - 0.0f)); - if (shouldMove) { - moveAppsInQueueSync(expiredReservation, defReservationQueue); - } - if (scheduler.getAppsInQueue(expiredReservation).size() > 0) { - scheduler.killAllAppsInQueue(expiredReservation); - LOG.info("Killing applications in queue: {}", expiredReservation); - } else { - scheduler.removeQueue(expiredReservation); - LOG.info("Queue: " + expiredReservation + " removed"); - } - } catch (YarnException e) { - LOG.warn("Exception while trying to expire reservation: {}", - expiredReservation, e); - } } } @Override - public synchronized void setPlans(Collection<Plan> plans) { - this.plans.clear(); - this.plans.addAll(plans); - } - - /** - * Sort in the order from the least new amount of resources asked (likely - * negative) to the highest. This prevents "order-of-operation" errors related - * to exceeding 100% capacity temporarily. - */ - private List<ReservationAllocation> sortByDelta( - List<ReservationAllocation> currentReservations, long now) { - Collections.sort(currentReservations, new ReservationAllocationComparator( - scheduler, now)); - return currentReservations; + protected Resource getPlanResources( + Plan plan, Queue queue, Resource clusterResources) { + PlanQueue planQueue = (PlanQueue)queue; + float planAbsCap = planQueue.getAbsoluteCapacity(); + Resource planResources = Resources.multiply(clusterResources, planAbsCap); + plan.setTotalCapacity(planResources); + return planResources; } - private static class ReservationAllocationComparator implements - Comparator<ReservationAllocation> { - CapacityScheduler scheduler; - long now; - - ReservationAllocationComparator(CapacityScheduler scheduler, long now) { - this.scheduler = scheduler; - this.now = now; - } - - private Resource getUnallocatedReservedResources( - ReservationAllocation reservation) { - Resource resResource; - CSQueue resQueue = - scheduler.getQueue(reservation.getReservationId().toString()); - if (resQueue != null) { - resResource = - Resources.subtract( - reservation.getResourcesAtTime(now), - Resources.multiply(scheduler.getClusterResource(), - resQueue.getAbsoluteCapacity())); - } else { - resResource = reservation.getResourcesAtTime(now); - } - return resResource; - } - - @Override - public int compare(ReservationAllocation lhs, ReservationAllocation rhs) { - // compute delta between current and previous reservation, and compare - // based on that - Resource lhsRes = getUnallocatedReservedResources(lhs); - Resource rhsRes = getUnallocatedReservedResources(rhs); - return lhsRes.compareTo(rhsRes); - } - + @Override + protected Resource getReservationQueueResourceIfExists(Plan plan, + ReservationId reservationId) { + CSQueue resQueue = cs.getQueue(reservationId.toString()); + Resource reservationResource = null; + if (resQueue != null) { + reservationResource = Resources.multiply(cs.getClusterResource(), + resQueue.getAbsoluteCapacity()); + } + return reservationResource; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index d1b5275..4a3a35c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; /** * This interface is used by the components to talk to the @@ -98,6 +99,10 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> { @Stable public Resource getMaximumResourceCapability(); + @LimitedPrivate("yarn") + @Evolving + ResourceCalculator getResourceCalculator(); + /** * Get the number of nodes available in the cluster. * @return the number of available nodes. http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 5862a73..91bea11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -544,9 +544,9 @@ public class FSLeafQueue extends FSQueue { } private boolean isStarved(Resource share) { - Resource desiredShare = Resources.min(FairScheduler.getResourceCalculator(), + Resource desiredShare = Resources.min(scheduler.getResourceCalculator(), scheduler.getClusterResource(), share, getDemand()); - return Resources.lessThan(FairScheduler.getResourceCalculator(), + return Resources.lessThan(scheduler.getResourceCalculator(), scheduler.getClusterResource(), getResourceUsage(), desiredShare); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 49dfc3c..57b41af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1094,7 +1094,8 @@ public class FairScheduler extends return super.getApplicationAttempt(appAttemptId); } - public static ResourceCalculator getResourceCalculator() { + @Override + public ResourceCalculator getResourceCalculator() { return RESOURCE_CALCULATOR; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 3d4c9dd..e006715 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -919,6 +919,11 @@ public class FifoScheduler extends return DEFAULT_QUEUE.getQueueUserAclInfo(null); } + @Override + public ResourceCalculator getResourceCalculator() { + return resourceCalculator; + } + private synchronized void addNode(RMNode nodeManager) { FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, usePortForNodeName); http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index d93af38..12c2583 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -104,7 +104,7 @@ public class ReservationSystemTestUtil { .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy); } - static void setupFSAllocationFile(String allocationFile) + public static void setupFSAllocationFile(String allocationFile) throws IOException { PrintWriter out = new PrintWriter(new FileWriter(allocationFile)); out.println("<?xml version=\"1.0\"?>"); @@ -130,7 +130,7 @@ public class ReservationSystemTestUtil { out.close(); } - static void updateFSAllocationFile(String allocationFile) + public static void updateFSAllocationFile(String allocationFile) throws IOException { PrintWriter out = new PrintWriter(new FileWriter(allocationFile)); out.println("<?xml version=\"1.0\"?>"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java index 4eedd42..c603f5b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java @@ -33,25 +33,20 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; @@ -62,21 +57,12 @@ import org.junit.rules.TestName; import org.mockito.Matchers; import org.mockito.Mockito; -public class TestCapacitySchedulerPlanFollower { +public class TestCapacitySchedulerPlanFollower extends TestSchedulerPlanFollowerBase { - final static int GB = 1024; - - private Clock mClock = null; - private CapacityScheduler scheduler = null; private RMContext rmContext; private RMContext spyRMContext; private CapacitySchedulerContext csContext; - private ReservationAgent mAgent; - private Plan plan; - private Resource minAlloc = Resource.newInstance(GB, 1); - private Resource maxAlloc = Resource.newInstance(GB * 8, 8); - private ResourceCalculator res = new DefaultResourceCalculator(); - private CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + private CapacityScheduler cs; @Rule public TestName name = new TestName(); @@ -84,7 +70,9 @@ public class TestCapacitySchedulerPlanFollower { @Before public void setUp() throws Exception { CapacityScheduler spyCs = new CapacityScheduler(); - scheduler = spy(spyCs); + cs = spy(spyCs); + scheduler = cs; + rmContext = TestUtils.getMockRMContext(); spyRMContext = spy(rmContext); @@ -100,7 +88,7 @@ public class TestCapacitySchedulerPlanFollower { new CapacitySchedulerConfiguration(); ReservationSystemTestUtil.setupQueueConfiguration(csConf); - scheduler.setConf(csConf); + cs.setConf(csConf); csContext = mock(CapacitySchedulerContext.class); when(csContext.getConfiguration()).thenReturn(csConf); @@ -119,9 +107,9 @@ public class TestCapacitySchedulerPlanFollower { when(csContext.getContainerTokenSecretManager()).thenReturn( containerTokenSecretManager); - scheduler.setRMContext(spyRMContext); - scheduler.init(csConf); - scheduler.start(); + cs.setRMContext(spyRMContext); + cs.init(csConf); + cs.start(); setupPlanFollower(); } @@ -132,7 +120,7 @@ public class TestCapacitySchedulerPlanFollower { mAgent = mock(ReservationAgent.class); String reservationQ = testUtil.getFullReservationQueueName(); - CapacitySchedulerConfiguration csConf = scheduler.getConfiguration(); + CapacitySchedulerConfiguration csConf = cs.getConfiguration(); csConf.setReservationWindow(reservationQ, 20L); csConf.setMaximumCapacity(reservationQ, 40); csConf.setAverageCapacity(reservationQ, 20); @@ -153,155 +141,51 @@ public class TestCapacitySchedulerPlanFollower { testPlanFollower(false); } - private void testPlanFollower(boolean isMove) throws PlanningException, - InterruptedException, AccessControlException { - // Initialize plan based on move flag - plan = - new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, - scheduler.getClusterResource(), 1L, res, - scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated", - null, isMove); - - // add a few reservations to the plan - long ts = System.currentTimeMillis(); - ReservationId r1 = ReservationId.newInstance(ts, 1); - int[] f1 = { 10, 10, 10, 10, 10 }; - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3", - "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil - .generateAllocation(0L, 1L, f1), res, minAlloc))); + @Override + protected void verifyCapacity(Queue defQ) { + CSQueue csQueue = (CSQueue)defQ; + assertTrue(csQueue.getCapacity() > 0.9); + } - ReservationId r2 = ReservationId.newInstance(ts, 2); - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3", - "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil - .generateAllocation(3L, 1L, f1), res, minAlloc))); + @Override + protected Queue getDefaultQueue() { + return cs.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX); + } - ReservationId r3 = ReservationId.newInstance(ts, 3); - int[] f2 = { 0, 10, 20, 10, 0 }; - assertTrue(plan.toString(), - plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4", - "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil - .generateAllocation(10L, 1L, f2), res, minAlloc))); + @Override + protected int getNumberOfApplications(Queue queue) { + CSQueue csQueue = (CSQueue)queue; + int numberOfApplications = csQueue.getNumApplications(); + return numberOfApplications; + } + @Override + protected CapacitySchedulerPlanFollower createPlanFollower() { CapacitySchedulerPlanFollower planFollower = new CapacitySchedulerPlanFollower(); planFollower.init(mClock, scheduler, Collections.singletonList(plan)); + return planFollower; + } - when(mClock.getTime()).thenReturn(0L); - planFollower.run(); - - CSQueue defQ = - scheduler.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX); - CSQueue q = scheduler.getQueue(r1.toString()); + @Override + protected void assertReservationQueueExists(ReservationId r) { + CSQueue q = cs.getQueue(r.toString()); assertNotNull(q); - // submit an app to r1 - String user_0 = "test-user"; - ApplicationId appId = ApplicationId.newInstance(0, 1); - ApplicationAttemptId appAttemptId_0 = - ApplicationAttemptId.newInstance(appId, 0); - AppAddedSchedulerEvent addAppEvent = - new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0); - scheduler.handle(addAppEvent); - AppAttemptAddedSchedulerEvent appAttemptAddedEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId_0, false); - scheduler.handle(appAttemptAddedEvent); - - // initial default reservation queue should have no apps - Assert.assertEquals(0, defQ.getNumApplications()); - - Assert.assertEquals(0.1, q.getCapacity(), 0.01); - Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0); - Assert.assertEquals(1, q.getNumApplications()); - - CSQueue q2 = scheduler.getQueue(r2.toString()); - assertNull(q2); - CSQueue q3 = scheduler.getQueue(r3.toString()); - assertNull(q3); - - when(mClock.getTime()).thenReturn(3L); - planFollower.run(); + } - Assert.assertEquals(0, defQ.getNumApplications()); - q = scheduler.getQueue(r1.toString()); + @Override + protected void assertReservationQueueExists(ReservationId r2, + double expectedCapacity, double expectedMaxCapacity) { + CSQueue q = cs.getQueue(r2.toString()); assertNotNull(q); - Assert.assertEquals(0.1, q.getCapacity(), 0.01); - Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0); - Assert.assertEquals(1, q.getNumApplications()); - q2 = scheduler.getQueue(r2.toString()); - assertNotNull(q2); - Assert.assertEquals(0.1, q.getCapacity(), 0.01); - Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0); - q3 = scheduler.getQueue(r3.toString()); - assertNull(q3); - - when(mClock.getTime()).thenReturn(10L); - planFollower.run(); - - q = scheduler.getQueue(r1.toString()); - if (isMove) { - // app should have been moved to default reservation queue - Assert.assertEquals(1, defQ.getNumApplications()); - assertNull(q); - } else { - // app should be killed - Assert.assertEquals(0, defQ.getNumApplications()); - assertNotNull(q); - AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent = - new AppAttemptRemovedSchedulerEvent(appAttemptId_0, - RMAppAttemptState.KILLED, false); - scheduler.handle(appAttemptRemovedEvent); - } - q2 = scheduler.getQueue(r2.toString()); - assertNull(q2); - q3 = scheduler.getQueue(r3.toString()); - assertNotNull(q3); - Assert.assertEquals(0, q3.getCapacity(), 0.01); - Assert.assertEquals(1.0, q3.getMaximumCapacity(), 1.0); - - when(mClock.getTime()).thenReturn(11L); - planFollower.run(); - - if (isMove) { - // app should have been moved to default reservation queue - Assert.assertEquals(1, defQ.getNumApplications()); - } else { - // app should be killed - Assert.assertEquals(0, defQ.getNumApplications()); - } - q = scheduler.getQueue(r1.toString()); - assertNull(q); - q2 = scheduler.getQueue(r2.toString()); - assertNull(q2); - q3 = scheduler.getQueue(r3.toString()); - assertNotNull(q3); - Assert.assertEquals(0.1, q3.getCapacity(), 0.01); - Assert.assertEquals(0.1, q3.getMaximumCapacity(), 1.0); - - when(mClock.getTime()).thenReturn(12L); - planFollower.run(); - - q = scheduler.getQueue(r1.toString()); - assertNull(q); - q2 = scheduler.getQueue(r2.toString()); - assertNull(q2); - q3 = scheduler.getQueue(r3.toString()); - assertNotNull(q3); - Assert.assertEquals(0.2, q3.getCapacity(), 0.01); - Assert.assertEquals(0.2, q3.getMaximumCapacity(), 1.0); - - when(mClock.getTime()).thenReturn(16L); - planFollower.run(); + Assert.assertEquals(expectedCapacity, q.getCapacity(), 0.01); + Assert.assertEquals(expectedMaxCapacity, q.getMaximumCapacity(), 1.0); + } - q = scheduler.getQueue(r1.toString()); - assertNull(q); - q2 = scheduler.getQueue(r2.toString()); + @Override + protected void assertReservationQueueDoesNotExist(ReservationId r2) { + CSQueue q2 = cs.getQueue(r2.toString()); assertNull(q2); - q3 = scheduler.getQueue(r3.toString()); - assertNull(q3); - - assertTrue(defQ.getCapacity() > 0.9); - } public static ApplicationACLsManager mockAppACLsManager() { @@ -312,8 +196,11 @@ public class TestCapacitySchedulerPlanFollower { @After public void tearDown() throws Exception { if (scheduler != null) { - scheduler.stop(); + cs.stop(); } } + protected Queue getReservationQueue(String reservationId) { + return cs.getQueue(reservationId); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java new file mode 100644 index 0000000..50df8fe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.Assert; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +public abstract class TestSchedulerPlanFollowerBase { + final static int GB = 1024; + protected Clock mClock = null; + protected ResourceScheduler scheduler = null; + protected ReservationAgent mAgent; + protected Resource minAlloc = Resource.newInstance(GB, 1); + protected Resource maxAlloc = Resource.newInstance(GB * 8, 8); + protected CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + protected Plan plan; + private ResourceCalculator res = new DefaultResourceCalculator(); + + protected void testPlanFollower(boolean isMove) throws PlanningException, + InterruptedException, AccessControlException { + // Initialize plan based on move flag + plan = + new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent, + scheduler.getClusterResource(), 1L, res, + scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated", + null, isMove); + + // add a few reservations to the plan + long ts = System.currentTimeMillis(); + ReservationId r1 = ReservationId.newInstance(ts, 1); + int[] f1 = { 10, 10, 10, 10, 10 }; + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3", + "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil + .generateAllocation(0L, 1L, f1), res, minAlloc))); + + ReservationId r2 = ReservationId.newInstance(ts, 2); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3", + "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil + .generateAllocation(3L, 1L, f1), res, minAlloc))); + + ReservationId r3 = ReservationId.newInstance(ts, 3); + int[] f2 = { 0, 10, 20, 10, 0 }; + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4", + "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil + .generateAllocation(10L, 1L, f2), res, minAlloc))); + + AbstractSchedulerPlanFollower planFollower = createPlanFollower(); + + when(mClock.getTime()).thenReturn(0L); + planFollower.run(); + + Queue q = getReservationQueue(r1.toString()); + assertReservationQueueExists(r1); + // submit an app to r1 + String user_0 = "test-user"; + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId_0 = + ApplicationAttemptId.newInstance(appId, 0); + AppAddedSchedulerEvent addAppEvent = + new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0); + scheduler.handle(addAppEvent); + AppAttemptAddedSchedulerEvent appAttemptAddedEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId_0, false); + scheduler.handle(appAttemptAddedEvent); + + // initial default reservation queue should have no apps + + Queue defQ = getDefaultQueue(); + Assert.assertEquals(0, getNumberOfApplications(defQ)); + + assertReservationQueueExists(r1, 0.1, 0.1); + Assert.assertEquals(1, getNumberOfApplications(q)); + + assertReservationQueueDoesNotExist(r2); + assertReservationQueueDoesNotExist(r3); + + when(mClock.getTime()).thenReturn(3L); + planFollower.run(); + + Assert.assertEquals(0, getNumberOfApplications(defQ)); + assertReservationQueueExists(r1, 0.1, 0.1); + Assert.assertEquals(1, getNumberOfApplications(q)); + assertReservationQueueExists(r2, 0.1, 0.1); + assertReservationQueueDoesNotExist(r3); + + when(mClock.getTime()).thenReturn(10L); + planFollower.run(); + + q = getReservationQueue(r1.toString()); + if (isMove) { + // app should have been moved to default reservation queue + Assert.assertEquals(1, getNumberOfApplications(defQ)); + assertNull(q); + } else { + // app should be killed + Assert.assertEquals(0, getNumberOfApplications(defQ)); + assertNotNull(q); + AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent = + new AppAttemptRemovedSchedulerEvent(appAttemptId_0, + RMAppAttemptState.KILLED, false); + scheduler.handle(appAttemptRemovedEvent); + } + assertReservationQueueDoesNotExist(r2); + assertReservationQueueExists(r3, 0, 1.0); + + when(mClock.getTime()).thenReturn(11L); + planFollower.run(); + + if (isMove) { + // app should have been moved to default reservation queue + Assert.assertEquals(1, getNumberOfApplications(defQ)); + } else { + // app should be killed + Assert.assertEquals(0, getNumberOfApplications(defQ)); + } + assertReservationQueueDoesNotExist(r1); + assertReservationQueueDoesNotExist(r2); + assertReservationQueueExists(r3, 0.1, 0.1); + + when(mClock.getTime()).thenReturn(12L); + planFollower.run(); + + assertReservationQueueDoesNotExist(r1); + assertReservationQueueDoesNotExist(r2); + assertReservationQueueExists(r3, 0.2, 0.2); + + when(mClock.getTime()).thenReturn(16L); + planFollower.run(); + + assertReservationQueueDoesNotExist(r1); + assertReservationQueueDoesNotExist(r2); + assertReservationQueueDoesNotExist(r3); + + verifyCapacity(defQ); + } + + protected abstract Queue getReservationQueue(String reservationId); + + protected abstract void verifyCapacity(Queue defQ); + + protected abstract Queue getDefaultQueue(); + + protected abstract int getNumberOfApplications(Queue queue); + + protected abstract AbstractSchedulerPlanFollower createPlanFollower(); + + protected abstract void assertReservationQueueExists(ReservationId r); + + protected abstract void assertReservationQueueExists(ReservationId r2, + double expectedCapacity, double expectedMaxCapacity); + + protected abstract void assertReservationQueueDoesNotExist(ReservationId r2); +}