Repository: hadoop Updated Branches: refs/heads/trunk 49f6e3d35 -> 395205444
YARN-3319. Implement a FairOrderingPolicy. (Craig Welch via wangda) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/39520544 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/39520544 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/39520544 Branch: refs/heads/trunk Commit: 395205444e8a9ae6fc86f0a441e98486a775511a Parents: 49f6e3d Author: Wangda Tan <wan...@apache.org> Authored: Thu Apr 23 10:47:15 2015 -0700 Committer: Wangda Tan <wan...@apache.org> Committed: Thu Apr 23 10:47:15 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 2 + .../dev-support/findbugs-exclude.xml | 8 + .../CapacitySchedulerConfiguration.java | 20 ++- .../scheduler/policy/CompoundComparator.java | 43 ++++++ .../scheduler/policy/FairOrderingPolicy.java | 114 ++++++++++++++ .../scheduler/capacity/TestLeafQueue.java | 116 +++++++++++++++ .../policy/TestFairOrderingPolicy.java | 149 +++++++++++++++++++ 7 files changed, 450 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d335389..a8d6d6f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -100,6 +100,8 @@ Release 2.8.0 - UNRELEASED network bandwidth traffic originating from YARN containers (Sidharta Seethana via vinodkv) + YARN-3319. Implement a FairOrderingPolicy. (Craig Welch via wangda) + IMPROVEMENTS YARN-1880. 
Cleanup TestApplicationClientProtocolOnHA http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index ece8548..114851f 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -146,6 +146,14 @@ <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> </Match> <Match> + <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy$FairComparator" /> + <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> + </Match> + <Match> + <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.CompoundComparator" /> + <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> + </Match> + <Match> <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PartitionedQueueComparator" /> <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> </Match> http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index c9e83a1..b00f25c 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -122,7 +122,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final String ORDERING_POLICY = "ordering-policy"; - public static final String DEFAULT_ORDERING_POLICY = "fifo"; + public static final String FIFO_ORDERING_POLICY = "fifo"; + + public static final String FAIR_ORDERING_POLICY = "fair"; + + public static final String DEFAULT_ORDERING_POLICY = FIFO_ORDERING_POLICY; @Private public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000; @@ -395,9 +399,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur OrderingPolicy<S> orderingPolicy; - if (policyType.trim().equals("fifo")) { + if (policyType.trim().equals(FIFO_ORDERING_POLICY)) { policyType = FifoOrderingPolicy.class.getName(); } + if (policyType.trim().equals(FAIR_ORDERING_POLICY)) { + policyType = FairOrderingPolicy.class.getName(); + } try { orderingPolicy = (OrderingPolicy<S>) Class.forName(policyType).newInstance(); @@ -405,6 +412,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur String message = "Unable to construct ordering policy for: " + policyType + ", " + e.getMessage(); throw new RuntimeException(message, e); } + + Map<String, String> config = new HashMap<String, String>(); + String confPrefix = getQueuePrefix(queue) + ORDERING_POLICY + "."; + for (Map.Entry<String, String> kv : this) { + if (kv.getKey().startsWith(confPrefix)) { + config.put(kv.getKey().substring(confPrefix.length()), kv.getValue()); + } + } + orderingPolicy.configure(config); return 
orderingPolicy; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java new file mode 100644 index 0000000..3027ab7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/CompoundComparator.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; + +//Some policies will use multiple comparators joined together +class CompoundComparator implements Comparator<SchedulableEntity> { + + List<Comparator<SchedulableEntity>> comparators; + + CompoundComparator(List<Comparator<SchedulableEntity>> comparators) { + this.comparators = comparators; + } + + @Override + public int compare(final SchedulableEntity r1, final SchedulableEntity r2) { + for (Comparator<SchedulableEntity> comparator : comparators) { + int result = comparator.compare(r1, r2); + if (result != 0) return result; + } + return 0; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java new file mode 100644 index 0000000..3ab74de --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; + +/** + * An OrderingPolicy which orders SchedulableEntities for fairness (see + * FairScheduler + * FairSharePolicy), generally, processes with lesser usage are lesser. If + * sizeBasedWeight is set to true then an application with high demand + * may be prioritized ahead of an application with less usage. 
This + * is to offset the tendency to favor small apps, which could result in + * starvation for large apps if many small ones enter and leave the queue + * continuously (optional, default false) + */ +public class FairOrderingPolicy<S extends SchedulableEntity> extends AbstractComparatorOrderingPolicy<S> { + + public static final String ENABLE_SIZE_BASED_WEIGHT = + "fair.enable-size-based-weight"; + + protected class FairComparator implements Comparator<SchedulableEntity> { + @Override + public int compare(final SchedulableEntity r1, final SchedulableEntity r2) { + int res = (int) Math.signum( getMagnitude(r1) - getMagnitude(r2) ); + return res; + } + } + + private CompoundComparator fairComparator; + + private boolean sizeBasedWeight = false; + + public FairOrderingPolicy() { + List<Comparator<SchedulableEntity>> comparators = + new ArrayList<Comparator<SchedulableEntity>>(); + comparators.add(new FairComparator()); + comparators.add(new FifoComparator()); + fairComparator = new CompoundComparator( + comparators + ); + this.comparator = fairComparator; + this.schedulableEntities = new TreeSet<S>(comparator); + } + + private double getMagnitude(SchedulableEntity r) { + double mag = r.getSchedulingResourceUsage().getCachedUsed( + CommonNodeLabelsManager.ANY).getMemory(); + if (sizeBasedWeight) { + double weight = Math.log1p(r.getSchedulingResourceUsage().getCachedDemand( + CommonNodeLabelsManager.ANY).getMemory()) / Math.log(2); + mag = mag / weight; + } + return mag; + } + + @VisibleForTesting + public boolean getSizeBasedWeight() { + return sizeBasedWeight; + } + + @VisibleForTesting + public void setSizeBasedWeight(boolean sizeBasedWeight) { + this.sizeBasedWeight = sizeBasedWeight; + } + + @Override + public void configure(Map<String, String> conf) { + if (conf.containsKey(ENABLE_SIZE_BASED_WEIGHT)) { + sizeBasedWeight = Boolean.valueOf(conf.get(ENABLE_SIZE_BASED_WEIGHT)); + } + } + + @Override + public void containerAllocated(S schedulableEntity, + RMContainer 
r) { + reorderSchedulableEntity(schedulableEntity); + } + + @Override + public void containerReleased(S schedulableEntity, + RMContainer r) { + reorderSchedulableEntity(schedulableEntity); + } + + @Override + public String getInfo() { + String sbw = sizeBasedWeight ? " with sizeBasedWeight" : ""; + return "FairOrderingPolicy" + sbw; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 3ba8036..34248a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import 
org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -460,6 +461,41 @@ public class TestLeafQueue { } @Test + public void testFairConfiguration() throws Exception { + + CapacitySchedulerConfiguration testConf = + new CapacitySchedulerConfiguration(); + + String tproot = CapacitySchedulerConfiguration.ROOT + "." + + "testPolicyRoot" + System.currentTimeMillis(); + + OrderingPolicy<FiCaSchedulerApp> schedOrder = + testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot); + + //override default to fair + String policyType = CapacitySchedulerConfiguration.PREFIX + tproot + + "." + CapacitySchedulerConfiguration.ORDERING_POLICY; + + testConf.set(policyType, + CapacitySchedulerConfiguration.FAIR_ORDERING_POLICY); + schedOrder = + testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot); + FairOrderingPolicy fop = (FairOrderingPolicy<FiCaSchedulerApp>) schedOrder; + assertFalse(fop.getSizeBasedWeight()); + + //Now with sizeBasedWeight + String sbwConfig = CapacitySchedulerConfiguration.PREFIX + tproot + + "." + CapacitySchedulerConfiguration.ORDERING_POLICY + "." 
+ + FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT; + testConf.set(sbwConfig, "true"); + schedOrder = + testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot); + fop = (FairOrderingPolicy<FiCaSchedulerApp>) schedOrder; + assertTrue(fop.getSizeBasedWeight()); + + } + + @Test public void testSingleQueueWithOneUser() throws Exception { // Manipulate queue 'a' @@ -2621,6 +2657,86 @@ public class TestLeafQueue { } + @Test + public void testFairAssignment() throws Exception { + + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + + OrderingPolicy<FiCaSchedulerApp> schedulingOrder = + new FairOrderingPolicy<FiCaSchedulerApp>(); + + a.setOrderingPolicy(schedulingOrder); + + String host_0_0 = "127.0.0.1"; + String rack_0 = "rack_0"; + FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB); + + final int numNodes = 4; + Resource clusterResource = Resources.createResource( + numNodes * (16*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + String user_0 = "user_0"; + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_1, user_0); + + Priority priority = TestUtils.createMockPriority(1); + List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>(); + List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>(); + + app_0_requests_0.clear(); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + 
app_1_requests_0.clear(); + app_1_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + + app_0_requests_0.clear(); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + app_1_requests_0.clear(); + app_1_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, + true, priority, recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + //Since it already has more resources, app_0 will not get + //assigned first, but app_1 will + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + + //and only then will app_0 + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); + + } + private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user, LeafQueue defaultQueue) { List<FiCaSchedulerApp> appsLists = new ArrayList<FiCaSchedulerApp>(); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39520544/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java new file mode 100644 index 0000000..ffb9d93 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; + +import java.util.*; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; + +public class TestFairOrderingPolicy { + + final static int GB = 1024; + + @Test + public void testSimpleComparison() { + FairOrderingPolicy<MockSchedulableEntity> policy = + new FairOrderingPolicy<MockSchedulableEntity>(); + MockSchedulableEntity r1 = new MockSchedulableEntity(); + MockSchedulableEntity r2 = new MockSchedulableEntity(); + + Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0); + + //consumption + r1.setUsed(Resources.createResource(1, 0)); + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + r1.getSchedulingResourceUsage()); + Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0); + } + + @Test + public void testSizeBasedWeight() { + FairOrderingPolicy<MockSchedulableEntity> policy = + new FairOrderingPolicy<MockSchedulableEntity>(); + policy.setSizeBasedWeight(true); + MockSchedulableEntity r1 = new MockSchedulableEntity(); + MockSchedulableEntity r2 = new MockSchedulableEntity(); + + //No changes, equal + Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0); + + r1.setUsed(Resources.createResource(4 * GB)); + r2.setUsed(Resources.createResource(4 * GB)); + + r1.setPending(Resources.createResource(4 * GB)); + r2.setPending(Resources.createResource(4 * GB)); + + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + r1.getSchedulingResourceUsage()); + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + r2.getSchedulingResourceUsage()); + + //Same, equal + 
Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0); + + r2.setUsed(Resources.createResource(5 * GB)); + r2.setPending(Resources.createResource(5 * GB)); + + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + r2.getSchedulingResourceUsage()); + + //More demand and consumption, but not enough more demand to overcome + //additional consumption + Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0); + + //High demand, enough to reverse sbw + r2.setPending(Resources.createResource(100 * GB)); + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + r2.getSchedulingResourceUsage()); + Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0); + } + + @Test + public void testIterators() { + OrderingPolicy<MockSchedulableEntity> schedOrder = + new FairOrderingPolicy<MockSchedulableEntity>(); + + MockSchedulableEntity msp1 = new MockSchedulableEntity(); + MockSchedulableEntity msp2 = new MockSchedulableEntity(); + MockSchedulableEntity msp3 = new MockSchedulableEntity(); + + msp1.setId("1"); + msp2.setId("2"); + msp3.setId("3"); + + msp1.setUsed(Resources.createResource(3)); + msp2.setUsed(Resources.createResource(2)); + msp3.setUsed(Resources.createResource(1)); + + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + msp1.getSchedulingResourceUsage()); + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + msp2.getSchedulingResourceUsage()); + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + msp2.getSchedulingResourceUsage()); + + schedOrder.addSchedulableEntity(msp1); + schedOrder.addSchedulableEntity(msp2); + schedOrder.addSchedulableEntity(msp3); + + + //Assignment, least to greatest consumption + checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"}); + + //Preemption, greatest to least + checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"}); + + //Change value without inform, should see no change + 
msp2.setUsed(Resources.createResource(6)); + checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "2", "1"}); + checkIds(schedOrder.getPreemptionIterator(), new String[]{"1", "2", "3"}); + + //Do inform, will reorder + schedOrder.containerAllocated(msp2, null); + checkIds(schedOrder.getAssignmentIterator(), new String[]{"3", "1", "2"}); + checkIds(schedOrder.getPreemptionIterator(), new String[]{"2", "1", "3"}); + } + + public void checkIds(Iterator<MockSchedulableEntity> si, + String[] ids) { + for (int i = 0;i < ids.length;i++) { + Assert.assertEquals(si.next().getId(), + ids[i]); + } + } + +}