YARN-3887. Support changing Application priority during runtime. Contributed by Sunil G


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fa1d84ae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fa1d84ae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fa1d84ae

Branch: refs/heads/YARN-1197
Commit: fa1d84ae2739a1e76f58b9c96d1378f9453cc0d2
Parents: b56daff
Author: Jian He <jia...@apache.org>
Authored: Mon Aug 10 20:51:54 2015 -0700
Committer: Jian He <jia...@apache.org>
Committed: Mon Aug 10 20:51:54 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../resourcemanager/recovery/RMStateStore.java  |   5 +
 .../scheduler/AbstractYarnScheduler.java        |   7 +
 .../scheduler/SchedulerApplicationAttempt.java  |   2 +-
 .../scheduler/YarnScheduler.java                |  11 +
 .../scheduler/capacity/CapacityScheduler.java   |  49 ++++
 .../AbstractComparatorOrderingPolicy.java       |   6 +
 .../capacity/TestApplicationPriority.java       | 260 +++++++++++++++++++
 8 files changed, 342 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5e27a2f..ada1056 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -164,6 +164,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3873. PendingApplications in LeafQueue should also use OrderingPolicy.
     (Sunil G via wangda)
 
+    YARN-3887. Support changing Application priority during runtime. (Sunil G
+    via jianhe)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 5036450..affbee1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -706,6 +706,11 @@ public abstract class RMStateStore extends AbstractService {
     dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
   }
 
+  public void updateApplicationStateSynchronously(
+      ApplicationStateData appState) {
+    handleStoreEvent(new RMStateUpdateAppEvent(appState));
+  }
+
   public void updateFencedState() {
     handleStoreEvent(new RMStateStoreEvent(RMStateStoreEventType.FENCED));
   }
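
A note on the method added above: the existing update path hands an
RMStateUpdateAppEvent to the async dispatcher, while the new
updateApplicationStateSynchronously() dispatches the same event inline via
handleStoreEvent(), so the caller blocks until the store has handled it. A
minimal sketch of the pattern (the helper name is hypothetical; it mirrors
the caller added to CapacityScheduler later in this patch):

    // Persist the submission context, which now carries the new priority,
    // before acknowledging the change, so it survives an RM failover.
    private void persistPriorityChange(RMContext rmContext, RMApp rmApp) {
      ApplicationStateData appState = ApplicationStateData.newInstance(
          rmApp.getSubmitTime(), rmApp.getStartTime(),
          rmApp.getApplicationSubmissionContext(), rmApp.getUser());
      // Returns only after the state store has processed the update event.
      rmContext.getStateStore().updateApplicationStateSynchronously(appState);
    }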

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index d69600a..ed05189 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -701,4 +701,11 @@ public abstract class AbstractYarnScheduler
     // specific scheduler.
     return Priority.newInstance(0);
   }
+
+  @Override
+  public void updateApplicationPriority(Priority newPriority,
+      ApplicationId applicationId) throws YarnException {
+    // Dummy implementation until application priority updates are
+    // supported by the specific scheduler.
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 317e61c..4872543 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -98,7 +98,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private boolean amRunning = false;
   private LogAggregationContext logAggregationContext;
 
-  private Priority appPriority = null;
+  private volatile Priority appPriority = null;
 
   protected ResourceUsage attemptResourceUsage = new ResourceUsage();
   private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index f629579..0fa23e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -306,4 +306,15 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
   public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
       String user, String queueName, ApplicationId applicationId)
       throws YarnException;
+
+  /**
+   * Change the priority of a submitted application at runtime.
+   *
+   * @param newPriority the updated priority for the application
+   * @param applicationId the ApplicationId of the application to update
+   * @throws YarnException if the application is not present or the
+   *           priority cannot be updated
+   */
+  public void updateApplicationPriority(Priority newPriority,
+      ApplicationId applicationId) throws YarnException;
 }
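
For illustration, a minimal sketch of how a caller holding a scheduler
reference might use the new interface method (the example class and method
names are hypothetical):

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.exceptions.YarnException;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;

    public class PriorityUpdateExample {
      // Ask the scheduler to move a running application to priority 8; the
      // scheduler validates the value against the cluster-level maximum.
      static void raisePriority(YarnScheduler scheduler, ApplicationId appId)
          throws YarnException {
        scheduler.updateApplicationPriority(Priority.newInstance(8), appId);
      }
    }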

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index b4d0095..b4b1383 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -1850,4 +1851,52 @@ public class CapacityScheduler extends
   public Priority getMaxClusterLevelAppPriority() {
     return maxClusterLevelAppPriority;
   }
+
+  @Override
+  public synchronized void updateApplicationPriority(Priority newPriority,
+      ApplicationId applicationId) throws YarnException {
+    Priority appPriority = null;
+    SchedulerApplication<FiCaSchedulerApp> application = applications
+        .get(applicationId);
+
+    if (application == null) {
+      throw new YarnException("Application '" + applicationId
+          + "' is not present, hence could not change priority.");
+    }
+
+    if (application.getPriority().equals(newPriority)) {
+      return;
+    }
+
+    RMApp rmApp = rmContext.getRMApps().get(applicationId);
+    appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(),
+        rmApp.getQueue(), applicationId);
+
+    // Update new priority in Submission Context to keep track in HA
+    rmApp.getApplicationSubmissionContext().setPriority(appPriority);
+
+    // Update to state store
+    ApplicationStateData appState = ApplicationStateData.newInstance(
+        rmApp.getSubmitTime(), rmApp.getStartTime(),
+        rmApp.getApplicationSubmissionContext(), rmApp.getUser());
+    rmContext.getStateStore().updateApplicationStateSynchronously(appState);
+
+    // As we use iterator over a TreeSet for OrderingPolicy, once we change
+    // priority then reinsert back to make order correct.
+    LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue());
+    synchronized (queue) {
+      queue.getOrderingPolicy().removeSchedulableEntity(
+          application.getCurrentAppAttempt());
+
+      // Update new priority in SchedulerApplication
+      application.setPriority(appPriority);
+
+      queue.getOrderingPolicy().addSchedulableEntity(
+          application.getCurrentAppAttempt());
+    }
+
+    LOG.info("Priority '" + appPriority + "' is updated in queue: "
+        + rmApp.getQueue() + " for application: " + applicationId
+        + " for user: " + rmApp.getUser());
+  }
 }
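
The remove-and-reinsert above deserves a closer look: the ordering policy
stores its schedulable entities in a TreeSet, and a TreeSet's internal
ordering stays valid only while the comparison key of a stored element never
changes. A standalone sketch of that invariant (the App class here is
hypothetical, not YARN code):

    import java.util.Comparator;
    import java.util.TreeSet;

    public class TreeSetReorderExample {
      static final class App {
        final String id;
        int priority;
        App(String id, int priority) { this.id = id; this.priority = priority; }
      }

      public static void main(String[] args) {
        // Highest priority first, ties broken by id.
        TreeSet<App> ordered = new TreeSet<>(
            Comparator.comparingInt((App a) -> a.priority).reversed()
                .thenComparing(a -> a.id));
        App app1 = new App("app-1", 5);
        ordered.add(app1);
        ordered.add(new App("app-2", 8));

        // Mutating the key while the element is in the set would corrupt its
        // ordering (and make remove() miss it), so remove first, update,
        // then reinsert, which is what updateApplicationPriority() does.
        ordered.remove(app1);
        app1.priority = 9;
        ordered.add(app1);

        System.out.println(ordered.first().id); // prints app-1
      }
    }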

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
index c4d2aae..7bec03a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
@@ -94,11 +94,17 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
   
   @Override
   public void addSchedulableEntity(S s) {
+    if (null == s) {
+      return;
+    }
     schedulableEntities.add(s); 
   }
   
   @Override
   public boolean removeSchedulableEntity(S s) {
+    if (null == s) {
+      return false;
+    }
     synchronized (entitiesToReorder) {
       entitiesToReorder.remove(s.getId());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa1d84ae/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index db094e3..169e9f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -36,6 +38,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
@@ -307,4 +312,259 @@ public class TestApplicationPriority {
         maxPriority);
     rm.stop();
   }
+
+  @Test
+  public void testUpdatePriorityAtRuntime() throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    // Set Max Application Priority as 10
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    Priority appPriority1 = Priority.newInstance(5);
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
+    RMApp app1 = rm.submitApp(1 * GB, appPriority1);
+
+    // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
+    MockAM am1 = MockRM.launchAM(app1, rm, nm1);
+    am1.registerAppAttempt();
+
+    // get scheduler
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Change the priority of App1 to 8
+    Priority appPriority2 = Priority.newInstance(8);
+    cs.updateApplicationPriority(appPriority2, app1.getApplicationId());
+
+    // get scheduler app
+    FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
+        .get(app1.getApplicationId()).getCurrentAppAttempt();
+
+    // Verify whether the new priority is updated
+    Assert.assertEquals(appPriority2, schedulerAppAttempt.getPriority());
+  }
+
+  @Test
+  public void testUpdateInvalidPriorityAtRuntime() throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    // Set Max Application Priority as 10
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    Priority appPriority1 = Priority.newInstance(5);
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
+    RMApp app1 = rm.submitApp(1 * GB, appPriority1);
+
+    // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
+    MockAM am1 = MockRM.launchAM(app1, rm, nm1);
+    am1.registerAppAttempt();
+
+    // get scheduler
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Change the priority of App1 to 15
+    Priority appPriority2 = Priority.newInstance(15);
+    cs.updateApplicationPriority(appPriority2, app1.getApplicationId());
+
+    // get scheduler app
+    FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
+        .get(app1.getApplicationId()).getCurrentAppAttempt();
+
+    // Verify whether priority 15 is reset to 10
+    Priority appPriority3 = Priority.newInstance(10);
+    Assert.assertEquals(appPriority3, schedulerAppAttempt.getPriority());
+    rm.stop();
+  }
+
+  @Test(timeout = 180000)
+  public void testRMRestartWithChangeInPriority() throws Exception {
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+        false);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+    Map<ApplicationId, ApplicationStateData> rmAppState = rmState
+        .getApplicationState();
+
+    // PHASE 1: create state in an RM
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+
+    MockNM nm1 = new MockNM("127.0.0.1:1234", 15120,
+        rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    Priority appPriority1 = Priority.newInstance(5);
+    RMApp app1 = rm1.submitApp(1 * GB, appPriority1);
+
+    // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
+    MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
+    am1.registerAppAttempt();
+
+    // get scheduler
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    // Change the priority of App1 to 8
+    Priority appPriority2 = Priority.newInstance(8);
+    cs.updateApplicationPriority(appPriority2, app1.getApplicationId());
+
+    // let things settle down
+    Thread.sleep(1000);
+
+    // create new RM to represent restart and recover state
+    MockRM rm2 = new MockRM(conf, memStore);
+
+    // start new RM
+    rm2.start();
+    // change NM to point to new RM
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    // Verify RM Apps after this restart
+    Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
+
+    // get scheduler app
+    RMApp loadedApp = rm2.getRMContext().getRMApps()
+        .get(app1.getApplicationId());
+
+    // Verify the updated priority (8) is recovered after restart
+    Assert.assertEquals(appPriority2, loadedApp.getCurrentAppAttempt()
+        .getSubmissionContext().getPriority());
+
+    rm2.stop();
+    rm1.stop();
+  }
+
+  @Test
+  public void testApplicationPriorityAllocationWithChangeInPriority()
+      throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    // Set Max Application Priority as 10
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    Priority appPriority1 = Priority.newInstance(5);
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
+    RMApp app1 = rm.submitApp(1 * GB, appPriority1);
+
+    // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
+    MockAM am1 = MockRM.launchAM(app1, rm, nm1);
+    am1.registerAppAttempt();
+
+    // add request for containers and wait for containers to be allocated.
+    int NUM_CONTAINERS = 7;
+    List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1",
+        NUM_CONTAINERS, 2 * GB, nm1);
+
+    Assert.assertEquals(7, allocated1.size());
+    Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
+
+    // check node report, 15 GB used (1 AM and 7 containers) and 1 GB available
+    SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
+        nm1.getNodeId());
+    Assert.assertEquals(15 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(1 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // Submit the second app App2 with priority 8 (Higher than App1)
+    Priority appPriority2 = Priority.newInstance(8);
+    RMApp app2 = rm.submitApp(1 * GB, appPriority2);
+
+    // kick the scheduler, 1 GB which was free is given to AM of App2
+    nm1.nodeHeartbeat(true);
+    MockAM am2 = MockRM.launchAM(app2, rm, nm1);
+    am2.registerAppAttempt();
+
+    // check node report, 16 GB used and 0 GB available
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // get scheduler
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // get scheduler app
+    FiCaSchedulerApp schedulerAppAttemptApp1 = cs.getSchedulerApplications()
+        .get(app1.getApplicationId()).getCurrentAppAttempt();
+    // kill 2 containers to free up some space
+    int counter = 0;
+    for (Iterator<Container> iterator = allocated1.iterator(); iterator
+        .hasNext();) {
+      Container c = iterator.next();
+      if (++counter > 2) {
+        break;
+      }
+      cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
+      iterator.remove();
+    }
+
+    // check node report, 12 GB used and 4 GB available
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // add request for containers App1
+    am1.allocate("127.0.0.1", 2 * GB, 10, new ArrayList<ContainerId>());
+
+    // add request for containers App2 and wait for containers to get allocated
+    List<Container> allocated2 = am2.allocateAndWaitForContainers("127.0.0.1",
+        2, 2 * GB, nm1);
+
+    Assert.assertEquals(2, allocated2.size());
+    // check node report, 16 GB used and 0 GB available
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // kill 1 more
+    counter = 0;
+    for (Iterator<Container> iterator = allocated1.iterator(); iterator
+        .hasNext();) {
+      Container c = iterator.next();
+      if (++counter > 1) {
+        break;
+      }
+      cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
+      iterator.remove();
+    }
+
+    // check node report, 14 GB used and 2 GB available
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(14 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // Change the priority of App2 to 3 (lower than App1's priority of 5)
+    Priority appPriority3 = Priority.newInstance(3);
+    cs.updateApplicationPriority(appPriority3, app2.getApplicationId());
+
+    // add request for containers App2
+    am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList<ContainerId>());
+
+    // add request for containers App1 and wait for containers to get allocated
+    // since App1 now has the higher priority, App1 will get a container.
+    List<Container> allocated3 = am1.allocateAndWaitForContainers("127.0.0.1",
+        1, 2 * GB, nm1);
+
+    Assert.assertEquals(1, allocated3.size());
+    // Now App1 will have 5 containers and 1 AM. App2 will have 2 containers.
+    Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size());
+    rm.stop();
+  }
 }
