Repository: hadoop
Updated Branches:
  refs/heads/trunk 49f6e3d35 -> 395205444


YARN-3319. Implement a FairOrderingPolicy. (Craig Welch via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/39520544
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/39520544
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/39520544

Branch: refs/heads/trunk
Commit: 395205444e8a9ae6fc86f0a441e98486a775511a
Parents: 49f6e3d
Author: Wangda Tan <wan...@apache.org>
Authored: Thu Apr 23 10:47:15 2015 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Thu Apr 23 10:47:15 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../dev-support/findbugs-exclude.xml            |   8 +
 .../CapacitySchedulerConfiguration.java         |  20 ++-
 .../scheduler/policy/CompoundComparator.java    |  43 ++++++
 .../scheduler/policy/FairOrderingPolicy.java    | 114 ++++++++++++++
 .../scheduler/capacity/TestLeafQueue.java       | 116 +++++++++++++++
 .../policy/TestFairOrderingPolicy.java          | 149 +++++++++++++++++++
 7 files changed, 450 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d335389..a8d6d6f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -100,6 +100,8 @@ Release 2.8.0 - UNRELEASED
     network bandwidth traffic originating from YARN containers (Sidharta 
Seethana
     via vinodkv)
 
+    YARN-3319. Implement a FairOrderingPolicy. (Craig Welch via wangda)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml 
b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index ece8548..114851f 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -146,6 +146,14 @@
     <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
   </Match>
   <Match>
+    <Class 
name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy$FairComparator"
 />
+    <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+  </Match>
+  <Match>
+    <Class 
name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.CompoundComparator"
 />
+    <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+  </Match>
+  <Match>
     <Class 
name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PartitionedQueueComparator"
 />
     <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
   </Match>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index c9e83a1..b00f25c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -122,7 +122,11 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
   
   public static final String ORDERING_POLICY = "ordering-policy";
   
-  public static final String DEFAULT_ORDERING_POLICY = "fifo";
+  public static final String FIFO_ORDERING_POLICY = "fifo";
+
+  public static final String FAIR_ORDERING_POLICY = "fair";
+
+  public static final String DEFAULT_ORDERING_POLICY = FIFO_ORDERING_POLICY;
   
   @Private
   public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
@@ -395,9 +399,12 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
     
     OrderingPolicy<S> orderingPolicy;
     
-    if (policyType.trim().equals("fifo")) {
+    if (policyType.trim().equals(FIFO_ORDERING_POLICY)) {
        policyType = FifoOrderingPolicy.class.getName();
     }
+    if (policyType.trim().equals(FAIR_ORDERING_POLICY)) {
+       policyType = FairOrderingPolicy.class.getName();
+    }
     try {
       orderingPolicy = (OrderingPolicy<S>)
         Class.forName(policyType).newInstance();
@@ -405,6 +412,15 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
       String message = "Unable to construct ordering policy for: " + 
policyType + ", " + e.getMessage();
       throw new RuntimeException(message, e);
     }
+
+    Map<String, String> config = new HashMap<String, String>();
+    String confPrefix = getQueuePrefix(queue) + ORDERING_POLICY + ".";
+    for (Map.Entry<String, String> kv : this) {
+      if (kv.getKey().startsWith(confPrefix)) {
+         config.put(kv.getKey().substring(confPrefix.length()), kv.getValue());
+      }
+    }
+    orderingPolicy.configure(config);
     return orderingPolicy;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java
new file mode 100644
index 0000000..3027ab7
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+
+//Some policies will use multiple comparators joined together
+class CompoundComparator implements Comparator<SchedulableEntity> {
+
+    List<Comparator<SchedulableEntity>> comparators;
+
+    CompoundComparator(List<Comparator<SchedulableEntity>> comparators) {
+      this.comparators = comparators;
+    }
+
+    @Override
+    public int compare(final SchedulableEntity r1, final SchedulableEntity r2) 
{
+      for (Comparator<SchedulableEntity> comparator : comparators) {
+        int result = comparator.compare(r1, r2);
+        if (result != 0) return result;
+      }
+      return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
new file mode 100644
index 0000000..3ab74de
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+
+/**
+ * An OrderingPolicy which orders SchedulableEntities for fairness (see
+ * FairScheduler
+ * FairSharePolicy), generally, processes with lesser usage are lesser. If
+ * sizedBasedWeight is set to true then an application with high demand
+ * may be prioritized ahead of an application with less usage.  This
+ * is to offset the tendency to favor small apps, which could result in
+ * starvation for large apps if many small ones enter and leave the queue
+ * continuously (optional, default false)
+ */
+public class FairOrderingPolicy<S extends SchedulableEntity> extends 
AbstractComparatorOrderingPolicy<S> {
+
+  public static final String ENABLE_SIZE_BASED_WEIGHT =
+        "fair.enable-size-based-weight";
+
+  protected class FairComparator implements Comparator<SchedulableEntity> {
+    @Override
+    public int compare(final SchedulableEntity r1, final SchedulableEntity r2) 
{
+      int res = (int) Math.signum( getMagnitude(r1) - getMagnitude(r2) );
+      return res;
+    }
+  }
+
+  private CompoundComparator fairComparator;
+
+  private boolean sizeBasedWeight = false;
+
+  public FairOrderingPolicy() {
+    List<Comparator<SchedulableEntity>> comparators =
+      new ArrayList<Comparator<SchedulableEntity>>();
+    comparators.add(new FairComparator());
+    comparators.add(new FifoComparator());
+    fairComparator = new CompoundComparator(
+      comparators
+      );
+    this.comparator = fairComparator;
+    this.schedulableEntities = new TreeSet<S>(comparator);
+  }
+
+  private double getMagnitude(SchedulableEntity r) {
+    double mag = r.getSchedulingResourceUsage().getCachedUsed(
+      CommonNodeLabelsManager.ANY).getMemory();
+    if (sizeBasedWeight) {
+      double weight = 
Math.log1p(r.getSchedulingResourceUsage().getCachedDemand(
+        CommonNodeLabelsManager.ANY).getMemory()) / Math.log(2);
+      mag = mag / weight;
+    }
+    return mag;
+  }
+
+  @VisibleForTesting
+  public boolean getSizeBasedWeight() {
+   return sizeBasedWeight;
+  }
+
+  @VisibleForTesting
+  public void setSizeBasedWeight(boolean sizeBasedWeight) {
+   this.sizeBasedWeight = sizeBasedWeight;
+  }
+
+  @Override
+  public void configure(Map<String, String> conf) {
+    if (conf.containsKey(ENABLE_SIZE_BASED_WEIGHT)) {
+      sizeBasedWeight = Boolean.valueOf(conf.get(ENABLE_SIZE_BASED_WEIGHT));
+    }
+  }
+
+  @Override
+  public void containerAllocated(S schedulableEntity,
+    RMContainer r) {
+      reorderSchedulableEntity(schedulableEntity);
+    }
+
+  @Override
+  public void containerReleased(S schedulableEntity,
+    RMContainer r) {
+      reorderSchedulableEntity(schedulableEntity);
+    }
+
+  @Override
+  public String getInfo() {
+    String sbw = sizeBasedWeight ? " with sizeBasedWeight" : "";
+    return "FairOrderingPolicy" + sbw;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 3ba8036..34248a4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -84,6 +84,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -460,6 +461,41 @@ public class TestLeafQueue {
   }
 
   @Test
+  public void testFairConfiguration() throws Exception {
+    // Checks that the "fair" shorthand resolves to a FairOrderingPolicy,
+    // that size based weighting is off by default, and that the per-queue
+    // setting switches it on.
+
+    CapacitySchedulerConfiguration testConf =
+        new CapacitySchedulerConfiguration();
+
+    String tproot = CapacitySchedulerConfiguration.ROOT + "." +
+      "testPolicyRoot" + System.currentTimeMillis();
+
+    //no policy configured for the queue yet, the default must still
+    //be constructible
+    OrderingPolicy<FiCaSchedulerApp> schedOrder =
+      testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
+
+    //override default to fair
+    String policyType = CapacitySchedulerConfiguration.PREFIX + tproot +
+      "." + CapacitySchedulerConfiguration.ORDERING_POLICY;
+
+    testConf.set(policyType,
+      CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY);
+    schedOrder =
+      testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
+    //parameterized rather than raw type, so the compiler keeps checking
+    //the element type
+    FairOrderingPolicy<FiCaSchedulerApp> fop =
+      (FairOrderingPolicy<FiCaSchedulerApp>) schedOrder;
+    assertFalse(fop.getSizeBasedWeight());
+
+    //Now with sizeBasedWeight
+    String sbwConfig = CapacitySchedulerConfiguration.PREFIX + tproot +
+      "." + CapacitySchedulerConfiguration.ORDERING_POLICY + "." +
+      FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT;
+    testConf.set(sbwConfig, "true");
+    schedOrder =
+      testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
+    fop = (FairOrderingPolicy<FiCaSchedulerApp>) schedOrder;
+    assertTrue(fop.getSizeBasedWeight());
+
+  }
+
+  @Test
   public void testSingleQueueWithOneUser() throws Exception {
 
     // Manipulate queue 'a'
@@ -2621,6 +2657,86 @@ public class TestLeafQueue {
 
   }
 
+  /**
+   * Checks that a LeafQueue using a FairOrderingPolicy hands the next
+   * container to the application with the smaller current consumption.
+   */
+  @Test
+  public void testFairAssignment() throws Exception {
+
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // Replace the queue's ordering policy with the fair one under test
+    OrderingPolicy<FiCaSchedulerApp> schedulingOrder =
+      new FairOrderingPolicy<FiCaSchedulerApp>();
+
+    a.setOrderingPolicy(schedulingOrder);
+
+    String host_0_0 = "127.0.0.1";
+    String rack_0 = "rack_0";
+    FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 
16*GB);
+
+    final int numNodes = 4;
+    Resource clusterResource = Resources.createResource(
+        numNodes * (16*GB), numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    String user_0 = "user_0";
+
+    // Two application attempts of the same user, submitted to queue a
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            mock(ActiveUsersManager.class), spyRMContext));
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+            mock(ActiveUsersManager.class), spyRMContext));
+    a.submitApplicationAttempt(app_1, user_0);
+
+    Priority priority = TestUtils.createMockPriority(1);
+    // One request list per app; both lists are reused for the second round
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
+
+    // First round: app_0 asks for one 2GB container ...
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
+            true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // ... and app_1 for one 1GB container
+    app_1_requests_0.clear();
+    app_1_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+            true, priority, recordFactory));
+    app_1.updateResourceRequests(app_1_requests_0);
+
+    // After the first pass app_0 is expected to hold its 2GB, after the
+    // second pass app_1 its 1GB
+    a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+    // Second round: both apps ask for one more 1GB container
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+            true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    app_1_requests_0.clear();
+    app_1_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+            true, priority, recordFactory));
+    app_1.updateResourceRequests(app_1_requests_0);
+
+    //Since it already has more resources, app_0 will not get
+    //assigned first, but app_1 will
+    a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
+
+    //and only then will app_0
+    a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+
+  }
+
   private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
       LeafQueue defaultQueue) {
     List<FiCaSchedulerApp> appsLists = new ArrayList<FiCaSchedulerApp>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
new file mode 100644
index 0000000..ffb9d93
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
+
+import java.util.*;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+
+/**
+ * Unit tests for FairOrderingPolicy: comparator behaviour with and without
+ * size based weighting, and the order produced by its iterators.
+ */
+public class TestFairOrderingPolicy {
+
+  final static int GB = 1024;
+
+  /** Entities with equal usage compare equal; more usage sorts later. */
+  @Test
+  public void testSimpleComparison() {
+    FairOrderingPolicy<MockSchedulableEntity> policy =
+      new FairOrderingPolicy<MockSchedulableEntity>();
+    MockSchedulableEntity r1 = new MockSchedulableEntity();
+    MockSchedulableEntity r2 = new MockSchedulableEntity();
+
+    Assert.assertEquals(0, policy.getComparator().compare(r1, r2));
+
+    //consumption
+    r1.setUsed(Resources.createResource(1, 0));
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+      r1.getSchedulingResourceUsage());
+    Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
+  }
+
+  /**
+   * With size based weighting, extra demand only outranks extra consumption
+   * once the demand is large enough.
+   */
+  @Test
+  public void testSizeBasedWeight() {
+    FairOrderingPolicy<MockSchedulableEntity> policy =
+      new FairOrderingPolicy<MockSchedulableEntity>();
+    policy.setSizeBasedWeight(true);
+    MockSchedulableEntity r1 = new MockSchedulableEntity();
+    MockSchedulableEntity r2 = new MockSchedulableEntity();
+
+    //No changes, equal
+    Assert.assertEquals(0, policy.getComparator().compare(r1, r2));
+
+    r1.setUsed(Resources.createResource(4 * GB));
+    r2.setUsed(Resources.createResource(4 * GB));
+
+    r1.setPending(Resources.createResource(4 * GB));
+    r2.setPending(Resources.createResource(4 * GB));
+
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+      r1.getSchedulingResourceUsage());
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+      r2.getSchedulingResourceUsage());
+
+    //Same, equal
+    Assert.assertEquals(0, policy.getComparator().compare(r1, r2));
+
+    r2.setUsed(Resources.createResource(5 * GB));
+    r2.setPending(Resources.createResource(5 * GB));
+
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+      r2.getSchedulingResourceUsage());
+
+    //More demand and consumption, but not enough more demand to overcome
+    //additional consumption
+    Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0);
+
+    //High demand, enough to reverse sbw
+    r2.setPending(Resources.createResource(100 * GB));
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+      r2.getSchedulingResourceUsage());
+    Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
+  }
+
+  /**
+   * Assignment iterates from least to greatest consumption, preemption the
+   * other way round, and the order only changes once the policy is informed.
+   */
+  @Test
+  public void testIterators() {
+    OrderingPolicy<MockSchedulableEntity> schedOrder =
+     new FairOrderingPolicy<MockSchedulableEntity>();
+
+    MockSchedulableEntity msp1 = new MockSchedulableEntity();
+    MockSchedulableEntity msp2 = new MockSchedulableEntity();
+    MockSchedulableEntity msp3 = new MockSchedulableEntity();
+
+    msp1.setId("1");
+    msp2.setId("2");
+    msp3.setId("3");
+
+    msp1.setUsed(Resources.createResource(3));
+    msp2.setUsed(Resources.createResource(2));
+    msp3.setUsed(Resources.createResource(1));
+
+    //Refresh the cached usage of every entity (msp3 included) so the
+    //comparator sees the values set above
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+      msp1.getSchedulingResourceUsage());
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+      msp2.getSchedulingResourceUsage());
+    AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage(
+      msp3.getSchedulingResourceUsage());
+
+    schedOrder.addSchedulableEntity(msp1);
+    schedOrder.addSchedulableEntity(msp2);
+    schedOrder.addSchedulableEntity(msp3);
+
+
+    //Assignment, least to greatest consumption
+    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+
+    //Preemption, greatest to least
+    checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
+
+    //Change value without inform, should see no change
+    msp2.setUsed(Resources.createResource(6));
+    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"});
+    checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"});
+
+    //Do inform, will reorder
+    schedOrder.containerAllocated(msp2, null);
+    checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "1", "2"});
+    checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"});
+  }
+
+  /**
+   * Asserts that the iterator yields entities whose ids match the given
+   * array, in order. Elements beyond ids.length are not inspected.
+   */
+  public void checkIds(Iterator<MockSchedulableEntity> si,
+      String[] ids) {
+    for (int i = 0; i < ids.length; i++) {
+      //expected value first, so a failure message reads the right way round
+      Assert.assertEquals(ids[i], si.next().getId());
+    }
+  }
+
+}

Reply via email to