Repository: hadoop
Updated Branches:
  refs/heads/branch-2 cde5bfe3e -> 798ab5128


YARN-2998. Abstract out scheduler independent PlanFollower components. (Anubhav 
Dhoot via kasha)

(cherry picked from commit e7257acd8a7adb74d81cd1d009d4a99f023ed844)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/798ab512
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/798ab512
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/798ab512

Branch: refs/heads/branch-2
Commit: 798ab51289e1552778433379b9157d1d4082fffc
Parents: cde5bfe
Author: Karthik Kambatla <ka...@apache.org>
Authored: Tue Dec 30 19:52:43 2014 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Tue Dec 30 19:55:50 2014 -0800

----------------------------------------------------------------------
 .../sls/scheduler/ResourceSchedulerWrapper.java |   6 +
 hadoop-yarn-project/CHANGES.txt                 |   5 +
 .../AbstractSchedulerPlanFollower.java          | 412 +++++++++++++++++++
 .../CapacitySchedulerPlanFollower.java          | 361 ++++------------
 .../scheduler/YarnScheduler.java                |   5 +
 .../scheduler/fair/FSLeafQueue.java             |   4 +-
 .../scheduler/fair/FairScheduler.java           |   3 +-
 .../scheduler/fifo/FifoScheduler.java           |   5 +
 .../reservation/ReservationSystemTestUtil.java  |   4 +-
 .../TestCapacitySchedulerPlanFollower.java      | 209 +++-------
 .../TestSchedulerPlanFollowerBase.java          | 191 +++++++++
 11 files changed, 755 insertions(+), 450 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index 3ac3382..e729363 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoSchedule
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.web.SLSWebApp;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.log4j.Logger;
 
@@ -868,6 +869,11 @@ public class ResourceSchedulerWrapper
   }
 
   @Override
+  public ResourceCalculator getResourceCalculator() {
+    // Forward the call to the scheduler held in the `scheduler` field.
+    return scheduler.getResourceCalculator();
+  }
+
+  @Override
   public int getNumClusterNodes() {
     return scheduler.getNumClusterNodes();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 6c280cb..a135f1e 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -38,6 +38,7 @@ Release 2.7.0 - UNRELEASED
     YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler. 
     (Anubhav Dhoot via kasha)
 
+
   IMPROVEMENTS
 
     YARN-2950. Change message to mandate, not suggest JS requirement on UI.
@@ -125,6 +126,10 @@ Release 2.7.0 - UNRELEASED
 
     YARN-2943. Added node-labels page on RM web UI. (Wangda Tan via jianhe)
 
+    YARN-2998. Abstract out scheduler independent PlanFollower components. 
+    (Anubhav Dhoot via kasha)
+
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
new file mode 100644
index 0000000..0de4dcf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
+  // Logger bound to this class so that messages emitted by the shared base
+  // logic are attributed to it rather than to one particular subclass.
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AbstractSchedulerPlanFollower.class);
+
+  // Plans kept in sync by this follower; filled by init() and replaced by
+  // setPlans().
+  protected Collection<Plan> plans = new ArrayList<Plan>();
+  // Scheduler whose queues are resized, created and removed to follow the
+  // plans.
+  protected YarnScheduler scheduler;
+  // Time source read at the start of every synchronizePlan() call.
+  protected Clock clock;
+
+  /**
+   * Stores the clock and the scheduler and registers the initial plans.
+   *
+   * @param clock time source used when synchronizing plans
+   * @param sched scheduler whose queues are driven by the plans
+   * @param plans plans to follow; their elements are added to the plans
+   *          already held by this instance
+   */
+  @Override
+  public void init(Clock clock, ResourceScheduler sched,
+      Collection<Plan> plans) {
+    this.clock = clock;
+    this.scheduler = sched;
+    this.plans.addAll(plans);
+  }
+
+  /**
+   * Synchronizes every plan currently held by this follower, in the
+   * iteration order of the plan collection.
+   */
+  @Override
+  public synchronized void run() {
+    for (Plan current : plans) {
+      synchronizePlan(current);
+    }
+  }
+
+  /**
+   * Replaces the plans followed by this instance with the given ones.
+   *
+   * @param plans the new plans; their elements are copied into the internal
+   *          collection after it has been cleared
+   */
+  @Override
+  public synchronized void setPlans(Collection<Plan> plans) {
+    this.plans.clear();
+    this.plans.addAll(plans);
+  }
+
+  /**
+   * Brings the reservation queues under one plan queue in line with what the
+   * plan prescribes for the current time, rounded up to the plan step.
+   *
+   * The method looks up the plan queue (and returns if there is none), makes
+   * sure the default reservation queue exists, invokes the replanner when the
+   * reserved resources exceed the plan resources, cleans up queues whose
+   * reservation is no longer current, creates and sizes a queue for every
+   * current reservation, gives the remaining capacity to the default queue
+   * and finally asks the plan to archive completed reservations.
+   *
+   * Failures of individual scheduler operations are logged and do not abort
+   * the iteration.
+   *
+   * @param plan the plan to synchronize the scheduler with
+   */
+  @Override
+  public synchronized void synchronizePlan(Plan plan) {
+    String planQueueName = plan.getQueueName();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Running plan follower edit policy for plan: "
+          + planQueueName);
+    }
+    // align with plan step: round "now" up to the next multiple of the step
+    long step = plan.getStep();
+    long now = clock.getTime();
+    if (now % step != 0) {
+      now += step - (now % step);
+    }
+    Queue planQueue = getPlanQueue(planQueueName);
+    if (planQueue == null) {
+      // nothing to synchronize against
+      return;
+    }
+
+    // first we publish to the plan the current availability of resources
+    Resource clusterResources = scheduler.getClusterResource();
+    Resource planResources = getPlanResources(plan, planQueue,
+        clusterResources);
+
+    Set<ReservationAllocation> currentReservations =
+        plan.getReservationsAtTime(now);
+    Set<String> curReservationNames = new HashSet<String>();
+    Resource reservedResources = Resource.newInstance(0, 0);
+    int numRes = getReservedResources(now, currentReservations,
+        curReservationNames, reservedResources);
+
+    // create the default reservation queue if it doesnt exist
+    String defReservationId = getReservationIdFromQueueName(planQueueName) +
+        PlanQueue.DEFAULT_QUEUE_SUFFIX;
+    String defReservationQueue = getReservationQueueName(planQueueName,
+        defReservationId);
+    createDefaultReservationQueue(planQueueName, planQueue,
+        defReservationId);
+    // the default queue is always treated as current so it is never expired
+    curReservationNames.add(defReservationId);
+
+    // if the resources dedicated to this plan has shrunk invoke replanner
+    if (arePlanResourcesLessThanReservations(clusterResources, planResources,
+        reservedResources)) {
+      try {
+        plan.getReplanner().plan(plan, null);
+      } catch (PlanningException e) {
+        LOG.warn("Exception while trying to replan: {}", planQueueName, e);
+      }
+    }
+    // identify the reservations that have expired and new reservations that
+    // have to be activated
+    List<? extends Queue> resQueues = getChildReservationQueues(planQueue);
+    Set<String> expired = new HashSet<String>();
+    for (Queue resQueue : resQueues) {
+      String resQueueName = resQueue.getQueueName();
+      String reservationId = getReservationIdFromQueueName(resQueueName);
+      if (curReservationNames.contains(reservationId)) {
+        // it is already existing reservation, so needed not create new
+        // reservation queue; what is left in curReservationNames afterwards
+        // are the reservations that still need a queue
+        curReservationNames.remove(reservationId);
+      } else {
+        // the reservation has termination, mark for cleanup
+        expired.add(reservationId);
+      }
+    }
+    // garbage collect expired reservations
+    cleanupExpiredQueues(planQueueName, plan.getMoveOnExpiry(), expired,
+        defReservationQueue);
+
+    // Add new reservations and update existing ones
+    float totalAssignedCapacity = 0f;
+    if (currentReservations != null) {
+      // first release all excess capacity in default queue
+      try {
+        setQueueEntitlement(planQueueName, defReservationQueue, 0f, 1.0f);
+      } catch (YarnException e) {
+        LOG.warn(
+            "Exception while trying to release default queue capacity for "
+                + "plan: {}",
+            planQueueName, e);
+      }
+      // sort allocations from the one giving up the most resources, to the
+      // one asking for the most
+      // avoid order-of-operation errors that temporarily violate 100%
+      // capacity bound
+      List<ReservationAllocation> sortedAllocations =
+          sortByDelta(
+              new ArrayList<ReservationAllocation>(currentReservations), now,
+              plan);
+      for (ReservationAllocation res : sortedAllocations) {
+        String currResId = res.getReservationId().toString();
+        if (curReservationNames.contains(currResId)) {
+          addReservationQueue(planQueueName, planQueue, currResId);
+        }
+        Resource capToAssign = res.getResourcesAtTime(now);
+        float targetCapacity = 0f;
+        // only compute a ratio when the plan has both memory and vcores
+        if (planResources.getMemory() > 0
+            && planResources.getVirtualCores() > 0) {
+          targetCapacity =
+              calculateReservationToPlanRatio(clusterResources,
+                  planResources,
+                  capToAssign);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Assigning capacity of {} to queue {} with target capacity {}",
+              capToAssign, currResId, targetCapacity);
+        }
+        // set maxCapacity to 100% unless the job requires gang, in which
+        // case we stick to capacity (as running early/before is likely a
+        // waste of resources)
+        float maxCapacity = 1.0f;
+        if (res.containsGangs()) {
+          maxCapacity = targetCapacity;
+        }
+        try {
+          setQueueEntitlement(planQueueName, currResId, targetCapacity,
+              maxCapacity);
+        } catch (YarnException e) {
+          // one placeholder per argument; the trailing exception is logged
+          // as the cause
+          LOG.warn(
+              "Exception while trying to size reservation: {} for plan: {}",
+              currResId, planQueueName, e);
+        }
+        totalAssignedCapacity += targetCapacity;
+      }
+    }
+    // compute the default queue capacity
+    float defQCap = 1.0f - totalAssignedCapacity;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
+          + "currReservation: {} default-queue capacity: {}", planResources,
+          numRes, defQCap);
+    }
+    // set the default queue to eat-up all remaining capacity
+    try {
+      setQueueEntitlement(planQueueName, defReservationQueue, defQCap, 1.0f);
+    } catch (YarnException e) {
+      LOG.warn(
+          "Exception while trying to reclaim default queue capacity for "
+              + "plan: {}",
+          planQueueName, e);
+    }
+    // garbage collect finished reservations from plan
+    try {
+      plan.archiveCompletedReservations(now);
+    } catch (PlanningException e) {
+      LOG.error("Exception in archiving completed reservations: ", e);
+    }
+    LOG.info("Finished iteration of plan follower edit policy for plan: "
+        + planQueueName);
+
+    // Extension: update plan with app states,
+    // useful to support smart replanning
+  }
+
+  /**
+   * Maps a reservation queue name to its reservation id. This base
+   * implementation returns the queue name unchanged.
+   *
+   * @param resQueueName name of a reservation queue
+   * @return the reservation id for that queue
+   */
+  protected String getReservationIdFromQueueName(String resQueueName) {
+    return resQueueName;
+  }
+
+  /**
+   * Applies a capacity / max-capacity pair to the queue backing a
+   * reservation.
+   *
+   * @param planQueueName name of the plan queue the reservation belongs to
+   * @param currResId reservation id, translated to a queue name through
+   *          getReservationQueueName
+   * @param targetCapacity capacity to set on the queue
+   * @param maxCapacity maximum capacity to set on the queue
+   * @throws YarnException if the scheduler rejects the entitlement
+   */
+  protected void setQueueEntitlement(String planQueueName, String currResId,
+      float targetCapacity,
+      float maxCapacity) throws YarnException {
+    String targetQueue = getReservationQueueName(planQueueName, currResId);
+    QueueEntitlement entitlement =
+        new QueueEntitlement(targetCapacity, maxCapacity);
+    scheduler.setEntitlement(targetQueue, entitlement);
+  }
+
+  /**
+   * Maps a reservation id to the queue name handed to the scheduler. This
+   * base implementation returns the reservation id unchanged and does not use
+   * the plan queue name. Schedulers have different ways of naming queues. See
+   * YARN-2773
+   *
+   * @param planQueueName name of the plan queue the reservation belongs to
+   * @param reservationId id of the reservation
+   * @return name of the queue that backs the reservation
+   */
+  protected String getReservationQueueName(String planQueueName,
+      String reservationId) {
+    return reservationId;
+  }
+
+  /**
+   * Retires the queues of reservations that are no longer current. For each
+   * one the entitlement is first set to zero to prevent new app submission;
+   * then, if move is enabled, its apps are moved to the plan's default
+   * reservation queue. Finally the queue is removed when it holds no apps,
+   * otherwise the remaining apps (move disabled or move failed) are killed.
+   * A failure on one reservation is logged and the next one is processed.
+   */
+  protected void cleanupExpiredQueues(String planQueueName,
+      boolean shouldMove, Set<String> toRemove, String defReservationQueue) {
+    for (String expiredReservationId : toRemove) {
+      try {
+        String expiredQueue =
+            getReservationQueueName(planQueueName, expiredReservationId);
+        // zero capacity and zero max capacity
+        setQueueEntitlement(planQueueName, expiredQueue, 0.0f, 0.0f);
+        if (shouldMove) {
+          moveAppsInQueueSync(expiredQueue, defReservationQueue);
+        }
+        boolean drained = scheduler.getAppsInQueue(expiredQueue).isEmpty();
+        if (drained) {
+          scheduler.removeQueue(expiredQueue);
+          LOG.info("Queue: " + expiredQueue + " removed");
+        } else {
+          scheduler.killAllAppsInQueue(expiredQueue);
+          LOG.info("Killing applications in queue: {}", expiredQueue);
+        }
+      } catch (YarnException e) {
+        LOG.warn("Exception while trying to expire reservation: {}",
+            expiredReservationId, e);
+      }
+    }
+  }
+
+  /**
+   * Move all apps in the given reservation queue to the parent plan queue's
+   * default reservation queue in a synchronous fashion. A failure to move one
+   * application is logged and the remaining applications are still attempted.
+   *
+   * @param expiredReservation name of the queue to move applications out of
+   * @param defReservationQueue name of the queue to move applications into
+   */
+  private void moveAppsInQueueSync(String expiredReservation,
+                                   String defReservationQueue) {
+    List<ApplicationAttemptId> activeApps =
+        scheduler.getAppsInQueue(expiredReservation);
+    if (activeApps.isEmpty()) {
+      return;
+    }
+    for (ApplicationAttemptId app : activeApps) {
+      // fallback to parent's default queue
+      try {
+        scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
+      } catch (YarnException e) {
+        LOG.warn(
+            "Encountered unexpected error during migration of application: {}"
+                + " from reservation: {}",
+            app, expiredReservation, e);
+      }
+    }
+  }
+
+  /**
+   * Collects the ids and the summed resources of the given reservations.
+   *
+   * @param now time at which each reservation's resources are evaluated
+   * @param currentReservations reservations to examine; may be null, in
+   *          which case nothing is collected
+   * @param curReservationNames output set that receives the string form of
+   *          every reservation id
+   * @param reservedResources output accumulator; each reservation's resources
+   *          at {@code now} are added to it through Resources.addTo
+   * @return the number of reservations examined, 0 when the set is null
+   */
+  protected int getReservedResources(long now,
+      Set<ReservationAllocation> currentReservations,
+      Set<String> curReservationNames, Resource reservedResources) {
+    int numRes = 0;
+    if (currentReservations != null) {
+      numRes = currentReservations.size();
+      for (ReservationAllocation reservation : currentReservations) {
+        curReservationNames.add(reservation.getReservationId().toString());
+        Resources.addTo(reservedResources,
+            reservation.getResourcesAtTime(now));
+      }
+    }
+    return numRes;
+  }
+
+  /**
+   * Orders the given allocations in place, from the least new amount of
+   * resources asked (likely negative) to the highest. Applying them in that
+   * order prevents "order-of-operation" errors related to exceeding 100%
+   * capacity temporarily.
+   *
+   * @return the same list instance that was passed in, now sorted
+   */
+  protected List<ReservationAllocation> sortByDelta(
+      List<ReservationAllocation> currentReservations, long now, Plan plan) {
+    Comparator<ReservationAllocation> byDelta =
+        new ReservationAllocationComparator(now, this, plan);
+    Collections.sort(currentReservations, byDelta);
+    return currentReservations;
+  }
+
+  /**
+   * Get queue associated with reservable queue named
+   * @param planQueueName Name of the reservable queue
+   * @return queue associated with the reservable queue; when this is null,
+   *         synchronizePlan returns without touching the scheduler
+   */
+  protected abstract Queue getPlanQueue(String planQueueName);
+
+  /**
+   * Calculates ratio of reservationResources to planResources. The result is
+   * used by synchronizePlan as the target capacity of the reservation's
+   * queue, and the sum over all reservations is subtracted from 1.0 to size
+   * the default queue.
+   */
+  protected abstract float calculateReservationToPlanRatio(
+      Resource clusterResources, Resource planResources,
+      Resource reservationResources);
+
+  /**
+   * Check if plan resources are less than expected reservation resources.
+   * When this returns true, synchronizePlan invokes the plan's replanner.
+   */
+  protected abstract boolean arePlanResourcesLessThanReservations(
+      Resource clusterResources, Resource planResources,
+      Resource reservedResources);
+
+  /**
+   * Get a list of reservation queues for this planQueue. synchronizePlan maps
+   * each returned queue's name through getReservationIdFromQueueName to
+   * decide whether its reservation is still current or has expired.
+   */
+  protected abstract List<? extends Queue> getChildReservationQueues(
+      Queue planQueue);
+
+  /**
+   * Add a new reservation queue for reservation currResId for this planQueue.
+   * Called by synchronizePlan for current reservations that were not matched
+   * by any existing child queue.
+   */
+  protected abstract void addReservationQueue(
+      String planQueueName, Queue queue, String currResId);
+
+  /**
+   * Creates the default reservation queue for use when no reservation is
+   * used for applications submitted to this planQueue. Called on every
+   * synchronizePlan; note that the caller passes the default reservation id
+   * (plan queue id plus PlanQueue.DEFAULT_QUEUE_SUFFIX) as the third
+   * argument, not the name returned by getReservationQueueName.
+   */
+  protected abstract void createDefaultReservationQueue(
+      String planQueueName, Queue queue, String defReservationQueue);
+
+  /**
+   * Get plan resources for this planQueue. The result is what synchronizePlan
+   * compares the reserved resources against and divides reservations by.
+   */
+  protected abstract Resource getPlanResources(
+      Plan plan, Queue queue, Resource clusterResources);
+
+  /**
+   * Get reservation queue resources if it exists otherwise return null. Used
+   * when sorting reservations: a non-null result is subtracted from the
+   * resources the reservation asks for.
+   */
+  protected abstract Resource getReservationQueueResourceIfExists(Plan plan,
+      ReservationId reservationId);
+
+  /**
+   * Orders reservation allocations by the resources each asks for at
+   * {@code now} minus what its reservation queue, if one exists, already
+   * reports.
+   */
+  private static class ReservationAllocationComparator implements
+      Comparator<ReservationAllocation> {
+    AbstractSchedulerPlanFollower planFollower;
+    long now;
+    Plan plan;
+
+    ReservationAllocationComparator(long now,
+        AbstractSchedulerPlanFollower planFollower, Plan plan) {
+      this.planFollower = planFollower;
+      this.plan = plan;
+      this.now = now;
+    }
+
+    // Resources asked for at "now", less the existing queue's resources when
+    // the follower reports a queue for this reservation.
+    private Resource getUnallocatedReservedResources(
+        ReservationAllocation reservation) {
+      Resource existing = planFollower.getReservationQueueResourceIfExists(
+          plan, reservation.getReservationId());
+      if (existing == null) {
+        return reservation.getResourcesAtTime(now);
+      }
+      return Resources.subtract(reservation.getResourcesAtTime(now),
+          existing);
+    }
+
+    @Override
+    public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
+      // compare on the delta between requested and currently held resources
+      Resource lhsDelta = getUnallocatedReservedResources(lhs);
+      Resource rhsDelta = getUnallocatedReservedResources(rhs);
+      return lhsDelta.compareTo(rhsDelta);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
index 126560a..61772c9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
@@ -19,26 +19,19 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
@@ -58,319 +51,119 @@ import org.slf4j.LoggerFactory;
  * differences among existing queues). This makes it resilient to frequency of
  * synchronization, and RM restart issues (no "catch up" is necessary).
  */
-public class CapacitySchedulerPlanFollower implements PlanFollower {
+public class CapacitySchedulerPlanFollower extends 
AbstractSchedulerPlanFollower {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(CapacitySchedulerPlanFollower.class);
 
-  private Collection<Plan> plans = new ArrayList<Plan>();
-
-  private Clock clock;
-  private CapacityScheduler scheduler;
+  private CapacityScheduler cs;
 
   @Override
   public void init(Clock clock, ResourceScheduler sched, Collection<Plan> 
plans) {
+    super.init(clock, sched, plans);
     LOG.info("Initializing Plan Follower Policy:"
         + this.getClass().getCanonicalName());
     if (!(sched instanceof CapacityScheduler)) {
       throw new YarnRuntimeException(
           "CapacitySchedulerPlanFollower can only work with 
CapacityScheduler");
     }
-    this.clock = clock;
-    this.scheduler = (CapacityScheduler) sched;
-    this.plans.addAll(plans);
+    this.cs = (CapacityScheduler) sched;
   }
 
   @Override
-  public synchronized void run() {
-    for (Plan plan : plans) {
-      synchronizePlan(plan);
+  protected Queue getPlanQueue(String planQueueName) {
+    CSQueue queue = cs.getQueue(planQueueName);
+    if (!(queue instanceof PlanQueue)) {
+      LOG.error("The Plan is not an PlanQueue!");
+      return null;
     }
+    return queue;
   }
 
   @Override
-  public synchronized void synchronizePlan(Plan plan) {
-    String planQueueName = plan.getQueueName();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Running plan follower edit policy for plan: " + 
planQueueName);
-    }
-    // align with plan step
-    long step = plan.getStep();
-    long now = clock.getTime();
-    if (now % step != 0) {
-      now += step - (now % step);
-    }
-    CSQueue queue = scheduler.getQueue(planQueueName);
-    if (!(queue instanceof PlanQueue)) {
-      LOG.error("The Plan is not an PlanQueue!");
-      return;
-    }
-    PlanQueue planQueue = (PlanQueue) queue;
-    // first we publish to the plan the current availability of resources
-    Resource clusterResources = scheduler.getClusterResource();
-    float planAbsCap = planQueue.getAbsoluteCapacity();
-    Resource planResources = Resources.multiply(clusterResources, planAbsCap);
-    plan.setTotalCapacity(planResources);
+  protected float calculateReservationToPlanRatio(
+      Resource clusterResources, Resource planResources,
+      Resource reservationResources) {
+    return Resources.divide(cs.getResourceCalculator(),
+        clusterResources, reservationResources, planResources);
+  }
 
-    Set<ReservationAllocation> currentReservations =
-        plan.getReservationsAtTime(now);
-    Set<String> curReservationNames = new HashSet<String>();
-    Resource reservedResources = Resource.newInstance(0, 0);
-    int numRes = 0;
-    if (currentReservations != null) {
-      numRes = currentReservations.size();
-      for (ReservationAllocation reservation : currentReservations) {
-        curReservationNames.add(reservation.getReservationId().toString());
-        Resources.addTo(reservedResources, 
reservation.getResourcesAtTime(now));
-      }
+  @Override
+  protected boolean arePlanResourcesLessThanReservations(
+      Resource clusterResources, Resource planResources,
+      Resource reservedResources) {
+    return Resources.greaterThan(cs.getResourceCalculator(),
+        clusterResources, reservedResources, planResources);
+  }
+
+  @Override
+  protected List<? extends Queue> getChildReservationQueues(Queue queue) {
+    PlanQueue planQueue = (PlanQueue)queue;
+    List<CSQueue> childQueues = planQueue.getChildQueues();
+    return childQueues;
+  }
+
+  @Override
+  protected void addReservationQueue(
+      String planQueueName, Queue queue, String currResId) {
+    PlanQueue planQueue = (PlanQueue)queue;
+    try {
+      ReservationQueue resQueue =
+          new ReservationQueue(cs, currResId, planQueue);
+      cs.addQueue(resQueue);
+    } catch (SchedulerDynamicEditException e) {
+      LOG.warn(
+          "Exception while trying to activate reservation: {} for plan: {}",
+          currResId, planQueueName, e);
+    } catch (IOException e) {
+      LOG.warn(
+          "Exception while trying to activate reservation: {} for plan: {}",
+          currResId, planQueueName, e);
     }
-    // create the default reservation queue if it doesnt exist
-    String defReservationQueue = planQueueName + 
PlanQueue.DEFAULT_QUEUE_SUFFIX;
-    if (scheduler.getQueue(defReservationQueue) == null) {
+  }
+
+  @Override
+  protected void createDefaultReservationQueue(
+      String planQueueName, Queue queue, String defReservationId) {
+    PlanQueue planQueue = (PlanQueue)queue;
+    if (cs.getQueue(defReservationId) == null) {
       try {
         ReservationQueue defQueue =
-            new ReservationQueue(scheduler, defReservationQueue, planQueue);
-        scheduler.addQueue(defQueue);
+            new ReservationQueue(cs, defReservationId, planQueue);
+        cs.addQueue(defQueue);
       } catch (SchedulerDynamicEditException e) {
         LOG.warn(
             "Exception while trying to create default reservation queue for 
plan: {}",
             planQueueName, e);
       } catch (IOException e) {
         LOG.warn(
-            "Exception while trying to create default reservation queue for 
plan: {}",
-            planQueueName, e);
-      }
-    }
-    curReservationNames.add(defReservationQueue);
-    // if the resources dedicated to this plan has shrunk invoke replanner
-    if (Resources.greaterThan(scheduler.getResourceCalculator(),
-        clusterResources, reservedResources, planResources)) {
-      try {
-        plan.getReplanner().plan(plan, null);
-      } catch (PlanningException e) {
-        LOG.warn("Exception while trying to replan: {}", planQueueName, e);
-      }
-    }
-    // identify the reservations that have expired and new reservations that
-    // have to be activated
-    List<CSQueue> resQueues = planQueue.getChildQueues();
-    Set<String> expired = new HashSet<String>();
-    for (CSQueue resQueue : resQueues) {
-      String resQueueName = resQueue.getQueueName();
-      if (curReservationNames.contains(resQueueName)) {
-        // it is already existing reservation, so needed not create new
-        // reservation queue
-        curReservationNames.remove(resQueueName);
-      } else {
-        // the reservation has termination, mark for cleanup
-        expired.add(resQueueName);
-      }
-    }
-    // garbage collect expired reservations
-    cleanupExpiredQueues(plan.getMoveOnExpiry(), expired, defReservationQueue);
-
-    // Add new reservations and update existing ones
-    float totalAssignedCapacity = 0f;
-    if (currentReservations != null) {
-      // first release all excess capacity in default queue
-      try {
-        scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(0f,
-            1.0f));
-      } catch (YarnException e) {
-        LOG.warn(
-            "Exception while trying to release default queue capacity for 
plan: {}",
+            "Exception while trying to create default reservation queue for " +
+                "plan: {}",
             planQueueName, e);
       }
-      // sort allocations from the one giving up the most resources, to the
-      // one asking for the most
-      // avoid order-of-operation errors that temporarily violate 100%
-      // capacity bound
-      List<ReservationAllocation> sortedAllocations =
-          sortByDelta(
-              new ArrayList<ReservationAllocation>(currentReservations), now);
-      for (ReservationAllocation res : sortedAllocations) {
-        String currResId = res.getReservationId().toString();
-        if (curReservationNames.contains(currResId)) {
-          try {
-            ReservationQueue resQueue =
-                new ReservationQueue(scheduler, currResId, planQueue);
-            scheduler.addQueue(resQueue);
-          } catch (SchedulerDynamicEditException e) {
-            LOG.warn(
-                "Exception while trying to activate reservation: {} for plan: 
{}",
-                currResId, planQueueName, e);
-          } catch (IOException e) {
-            LOG.warn(
-                "Exception while trying to activate reservation: {} for plan: 
{}",
-                currResId, planQueueName, e);
-          }
-        }
-        Resource capToAssign = res.getResourcesAtTime(now);
-        float targetCapacity = 0f;
-        if (planResources.getMemory() > 0
-            && planResources.getVirtualCores() > 0) {
-          targetCapacity =
-              Resources.divide(scheduler.getResourceCalculator(),
-                  clusterResources, capToAssign, planResources);
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(
-              "Assigning capacity of {} to queue {} with target capacity {}",
-              capToAssign, currResId, targetCapacity);
-        }
-        // set maxCapacity to 100% unless the job requires gang, in which
-        // case we stick to capacity (as running early/before is likely a
-        // waste of resources)
-        float maxCapacity = 1.0f;
-        if (res.containsGangs()) {
-          maxCapacity = targetCapacity;
-        }
-        try {
-          scheduler.setEntitlement(currResId, new QueueEntitlement(
-              targetCapacity, maxCapacity));
-        } catch (YarnException e) {
-          LOG.warn("Exception while trying to size reservation for plan: {}",
-              currResId, planQueueName, e);
-        }
-        totalAssignedCapacity += targetCapacity;
-      }
-    }
-    // compute the default queue capacity
-    float defQCap = 1.0f - totalAssignedCapacity;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} "
-          + "currReservation: {} default-queue capacity: {}", planResources,
-          numRes, defQCap);
-    }
-    // set the default queue to eat-up all remaining capacity
-    try {
-      scheduler.setEntitlement(defReservationQueue, new QueueEntitlement(
-          defQCap, 1.0f));
-    } catch (YarnException e) {
-      LOG.warn(
-          "Exception while trying to reclaim default queue capacity for plan: 
{}",
-          planQueueName, e);
-    }
-    // garbage collect finished reservations from plan
-    try {
-      plan.archiveCompletedReservations(now);
-    } catch (PlanningException e) {
-      LOG.error("Exception in archiving completed reservations: ", e);
-    }
-    LOG.info("Finished iteration of plan follower edit policy for plan: "
-        + planQueueName);
-
-    // Extension: update plan with app states,
-    // useful to support smart replanning
-  }
-
-  /**
-   * Move all apps in the set of queues to the parent plan queue's default
-   * reservation queue in a synchronous fashion
-   */
-  private void moveAppsInQueueSync(String expiredReservation,
-      String defReservationQueue) {
-    List<ApplicationAttemptId> activeApps =
-        scheduler.getAppsInQueue(expiredReservation);
-    if (activeApps.isEmpty()) {
-      return;
-    }
-    for (ApplicationAttemptId app : activeApps) {
-      // fallback to parent's default queue
-      try {
-        scheduler.moveApplication(app.getApplicationId(), defReservationQueue);
-      } catch (YarnException e) {
-        LOG.warn(
-            "Encountered unexpected error during migration of application: {} from reservation: {}",
-            app, expiredReservation, e);
-      }
-    }
-  }
-
-  /**
-   * First sets entitlement of queues to zero to prevent new app submission.
-   * Then move all apps in the set of queues to the parent plan queue's default
-   * reservation queue if move is enabled. Finally cleanups the queue by killing
-   * any apps (if move is disabled or move failed) and removing the queue
-   */
-  private void cleanupExpiredQueues(boolean shouldMove, Set<String> toRemove,
-      String defReservationQueue) {
-    for (String expiredReservation : toRemove) {
-      try {
-        // reduce entitlement to 0
-        scheduler.setEntitlement(expiredReservation, new QueueEntitlement(0.0f,
-            0.0f));
-        if (shouldMove) {
-          moveAppsInQueueSync(expiredReservation, defReservationQueue);
-        }
-        if (scheduler.getAppsInQueue(expiredReservation).size() > 0) {
-          scheduler.killAllAppsInQueue(expiredReservation);
-          LOG.info("Killing applications in queue: {}", expiredReservation);
-        } else {
-          scheduler.removeQueue(expiredReservation);
-          LOG.info("Queue: " + expiredReservation + " removed");
-        }
-      } catch (YarnException e) {
-        LOG.warn("Exception while trying to expire reservation: {}",
-            expiredReservation, e);
-      }
     }
   }
 
   @Override
-  public synchronized void setPlans(Collection<Plan> plans) {
-    this.plans.clear();
-    this.plans.addAll(plans);
-  }
-
-  /**
-   * Sort in the order from the least new amount of resources asked (likely
-   * negative) to the highest. This prevents "order-of-operation" errors related
-   * to exceeding 100% capacity temporarily.
-   */
-  private List<ReservationAllocation> sortByDelta(
-      List<ReservationAllocation> currentReservations, long now) {
-    Collections.sort(currentReservations, new ReservationAllocationComparator(
-        scheduler, now));
-    return currentReservations;
+  protected Resource getPlanResources(
+      Plan plan, Queue queue, Resource clusterResources) {
+    PlanQueue planQueue = (PlanQueue)queue;
+    float planAbsCap = planQueue.getAbsoluteCapacity();
+    Resource planResources = Resources.multiply(clusterResources, planAbsCap);
+    plan.setTotalCapacity(planResources);
+    return planResources;
   }
 
-  private static class ReservationAllocationComparator implements
-      Comparator<ReservationAllocation> {
-    CapacityScheduler scheduler;
-    long now;
-
-    ReservationAllocationComparator(CapacityScheduler scheduler, long now) {
-      this.scheduler = scheduler;
-      this.now = now;
-    }
-
-    private Resource getUnallocatedReservedResources(
-        ReservationAllocation reservation) {
-      Resource resResource;
-      CSQueue resQueue =
-          scheduler.getQueue(reservation.getReservationId().toString());
-      if (resQueue != null) {
-        resResource =
-            Resources.subtract(
-                reservation.getResourcesAtTime(now),
-                Resources.multiply(scheduler.getClusterResource(),
-                    resQueue.getAbsoluteCapacity()));
-      } else {
-        resResource = reservation.getResourcesAtTime(now);
-      }
-      return resResource;
-    }
-
-    @Override
-    public int compare(ReservationAllocation lhs, ReservationAllocation rhs) {
-      // compute delta between current and previous reservation, and compare
-      // based on that
-      Resource lhsRes = getUnallocatedReservedResources(lhs);
-      Resource rhsRes = getUnallocatedReservedResources(rhs);
-      return lhsRes.compareTo(rhsRes);
-    }
-
+  @Override
+  protected Resource getReservationQueueResourceIfExists(Plan plan,
+      ReservationId reservationId) {
+    CSQueue resQueue = cs.getQueue(reservationId.toString());
+    Resource reservationResource = null;
+    if (resQueue != null) {
+      reservationResource = Resources.multiply(cs.getClusterResource(),
+          resQueue.getAbsoluteCapacity());
+    }
+    return reservationResource;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index d1b5275..4a3a35c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
 /**
  * This interface is used by the components to talk to the
@@ -98,6 +99,10 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
   @Stable
   public Resource getMaximumResourceCapability();
 
+  @LimitedPrivate("yarn")
+  @Evolving
+  ResourceCalculator getResourceCalculator();
+
   /**
    * Get the number of nodes available in the cluster.
    * @return the number of available nodes.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 5862a73..91bea11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -544,9 +544,9 @@ public class FSLeafQueue extends FSQueue {
   }
 
   private boolean isStarved(Resource share) {
-    Resource desiredShare = Resources.min(FairScheduler.getResourceCalculator(),
+    Resource desiredShare = Resources.min(scheduler.getResourceCalculator(),
         scheduler.getClusterResource(), share, getDemand());
-    return Resources.lessThan(FairScheduler.getResourceCalculator(),
+    return Resources.lessThan(scheduler.getResourceCalculator(),
         scheduler.getClusterResource(), getResourceUsage(), desiredShare);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 49dfc3c..57b41af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1094,7 +1094,8 @@ public class FairScheduler extends
     return super.getApplicationAttempt(appAttemptId);
   }
 
-  public static ResourceCalculator getResourceCalculator() {
+  @Override
+  public ResourceCalculator getResourceCalculator() {
     return RESOURCE_CALCULATOR;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 3d4c9dd..e006715 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -919,6 +919,11 @@ public class FifoScheduler extends
     return DEFAULT_QUEUE.getQueueUserAclInfo(null); 
   }
 
+  @Override
+  public ResourceCalculator getResourceCalculator() {
+    return resourceCalculator;
+  }
+
   private synchronized void addNode(RMNode nodeManager) {
     FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
         usePortForNodeName);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index d93af38..12c2583 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -104,7 +104,7 @@ public class ReservationSystemTestUtil {
         .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
   }
 
-  static void setupFSAllocationFile(String allocationFile)
+  public static void setupFSAllocationFile(String allocationFile)
       throws IOException {
     PrintWriter out = new PrintWriter(new FileWriter(allocationFile));
     out.println("<?xml version=\"1.0\"?>");
@@ -130,7 +130,7 @@ public class ReservationSystemTestUtil {
     out.close();
   }
 
-  static void updateFSAllocationFile(String allocationFile)
+  public static void updateFSAllocationFile(String allocationFile)
       throws IOException {
     PrintWriter out = new PrintWriter(new FileWriter(allocationFile));
     out.println("<?xml version=\"1.0\"?>");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
index 4eedd42..c603f5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
@@ -33,25 +33,20 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
@@ -62,21 +57,12 @@ import org.junit.rules.TestName;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
 
-public class TestCapacitySchedulerPlanFollower {
+public class TestCapacitySchedulerPlanFollower extends TestSchedulerPlanFollowerBase {
 
-  final static int GB = 1024;
-
-  private Clock mClock = null;
-  private CapacityScheduler scheduler = null;
   private RMContext rmContext;
   private RMContext spyRMContext;
   private CapacitySchedulerContext csContext;
-  private ReservationAgent mAgent;
-  private Plan plan;
-  private Resource minAlloc = Resource.newInstance(GB, 1);
-  private Resource maxAlloc = Resource.newInstance(GB * 8, 8);
-  private ResourceCalculator res = new DefaultResourceCalculator();
-  private CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+  private CapacityScheduler cs;
 
   @Rule
   public TestName name = new TestName();
@@ -84,7 +70,9 @@ public class TestCapacitySchedulerPlanFollower {
   @Before
   public void setUp() throws Exception {
     CapacityScheduler spyCs = new CapacityScheduler();
-    scheduler = spy(spyCs);
+    cs = spy(spyCs);
+    scheduler = cs;
+
     rmContext = TestUtils.getMockRMContext();
     spyRMContext = spy(rmContext);
 
@@ -100,7 +88,7 @@ public class TestCapacitySchedulerPlanFollower {
         new CapacitySchedulerConfiguration();
     ReservationSystemTestUtil.setupQueueConfiguration(csConf);
 
-    scheduler.setConf(csConf);
+    cs.setConf(csConf);
 
     csContext = mock(CapacitySchedulerContext.class);
     when(csContext.getConfiguration()).thenReturn(csConf);
@@ -119,9 +107,9 @@ public class TestCapacitySchedulerPlanFollower {
     when(csContext.getContainerTokenSecretManager()).thenReturn(
         containerTokenSecretManager);
 
-    scheduler.setRMContext(spyRMContext);
-    scheduler.init(csConf);
-    scheduler.start();
+    cs.setRMContext(spyRMContext);
+    cs.init(csConf);
+    cs.start();
 
     setupPlanFollower();
   }
@@ -132,7 +120,7 @@ public class TestCapacitySchedulerPlanFollower {
     mAgent = mock(ReservationAgent.class);
 
     String reservationQ = testUtil.getFullReservationQueueName();
-    CapacitySchedulerConfiguration csConf = scheduler.getConfiguration();
+    CapacitySchedulerConfiguration csConf = cs.getConfiguration();
     csConf.setReservationWindow(reservationQ, 20L);
     csConf.setMaximumCapacity(reservationQ, 40);
     csConf.setAverageCapacity(reservationQ, 20);
@@ -153,155 +141,51 @@ public class TestCapacitySchedulerPlanFollower {
     testPlanFollower(false);
   }
 
-  private void testPlanFollower(boolean isMove) throws PlanningException,
-      InterruptedException, AccessControlException {
-    // Initialize plan based on move flag
-    plan =
-        new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
-            scheduler.getClusterResource(), 1L, res,
-            scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
-            null, isMove);
-
-    // add a few reservations to the plan
-    long ts = System.currentTimeMillis();
-    ReservationId r1 = ReservationId.newInstance(ts, 1);
-    int[] f1 = { 10, 10, 10, 10, 10 };
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
-            "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
-                .generateAllocation(0L, 1L, f1), res, minAlloc)));
+  @Override
+  protected void verifyCapacity(Queue defQ) {
+    CSQueue csQueue = (CSQueue)defQ;
+    assertTrue(csQueue.getCapacity() > 0.9);
+  }
 
-    ReservationId r2 = ReservationId.newInstance(ts, 2);
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
-            "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
-                .generateAllocation(3L, 1L, f1), res, minAlloc)));
+  @Override
+  protected Queue getDefaultQueue() {
+    return cs.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX);
+  }
 
-    ReservationId r3 = ReservationId.newInstance(ts, 3);
-    int[] f2 = { 0, 10, 20, 10, 0 };
-    assertTrue(plan.toString(),
-        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
-            "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
-                .generateAllocation(10L, 1L, f2), res, minAlloc)));
+  @Override
+  protected int getNumberOfApplications(Queue queue) {
+    CSQueue csQueue = (CSQueue)queue;
+    int numberOfApplications = csQueue.getNumApplications();
+    return numberOfApplications;
+  }
 
+  @Override
+  protected CapacitySchedulerPlanFollower createPlanFollower() {
     CapacitySchedulerPlanFollower planFollower =
         new CapacitySchedulerPlanFollower();
     planFollower.init(mClock, scheduler, Collections.singletonList(plan));
+    return planFollower;
+  }
 
-    when(mClock.getTime()).thenReturn(0L);
-    planFollower.run();
-
-    CSQueue defQ =
-        scheduler.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX);
-    CSQueue q = scheduler.getQueue(r1.toString());
+  @Override
+  protected void assertReservationQueueExists(ReservationId r) {
+    CSQueue q = cs.getQueue(r.toString());
     assertNotNull(q);
-    // submit an app to r1
-    String user_0 = "test-user";
-    ApplicationId appId = ApplicationId.newInstance(0, 1);
-    ApplicationAttemptId appAttemptId_0 =
-        ApplicationAttemptId.newInstance(appId, 0);
-    AppAddedSchedulerEvent addAppEvent =
-        new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
-    scheduler.handle(addAppEvent);
-    AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
-        new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
-    scheduler.handle(appAttemptAddedEvent);
-
-    // initial default reservation queue should have no apps
-    Assert.assertEquals(0, defQ.getNumApplications());
-
-    Assert.assertEquals(0.1, q.getCapacity(), 0.01);
-    Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
-    Assert.assertEquals(1, q.getNumApplications());
-
-    CSQueue q2 = scheduler.getQueue(r2.toString());
-    assertNull(q2);
-    CSQueue q3 = scheduler.getQueue(r3.toString());
-    assertNull(q3);
-
-    when(mClock.getTime()).thenReturn(3L);
-    planFollower.run();
+  }
 
-    Assert.assertEquals(0, defQ.getNumApplications());
-    q = scheduler.getQueue(r1.toString());
+  @Override
+  protected void assertReservationQueueExists(ReservationId r2,
+      double expectedCapacity, double expectedMaxCapacity) {
+    CSQueue q = cs.getQueue(r2.toString());
     assertNotNull(q);
-    Assert.assertEquals(0.1, q.getCapacity(), 0.01);
-    Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
-    Assert.assertEquals(1, q.getNumApplications());
-    q2 = scheduler.getQueue(r2.toString());
-    assertNotNull(q2);
-    Assert.assertEquals(0.1, q.getCapacity(), 0.01);
-    Assert.assertEquals(0.1, q.getMaximumCapacity(), 1.0);
-    q3 = scheduler.getQueue(r3.toString());
-    assertNull(q3);
-
-    when(mClock.getTime()).thenReturn(10L);
-    planFollower.run();
-
-    q = scheduler.getQueue(r1.toString());
-    if (isMove) {
-      // app should have been moved to default reservation queue
-      Assert.assertEquals(1, defQ.getNumApplications());
-      assertNull(q);
-    } else {
-      // app should be killed
-      Assert.assertEquals(0, defQ.getNumApplications());
-      assertNotNull(q);
-      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
-          new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
-              RMAppAttemptState.KILLED, false);
-      scheduler.handle(appAttemptRemovedEvent);
-    }
-    q2 = scheduler.getQueue(r2.toString());
-    assertNull(q2);
-    q3 = scheduler.getQueue(r3.toString());
-    assertNotNull(q3);
-    Assert.assertEquals(0, q3.getCapacity(), 0.01);
-    Assert.assertEquals(1.0, q3.getMaximumCapacity(), 1.0);
-
-    when(mClock.getTime()).thenReturn(11L);
-    planFollower.run();
-
-    if (isMove) {
-      // app should have been moved to default reservation queue
-      Assert.assertEquals(1, defQ.getNumApplications());
-    } else {
-      // app should be killed
-      Assert.assertEquals(0, defQ.getNumApplications());
-    }
-    q = scheduler.getQueue(r1.toString());
-    assertNull(q);
-    q2 = scheduler.getQueue(r2.toString());
-    assertNull(q2);
-    q3 = scheduler.getQueue(r3.toString());
-    assertNotNull(q3);
-    Assert.assertEquals(0.1, q3.getCapacity(), 0.01);
-    Assert.assertEquals(0.1, q3.getMaximumCapacity(), 1.0);
-
-    when(mClock.getTime()).thenReturn(12L);
-    planFollower.run();
-
-    q = scheduler.getQueue(r1.toString());
-    assertNull(q);
-    q2 = scheduler.getQueue(r2.toString());
-    assertNull(q2);
-    q3 = scheduler.getQueue(r3.toString());
-    assertNotNull(q3);
-    Assert.assertEquals(0.2, q3.getCapacity(), 0.01);
-    Assert.assertEquals(0.2, q3.getMaximumCapacity(), 1.0);
-
-    when(mClock.getTime()).thenReturn(16L);
-    planFollower.run();
+    Assert.assertEquals(expectedCapacity, q.getCapacity(), 0.01);
+    Assert.assertEquals(expectedMaxCapacity, q.getMaximumCapacity(), 1.0);
+  }
 
-    q = scheduler.getQueue(r1.toString());
-    assertNull(q);
-    q2 = scheduler.getQueue(r2.toString());
+  @Override
+  protected void assertReservationQueueDoesNotExist(ReservationId r2) {
+    CSQueue q2 = cs.getQueue(r2.toString());
     assertNull(q2);
-    q3 = scheduler.getQueue(r3.toString());
-    assertNull(q3);
-
-    assertTrue(defQ.getCapacity() > 0.9);
-
   }
 
   public static ApplicationACLsManager mockAppACLsManager() {
@@ -312,8 +196,11 @@ public class TestCapacitySchedulerPlanFollower {
   @After
   public void tearDown() throws Exception {
     if (scheduler != null) {
-      scheduler.stop();
+      cs.stop();
     }
   }
 
+  protected Queue getReservationQueue(String reservationId) {
+    return cs.getQueue(reservationId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/798ab512/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
new file mode 100644
index 0000000..50df8fe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Assert;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+public abstract class TestSchedulerPlanFollowerBase { // shared plan-follower scenario; subclasses supply the abstract hooks declared at the bottom
+  final static int GB = 1024; // multiplier for the first argument of Resource.newInstance below
+  protected Clock mClock = null; // stubbed with when(mClock.getTime()) below; not assigned in this class
+  protected ResourceScheduler scheduler = null; // not assigned in this class; dereferenced by testPlanFollower
+  protected ReservationAgent mAgent; // passed straight to the InMemoryPlan constructor
+  protected Resource minAlloc = Resource.newInstance(GB, 1);
+  protected Resource maxAlloc = Resource.newInstance(GB * 8, 8);
+  protected CapacityOverTimePolicy policy = new CapacityOverTimePolicy();
+  protected Plan plan; // rebuilt at the start of every testPlanFollower call
+  private ResourceCalculator res = new DefaultResourceCalculator();
+
+  protected void testPlanFollower(boolean isMove) throws PlanningException, // isMove is forwarded to InMemoryPlan and picks the expected branch at t=10 and t=11
+      InterruptedException, AccessControlException {
+    // Initialize plan based on move flag
+    plan =
+        new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, mAgent,
+            scheduler.getClusterResource(), 1L, res,
+            scheduler.getMinimumResourceCapability(), maxAlloc, "dedicated",
+            null, isMove);
+
+    // add a few reservations to the plan
+    long ts = System.currentTimeMillis(); // only used to build the three reservation ids
+    ReservationId r1 = ReservationId.newInstance(ts, 1);
+    int[] f1 = { 10, 10, 10, 10, 10 }; // five entries of 10; shared by r1 (start 0) and r2 (start 3)
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+            "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil
+                .generateAllocation(0L, 1L, f1), res, minAlloc)));
+
+    ReservationId r2 = ReservationId.newInstance(ts, 2);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u3",
+            "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil
+                .generateAllocation(3L, 1L, f1), res, minAlloc)));
+
+    ReservationId r3 = ReservationId.newInstance(ts, 3);
+    int[] f2 = { 0, 10, 20, 10, 0 }; // five entries for r3 (start 10); first and last are 0
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u4",
+            "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
+                .generateAllocation(10L, 1L, f2), res, minAlloc)));
+
+    AbstractSchedulerPlanFollower planFollower = createPlanFollower();
+
+    when(mClock.getTime()).thenReturn(0L); // t=0: the start passed for r1
+    planFollower.run();
+
+    Queue q = getReservationQueue(r1.toString()); // fetched before the existence assertion; dereferenced at q.getQueueName() below
+    assertReservationQueueExists(r1);
+    // submit an app to r1
+    String user_0 = "test-user";
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId_0 =
+        ApplicationAttemptId.newInstance(appId, 0);
+    AppAddedSchedulerEvent addAppEvent =
+        new AppAddedSchedulerEvent(appId, q.getQueueName(), user_0);
+    scheduler.handle(addAppEvent);
+    AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
+        new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
+    scheduler.handle(appAttemptAddedEvent);
+
+    // initial default reservation queue should have no apps
+
+    Queue defQ = getDefaultQueue();
+    Assert.assertEquals(0, getNumberOfApplications(defQ));
+
+    assertReservationQueueExists(r1, 0.1, 0.1); // arguments: reservation, expectedCapacity, expectedMaxCapacity
+    Assert.assertEquals(1, getNumberOfApplications(q));
+
+    assertReservationQueueDoesNotExist(r2);
+    assertReservationQueueDoesNotExist(r3);
+
+    when(mClock.getTime()).thenReturn(3L); // t=3: the start passed for r2
+    planFollower.run();
+
+    Assert.assertEquals(0, getNumberOfApplications(defQ));
+    assertReservationQueueExists(r1, 0.1, 0.1);
+    Assert.assertEquals(1, getNumberOfApplications(q));
+    assertReservationQueueExists(r2, 0.1, 0.1);
+    assertReservationQueueDoesNotExist(r3);
+
+    when(mClock.getTime()).thenReturn(10L); // t=10: the start passed for r3
+    planFollower.run();
+
+    q = getReservationQueue(r1.toString()); // re-fetched: asserted null when isMove, non-null otherwise
+    if (isMove) {
+      // app should have been moved to default reservation queue
+      Assert.assertEquals(1, getNumberOfApplications(defQ));
+      assertNull(q);
+    } else {
+      // app should be killed
+      Assert.assertEquals(0, getNumberOfApplications(defQ));
+      assertNotNull(q);
+      AppAttemptRemovedSchedulerEvent appAttemptRemovedEvent =
+          new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
+              RMAppAttemptState.KILLED, false);
+      scheduler.handle(appAttemptRemovedEvent); // the test itself delivers the KILLED attempt-removed event to the scheduler
+    }
+    assertReservationQueueDoesNotExist(r2);
+    assertReservationQueueExists(r3, 0, 1.0); // presumably capacity 0 mirrors f2[0] == 0 - TODO confirm
+
+    when(mClock.getTime()).thenReturn(11L);
+    planFollower.run();
+
+    if (isMove) {
+      // app should have been moved to default reservation queue
+      Assert.assertEquals(1, getNumberOfApplications(defQ));
+    } else {
+      // app should be killed
+      Assert.assertEquals(0, getNumberOfApplications(defQ));
+    }
+    assertReservationQueueDoesNotExist(r1);
+    assertReservationQueueDoesNotExist(r2);
+    assertReservationQueueExists(r3, 0.1, 0.1);
+
+    when(mClock.getTime()).thenReturn(12L);
+    planFollower.run();
+
+    assertReservationQueueDoesNotExist(r1);
+    assertReservationQueueDoesNotExist(r2);
+    assertReservationQueueExists(r3, 0.2, 0.2);
+
+    when(mClock.getTime()).thenReturn(16L); // t=16: all three reservation queues are expected to be gone
+    planFollower.run();
+
+    assertReservationQueueDoesNotExist(r1);
+    assertReservationQueueDoesNotExist(r2);
+    assertReservationQueueDoesNotExist(r3);
+
+    verifyCapacity(defQ); // scheduler-specific final check on the default queue
+  }
+
+  protected abstract Queue getReservationQueue(String reservationId); // may return null: the isMove branch at t=10 asserts a null result
+
+  protected abstract void verifyCapacity(Queue defQ); // called once, as the last step of testPlanFollower
+
+  protected abstract Queue getDefaultQueue(); // the result is the queue whose application count is checked as the "default reservation queue"
+
+  protected abstract int getNumberOfApplications(Queue queue); // compared against 0 or 1 with assertEquals
+
+  protected abstract AbstractSchedulerPlanFollower createPlanFollower(); // run() is invoked on the result after each clock change
+
+  protected abstract void assertReservationQueueExists(ReservationId r); // called for r1 right after the first run()
+
+  protected abstract void assertReservationQueueExists(ReservationId r2,
+      double expectedCapacity, double expectedMaxCapacity); // parameter is named r2, but callers pass r1, r2 and r3
+
+  protected abstract void assertReservationQueueDoesNotExist(ReservationId r2); // parameter is named r2, but callers pass r1, r2 and r3
+}

Reply via email to