Repository: hadoop
Updated Branches:
  refs/heads/branch-2 0c2d996c2 -> 7adffad2b


YARN-2881. [YARN-2574] Implement PlanFollower for FairScheduler. (Anubhav Dhoot 
via kasha)

(cherry picked from commit 0c4b11267717eb451fa6ed4c586317f2db32fbd5)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7adffad2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7adffad2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7adffad2

Branch: refs/heads/branch-2
Commit: 7adffad2bb17338c266fe1d3320f59b0c89fce4a
Parents: 0c2d996
Author: Karthik Kambatla <ka...@apache.org>
Authored: Tue Jan 6 04:41:45 2015 +0530
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Tue Jan 6 04:42:55 2015 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../reservation/AbstractReservationSystem.java  |   2 +
 .../AbstractSchedulerPlanFollower.java          |   3 +-
 .../reservation/FairSchedulerPlanFollower.java  | 141 +++++++++++++
 .../reservation/ReservationConstants.java       |  28 +++
 .../scheduler/capacity/CapacityScheduler.java   |   6 +-
 .../scheduler/capacity/PlanQueue.java           |   2 -
 .../scheduler/fair/AllocationConfiguration.java |  14 ++
 .../scheduler/fair/FSLeafQueue.java             |  10 +
 .../scheduler/fair/FairScheduler.java           | 104 +++++++++-
 .../scheduler/fair/QueueManager.java            |  13 +-
 .../fair/ReservationQueueConfiguration.java     |  11 +
 .../reservation/ReservationSystemTestUtil.java  |  26 ++-
 .../TestCapacitySchedulerPlanFollower.java      |   3 +-
 .../reservation/TestFairReservationSystem.java  |  55 ++---
 .../TestFairSchedulerPlanFollower.java          | 203 +++++++++++++++++++
 .../TestCapacitySchedulerDynamicBehavior.java   |   3 +-
 .../scheduler/fair/FairSchedulerTestBase.java   |   4 +-
 .../fair/TestFairSchedulerPreemption.java       |   3 +-
 19 files changed, 574 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c737a9a..8139c72 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -38,6 +38,8 @@ Release 2.7.0 - UNRELEASED
     YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler. 
     (Anubhav Dhoot via kasha)
 
+    YARN-2881. [YARN-2574] Implement PlanFollower for FairScheduler. 
+    (Anubhav Dhoot via kasha)
 
   IMPROVEMENTS
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
index fa0835a..8a15ac6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java
@@ -204,6 +204,8 @@ public abstract class AbstractReservationSystem extends 
AbstractService
     // currently only capacity scheduler is supported
     if (scheduler instanceof CapacityScheduler) {
       return CapacitySchedulerPlanFollower.class.getName();
+    } else if (scheduler instanceof FairScheduler) {
+      return FairSchedulerPlanFollower.class.getName();
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
index 0de4dcf..ea7f27d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java
@@ -26,7 +26,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 
 import org.apache.hadoop.yarn.util.Clock;
@@ -99,7 +98,7 @@ public abstract class AbstractSchedulerPlanFollower 
implements PlanFollower {
 
     // create the default reservation queue if it doesnt exist
     String defReservationId = getReservationIdFromQueueName(planQueueName) +
-        PlanQueue.DEFAULT_QUEUE_SUFFIX;
+        ReservationConstants.DEFAULT_QUEUE_SUFFIX;
     String defReservationQueue = getReservationQueueName(planQueueName,
         defReservationId);
     createDefaultReservationQueue(planQueueName, planQueue,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java
new file mode 100644
index 0000000..7ca03c5
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FairSchedulerPlanFollower extends AbstractSchedulerPlanFollower {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(FairSchedulerPlanFollower.class);
+
+  private FairScheduler fs;
+
+  @Override
+  public void init(Clock clock, ResourceScheduler sched,
+      Collection<Plan> plans) {
+    super.init(clock, sched, plans);
+    fs = (FairScheduler)sched;
+    LOG.info("Initializing Plan Follower Policy:"
+        + this.getClass().getCanonicalName());
+  }
+
+  @Override
+  protected Queue getPlanQueue(String planQueueName) {
+    Queue planQueue = fs.getQueueManager().getParentQueue(planQueueName, 
false);
+    if (planQueue == null) {
+      LOG.error("The queue " + planQueueName + " cannot be found or is not a " 
+
+          "ParentQueue");
+    }
+    return planQueue;
+  }
+
+  @Override
+  protected float calculateReservationToPlanRatio(Resource clusterResources,
+      Resource planResources, Resource capToAssign) {
+    return Resources.divide(fs.getResourceCalculator(),
+        clusterResources, capToAssign, planResources);
+  }
+
+  @Override
+  protected boolean arePlanResourcesLessThanReservations(Resource
+      clusterResources, Resource planResources, Resource reservedResources) {
+    return Resources.greaterThan(fs.getResourceCalculator(),
+        clusterResources, reservedResources, planResources);
+  }
+
+  @Override
+  protected List<? extends Queue> getChildReservationQueues(Queue queue) {
+    FSQueue planQueue = (FSQueue)queue;
+    List<FSQueue> childQueues = planQueue.getChildQueues();
+    return childQueues;
+  }
+
+
+  @Override
+  protected void addReservationQueue(String planQueueName, Queue queue,
+      String currResId) {
+    String leafQueueName = getReservationQueueName(planQueueName, currResId);
+    fs.getQueueManager().getLeafQueue(leafQueueName, true);
+  }
+
+  @Override
+  protected void createDefaultReservationQueue(String planQueueName,
+      Queue queue, String defReservationId) {
+    String defReservationQueueName = getReservationQueueName(planQueueName,
+        defReservationId);
+    if (!fs.getQueueManager().exists(defReservationQueueName)) {
+      fs.getQueueManager().getLeafQueue(defReservationQueueName, true);
+    }
+  }
+
+  @Override
+  protected Resource getPlanResources(Plan plan, Queue queue,
+      Resource clusterResources) {
+    FSParentQueue planQueue = (FSParentQueue)queue;
+    Resource planResources = planQueue.getSteadyFairShare();
+    return planResources;
+  }
+
+  @Override
+  protected Resource getReservationQueueResourceIfExists(Plan plan,
+      ReservationId reservationId) {
+    String reservationQueueName = getReservationQueueName(plan.getQueueName(),
+        reservationId.toString());
+    FSLeafQueue reservationQueue =
+        fs.getQueueManager().getLeafQueue(reservationQueueName, false);
+    Resource reservationResource = null;
+    if (reservationQueue != null) {
+      reservationResource = reservationQueue.getSteadyFairShare();
+    }
+    return reservationResource;
+  }
+
+  @Override
+  protected String getReservationQueueName(String planQueueName,
+      String reservationQueueName) {
+    String planQueueNameFullPath = fs.getQueueManager().getQueue
+        (planQueueName).getName();
+
+    if (!reservationQueueName.startsWith(planQueueNameFullPath)) {
+      // If name is not a path we need full path for FairScheduler. See
+      // YARN-2773 for the root cause
+      return planQueueNameFullPath + "." + reservationQueueName;
+    }
+    return reservationQueueName;
+  }
+
+  @Override
+  protected String getReservationIdFromQueueName(String resQueueName) {
+    return resQueueName.substring(resQueueName.lastIndexOf(".") + 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationConstants.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationConstants.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationConstants.java
new file mode 100644
index 0000000..1ff941f
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+public interface ReservationConstants {
+
+  /**
+   * The suffix used for a queue under a reservable queue that will be used
+   * as a default queue whenever no reservation is used
+   */
+  String DEFAULT_QUEUE_SUFFIX = "-default";
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 3648c54..5e6a352 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -21,7 +21,6 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
@@ -66,6 +65,7 @@ import 
org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -1419,7 +1419,7 @@ public class CapacityScheduler extends
       queueName = resQName;
     } else {
       // use the default child queue of the plan for unreserved apps
-      queueName = queueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+      queueName = queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
     }
     return queueName;
   }
@@ -1583,7 +1583,7 @@ public class CapacityScheduler extends
     CSQueue dest = getQueue(targetQueueName);
     if (dest != null && dest instanceof PlanQueue) {
       // use the default child reservation queue of the plan
-      targetQueueName = targetQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+      targetQueueName = targetQueueName + 
ReservationConstants.DEFAULT_QUEUE_SUFFIX;
     }
     return targetQueueName;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index 0725959..f8b11eb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -37,8 +37,6 @@ import org.slf4j.LoggerFactory;
  */
 public class PlanQueue extends ParentQueue {
 
-  public static final String DEFAULT_QUEUE_SUFFIX = "-default";
-
   private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class);
 
   private int maxAppsForReservation;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
index fd99d65..0ea7314 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -207,6 +207,10 @@ public class AllocationConfiguration extends 
ReservationSchedulerConfiguration {
     ResourceWeights weight = queueWeights.get(queue);
     return (weight == null) ? ResourceWeights.NEUTRAL : weight;
   }
+
+  public void setQueueWeight(String queue, ResourceWeights weight) {
+    queueWeights.put(queue, weight);
+  }
   
   public int getUserMaxApps(String user) {
     Integer maxApps = userMaxApps.get(user);
@@ -323,4 +327,14 @@ public class AllocationConfiguration extends 
ReservationSchedulerConfiguration {
   public long getEnforcementWindow(String queue) {
     return globalReservationQueueConfig.getEnforcementWindowMsec();
   }
+
+  @VisibleForTesting
+  public void setReservationWindow(long window) {
+    globalReservationQueueConfig.setReservationWindow(window);
+  }
+
+  @VisibleForTesting
+  public void setAverageCapacity(int avgCapacity) {
+    globalReservationQueueConfig.setAverageCapacity(avgCapacity);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 91bea11..3c97535 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -37,6 +37,7 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
@@ -516,6 +517,15 @@ public class FSLeafQueue extends FSQueue {
     }
   }
 
+  /** Allows setting weight for a dynamically created queue
+   * Currently only used for reservation based queues
+   * @param weight queue weight
+   */
+  public void setWeights(float weight) {
+    scheduler.getAllocationConfiguration().setQueueWeight(getName(),
+        new ResourceWeights(weight));
+  }
+
   /**
    * Helper method to check if the queue should preempt containers
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 57b41af..124da99 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -68,6 +70,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
@@ -1163,9 +1166,15 @@ public class FairScheduler extends
         throw new RuntimeException("Unexpected event type: " + event);
       }
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
-      addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser(),
-        appAddedEvent.getIsAppRecovering());
+      String queueName =
+          resolveReservationQueueName(appAddedEvent.getQueue(),
+              appAddedEvent.getApplicationId(),
+              appAddedEvent.getReservationID());
+      if (queueName != null) {
+        addApplication(appAddedEvent.getApplicationId(),
+            queueName, appAddedEvent.getUser(),
+            appAddedEvent.getIsAppRecovering());
+      }
       break;
     case APP_REMOVED:
       if (!(event instanceof AppRemovedSchedulerEvent)) {
@@ -1223,6 +1232,51 @@ public class FairScheduler extends
     }
   }
 
+  private String resolveReservationQueueName(String queueName,
+      ApplicationId applicationId, ReservationId reservationID) {
+    FSQueue queue = queueMgr.getQueue(queueName);
+    if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
+      return queueName;
+    }
+    // Use fully specified name from now on (including root. prefix)
+    queueName = queue.getQueueName();
+    if (reservationID != null) {
+      String resQName = queueName + "." + reservationID.toString();
+      queue = queueMgr.getQueue(resQName);
+      if (queue == null) {
+        String message =
+            "Application "
+                + applicationId
+                + " submitted to a reservation which is not yet currently 
active: "
+                + resQName;
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new RMAppRejectedEvent(applicationId, message));
+        return null;
+      }
+      if (!queue.getParent().getQueueName().equals(queueName)) {
+        String message =
+            "Application: " + applicationId + " submitted to a reservation "
+                + resQName + " which does not belong to the specified queue: "
+                + queueName;
+        this.rmContext.getDispatcher().getEventHandler()
+            .handle(new RMAppRejectedEvent(applicationId, message));
+        return null;
+      }
+      // use the reservation queue to run the app
+      queueName = resQName;
+    } else {
+      // use the default child queue of the plan for unreserved apps
+      queueName = getDefaultQueueForPlanQueue(queueName);
+    }
+    return queueName;
+  }
+
+  private String getDefaultQueueForPlanQueue(String queueName) {
+    String planName = queueName.substring(queueName.lastIndexOf(".") + 1);
+    queueName = queueName + "." + planName + 
ReservationConstants.DEFAULT_QUEUE_SUFFIX;
+    return queueName;
+  }
+
   @Override
   public void recover(RMState state) throws Exception {
     // NOT IMPLEMENTED
@@ -1441,7 +1495,8 @@ public class FairScheduler extends
     // To serialize with FairScheduler#allocate, synchronize on app attempt
     synchronized (attempt) {
       FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
-      FSLeafQueue targetQueue = queueMgr.getLeafQueue(queueName, false);
+      String destQueueName = handleMoveToPlanQueue(queueName);
+      FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
       if (targetQueue == null) {
         throw new YarnException("Target queue " + queueName
             + " not found or is not a leaf queue.");
@@ -1577,4 +1632,45 @@ public class FairScheduler extends
     }
     return planQueues;
   }
+
+  @Override
+  public void setEntitlement(String queueName,
+      QueueEntitlement entitlement) throws YarnException {
+
+    FSLeafQueue reservationQueue = queueMgr.getLeafQueue(queueName, false);
+    if (reservationQueue == null) {
+      throw new YarnException("Target queue " + queueName
+          + " not found or is not a leaf queue.");
+    }
+
+    reservationQueue.setWeights(entitlement.getCapacity());
+
+    // TODO Does MaxCapacity need to be set for fairScheduler ?
+  }
+
+  /**
+   * Only supports removing empty leaf queues
+   * @param queueName name of queue to remove
+   * @throws YarnException if queue to remove is either not a leaf or if its
+   * not empty
+   */
+  @Override
+  public void removeQueue(String queueName) throws YarnException {
+    FSLeafQueue reservationQueue = queueMgr.getLeafQueue(queueName, false);
+    if (reservationQueue != null) {
+      if (!queueMgr.removeLeafQueue(queueName)) {
+        throw new YarnException("Could not remove queue " + queueName + " as " 
+
+            "its either not a leaf queue or its not empty");
+      }
+    }
+  }
+
+  private String handleMoveToPlanQueue(String targetQueueName) {
+    FSQueue dest = queueMgr.getQueue(targetQueueName);
+    if (dest != null && allocConf.isReservable(dest.getQueueName())) {
+      // use the default child reservation queue of the plan
+      targetQueueName = getDefaultQueueForPlanQueue(targetQueueName);
+    }
+    return targetQueueName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 0e625d7..27e571e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -91,7 +91,18 @@ public class QueueManager {
     }
     return (FSLeafQueue) queue;
   }
-  
+
+  /**
+   * Remove a leaf queue if empty
+   * @param name name of the queue
+   * @return true if queue was removed or false otherwise
+   */
+  public boolean removeLeafQueue(String name) {
+    name = ensureRootPrefix(name);
+    return removeEmptyIncompatibleQueues(name, FSQueueType.PARENT);
+  }
+
+
   /**
    * Get a parent queue by name, creating it if the create param is true and 
is necessary.
    * If the queue is not or can not be a parent queue, i.e. it already exists 
as a

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java
index 747a4c2..cf7f84e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
@@ -102,4 +103,14 @@ public class ReservationQueueConfiguration {
   public void setReservationAgent(String reservationAgent) {
     this.reservationAgent = reservationAgent;
   }
+
+  @VisibleForTesting
+  public void setReservationWindow(long reservationWindow) {
+    this.reservationWindow = reservationWindow;
+  }
+
+  @VisibleForTesting
+  public void setAverageCapacity(int averageCapacity) {
+    this.avgOverTimeMultiplier = averageCapacity;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
index 12c2583..bfaf06b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java
@@ -29,7 +29,6 @@ import java.io.PrintWriter;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -42,12 +41,16 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -160,6 +163,27 @@ public class ReservationSystemTestUtil {
     out.close();
   }
 
+  public static FairScheduler setupFairScheduler(
+      ReservationSystemTestUtil testUtil,
+      RMContext rmContext, Configuration conf, int numContainers) throws
+      IOException {
+    FairScheduler scheduler = new FairScheduler();
+    scheduler.setRMContext(rmContext);
+
+    when(rmContext.getScheduler()).thenReturn(scheduler);
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, rmContext);
+
+
+    Resource resource = testUtil.calculateClusterResource(numContainers);
+    RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+    return scheduler;
+  }
+
   @SuppressWarnings("unchecked")
   public CapacityScheduler mockCapacityScheduler(int numContainers)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
index c603f5b..c7513ab 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
@@ -41,7 +41,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -149,7 +148,7 @@ public class TestCapacitySchedulerPlanFollower extends 
TestSchedulerPlanFollower
 
   @Override
   protected Queue getDefaultQueue() {
-    return cs.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX);
+    return cs.getQueue("dedicated" + 
ReservationConstants.DEFAULT_QUEUE_SUFFIX);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
index 82ba731..f294eaf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java
@@ -18,14 +18,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
@@ -38,15 +35,16 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 
-import static org.mockito.Mockito.when;
-
-public class TestFairReservationSystem extends FairSchedulerTestBase {
-  private final static String ALLOC_FILE = new File(TEST_DIR,
+public class TestFairReservationSystem {
+  private final static String ALLOC_FILE = new File(FairSchedulerTestBase.
+    TEST_DIR,
       TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath();
+  private Configuration conf;
+  private FairScheduler scheduler;
+  private FairSchedulerTestBase testHelper = new FairSchedulerTestBase();
 
-  @Override
   protected Configuration createConfiguration() {
-    Configuration conf = super.createConfiguration();
+    Configuration conf = testHelper.createConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
         ResourceScheduler.class);
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
@@ -60,10 +58,6 @@ public class TestFairReservationSystem extends 
FairSchedulerTestBase {
 
   @After
   public void teardown() {
-    if (resourceManager != null) {
-      resourceManager.stop();
-      resourceManager = null;
-    }
     conf = null;
   }
 
@@ -75,7 +69,8 @@ public class TestFairReservationSystem extends 
FairSchedulerTestBase {
     
     // Setup
     RMContext mockRMContext = testUtil.createRMContext(conf);
-    setupFairScheduler(testUtil, mockRMContext);
+    scheduler = ReservationSystemTestUtil.setupFairScheduler(testUtil,
+        mockRMContext, conf, 10);
 
     FairReservationSystem reservationSystem = new FairReservationSystem();
     reservationSystem.setRMContext(mockRMContext);
@@ -97,14 +92,15 @@ public class TestFairReservationSystem extends 
FairSchedulerTestBase {
     ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
 
     // Setup
-    RMContext mockContext = testUtil.createRMContext(conf);
-    setupFairScheduler(testUtil, mockContext);
+    RMContext mockRMContext = testUtil.createRMContext(conf);
+    scheduler = ReservationSystemTestUtil.setupFairScheduler(testUtil,
+        mockRMContext, conf, 10);
 
     FairReservationSystem reservationSystem = new FairReservationSystem();
-    reservationSystem.setRMContext(mockContext);
+    reservationSystem.setRMContext(mockRMContext);
 
     try {
-      reservationSystem.reinitialize(scheduler.getConf(), mockContext);
+      reservationSystem.reinitialize(scheduler.getConf(), mockRMContext);
     } catch (YarnException e) {
       Assert.fail(e.getMessage());
     }
@@ -116,10 +112,10 @@ public class TestFairReservationSystem extends 
FairSchedulerTestBase {
 
     // Dynamically add a plan
     ReservationSystemTestUtil.updateFSAllocationFile(ALLOC_FILE);
-    scheduler.reinitialize(conf, mockContext);
+    scheduler.reinitialize(conf, mockRMContext);
 
     try {
-      reservationSystem.reinitialize(conf, mockContext);
+      reservationSystem.reinitialize(conf, mockRMContext);
     } catch (YarnException e) {
       Assert.fail(e.getMessage());
     }
@@ -129,23 +125,4 @@ public class TestFairReservationSystem extends 
FairSchedulerTestBase {
         (reservationSystem, newQueue);
   }
 
-  private void setupFairScheduler(ReservationSystemTestUtil testUtil,
-      RMContext rmContext) throws
-      IOException {
-
-    scheduler = new FairScheduler();
-    scheduler.setRMContext(rmContext);
-
-    int numContainers = 10;
-    when(rmContext.getScheduler()).thenReturn(scheduler);
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, rmContext);
-
-    Resource resource = testUtil.calculateClusterResource(numContainers);
-    RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
new file mode 100644
index 0000000..e9a4f50
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java
@@ -0,0 +1,203 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Clock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+public class TestFairSchedulerPlanFollower extends
+    TestSchedulerPlanFollowerBase {
+  private final static String ALLOC_FILE = new File(FairSchedulerTestBase.
+      TEST_DIR,
+      TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath();
+  private RMContext rmContext;
+  private RMContext spyRMContext;
+  private FairScheduler fs;
+  private Configuration conf;
+  private FairSchedulerTestBase testHelper = new FairSchedulerTestBase();
+
+  @Rule
+  public TestName name = new TestName();
+
+  protected Configuration createConfiguration() {
+    Configuration conf = testHelper.createConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+        ResourceScheduler.class);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    return conf;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf = createConfiguration();
+    ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE);
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+
+    // Setup
+    rmContext = TestUtils.getMockRMContext();
+    spyRMContext = spy(rmContext);
+    fs = ReservationSystemTestUtil.setupFairScheduler(testUtil,
+        spyRMContext, conf, 125);
+    scheduler = fs;
+
+    ConcurrentMap<ApplicationId, RMApp> spyApps =
+        spy(new ConcurrentHashMap<ApplicationId, RMApp>());
+    RMApp rmApp = mock(RMApp.class);
+    when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
+        .thenReturn(null);
+    Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
+    when(spyRMContext.getRMApps()).thenReturn(spyApps);
+
+    ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE);
+    setupPlanFollower();
+  }
+
+  private void setupPlanFollower() throws Exception {
+    ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
+    mClock = mock(Clock.class);
+    mAgent = mock(ReservationAgent.class);
+
+    String reservationQ = testUtil.getFullReservationQueueName();
+    AllocationConfiguration allocConf = fs.getAllocationConfiguration();
+    allocConf.setReservationWindow(20L);
+    allocConf.setAverageCapacity(20);
+    policy.init(reservationQ, allocConf);
+  }
+
+  @Test
+  public void testWithMoveOnExpiry() throws PlanningException,
+      InterruptedException, AccessControlException {
+    // invoke plan follower test with move
+    testPlanFollower(true);
+  }
+
+  @Test
+  public void testWithKillOnExpiry() throws PlanningException,
+      InterruptedException, AccessControlException {
+    // invoke plan follower test with kill
+    testPlanFollower(false);
+  }
+
+  @Override
+  protected void verifyCapacity(Queue defQ) {
+    assertTrue(((FSQueue) defQ).getWeights().getWeight(ResourceType.MEMORY) >
+        0.9);
+  }
+
+  @Override
+  protected Queue getDefaultQueue() {
+    return getReservationQueue("dedicated" +
+        ReservationConstants.DEFAULT_QUEUE_SUFFIX);
+  }
+
+  @Override
+  protected int getNumberOfApplications(Queue queue) {
+    int numberOfApplications = fs.getAppsInQueue(queue.getQueueName()).size();
+    return numberOfApplications;
+  }
+
+  @Override
+  protected AbstractSchedulerPlanFollower createPlanFollower() {
+    FairSchedulerPlanFollower planFollower =
+        new FairSchedulerPlanFollower();
+    planFollower.init(mClock, scheduler, Collections.singletonList(plan));
+    return planFollower;
+  }
+
+  @Override
+  protected void assertReservationQueueExists(ReservationId r) {
+    Queue q = getReservationQueue(r.toString());
+    assertNotNull(q);
+  }
+
+  @Override
+  protected void assertReservationQueueExists(ReservationId r,
+      double expectedCapacity, double expectedMaxCapacity) {
+    FSLeafQueue q = fs.getQueueManager().getLeafQueue(plan.getQueueName() + "" 
+
+        "." +
+        r, false);
+    assertNotNull(q);
+    // For now we are setting both to same weight
+    Assert.assertEquals(expectedCapacity, q.getWeights().getWeight
+        (ResourceType.MEMORY), 0.01);
+  }
+
+  @Override
+  protected void assertReservationQueueDoesNotExist(ReservationId r) {
+    Queue q = getReservationQueue(r.toString());
+    assertNull(q);
+  }
+
+  @Override
+  protected Queue getReservationQueue(String r) {
+    return fs.getQueueManager().getLeafQueue(plan.getQueueName() + "" +
+        "." +
+        r, false);
+  }
+
+  public static ApplicationACLsManager mockAppACLsManager() {
+    Configuration conf = new Configuration();
+    return new ApplicationACLsManager(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (scheduler != null) {
+      fs.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java
index 73d8a55..ce3382f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -217,7 +218,7 @@ public class TestCapacitySchedulerDynamicBehavior {
     assertEquals(1, appsInRoot.size());
 
     // create the default reservation queue
-    String defQName = "a" + PlanQueue.DEFAULT_QUEUE_SUFFIX;
+    String defQName = "a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
     ReservationQueue defQ =
         new ReservationQueue(scheduler, defQName,
             (PlanQueue) scheduler.getQueue("a"));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 7b6aaf3..8656175 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -60,7 +60,7 @@ public class FairSchedulerTestBase {
     }
   }
 
-  protected final static String TEST_DIR =
+  public final static String TEST_DIR =
       new File(System.getProperty("test.build.data", 
"/tmp")).getAbsolutePath();
 
   private static RecordFactory
@@ -74,7 +74,7 @@ public class FairSchedulerTestBase {
   protected ResourceManager resourceManager;
 
   // Helper methods
-  protected Configuration createConfiguration() {
+  public Configuration createConfiguration() {
     Configuration conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
         ResourceScheduler.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7adffad2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 903c7af..458b06d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -58,8 +58,7 @@ public class TestFairSchedulerPreemption extends 
FairSchedulerTestBase {
     }
   }
 
-  @Override
-  protected Configuration createConfiguration() {
+  public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class,
         ResourceScheduler.class);

Reply via email to