YARN-7520. Queue Ordering policy changes for ordering auto created leaf queues within Managed parent Queues. (Suma Shivaprasad via wangda)
Change-Id: I482f086945bd448d512cb5b3879d7371e37ee134 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a8316df8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a8316df8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a8316df8 Branch: refs/heads/trunk Commit: a8316df8c05a7b3d1a5577174b838711a49ef971 Parents: f548bff Author: Wangda Tan <[email protected]> Authored: Fri Dec 8 15:11:28 2017 -0800 Committer: Wangda Tan <[email protected]> Committed: Fri Dec 8 15:11:28 2017 -0800 ---------------------------------------------------------------------- .../PriorityUtilizationQueueOrderingPolicy.java | 83 ++++++++++++++------ ...tPriorityUtilizationQueueOrderingPolicy.java | 78 ++++++++++++------ 2 files changed, 113 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8316df8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java index 4985a1a..e684c2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,9 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels + .RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; @@ -49,7 +51,8 @@ import java.util.function.Supplier; * other is under: The queue that is under its capacity guarantee gets the * resources. */ -public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPolicy { +public class PriorityUtilizationQueueOrderingPolicy + implements QueueOrderingPolicy { private List<CSQueue> queues; private boolean respectPriority; @@ -78,7 +81,7 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli if (priority1 == priority2) { // The queue with less relative used-capacity goes first return Double.compare(relativeAssigned1, relativeAssigned2); - } else { + } else{ // When priority is different: if ((relativeAssigned1 < 1.0f && relativeAssigned2 < 1.0f) || ( relativeAssigned1 >= 1.0f && relativeAssigned2 >= 1.0f)) { @@ -86,7 +89,7 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli // Or both the queues are over or meeting their guaranteed capacities // queue with higher used-capacity goes first return Integer.compare(priority2, priority1); - } else { + } else{ // Otherwise, when one of the queues is over or meeting their // guaranteed capacities and the other is under: The queue that is // under its capacity guarantee gets the resources. @@ -98,7 +101,7 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli /** * Comparator that both looks at priority and utilization */ - private class PriorityQueueComparator implements Comparator<CSQueue> { + private class PriorityQueueComparator implements Comparator<CSQueue> { @Override public int compare(CSQueue q1, CSQueue q2) { @@ -109,8 +112,38 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli return rc; } - float used1 = q1.getQueueCapacities().getUsedCapacity(p); - float used2 = q2.getQueueCapacities().getUsedCapacity(p); + float q1AbsCapacity = q1.getQueueCapacities().getAbsoluteCapacity(p); + float q2AbsCapacity = q2.getQueueCapacities().getAbsoluteCapacity(p); + + //If q1's abs capacity > 0 and q2 is 0, then prioritize q1 + if (Float.compare(q1AbsCapacity, 0f) > 0 && Float.compare(q2AbsCapacity, + 0f) == 0) { + return -1; + //If q2's abs capacity > 0 and q1 is 0, then prioritize q2 + } else if (Float.compare(q2AbsCapacity, 0f) > 0 && Float.compare( + q1AbsCapacity, 0f) == 0) { + return 1; + } else if (Float.compare(q1AbsCapacity, 0f) == 0 && Float.compare( + q2AbsCapacity, 0f) == 0) { + // both q1 has 0 and q2 has 0 capacity, then fall back to using + // priority, abs used capacity to prioritize + float used1 = q1.getQueueCapacities().getAbsoluteUsedCapacity(p); + float used2 = q2.getQueueCapacities().getAbsoluteUsedCapacity(p); + + return compare(q1, q2, used1, used2, p); + } else{ + // both q1 has positive abs capacity and q2 has positive abs + // capacity + float used1 = q1.getQueueCapacities().getUsedCapacity(p); + float used2 = q2.getQueueCapacities().getUsedCapacity(p); + + return compare(q1, q2, used1, used2, p); + } + } + + private int compare(CSQueue q1, CSQueue q2, float q1Used, float q2Used, + String partition) { + int p1 = 0; int p2 = 0; if (respectPriority) { @@ -118,29 +151,31 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli p2 = q2.getPriority().getPriority(); } - rc = PriorityUtilizationQueueOrderingPolicy.compare(used1, used2, p1, p2); + int rc = PriorityUtilizationQueueOrderingPolicy.compare(q1Used, q2Used, + p1, p2); // For queue with same used ratio / priority, queue with higher configured // capacity goes first if (0 == rc) { - Resource minEffRes1 = q1.getQueueResourceQuotas() - .getConfiguredMinResource(p); - Resource minEffRes2 = q2.getQueueResourceQuotas() - .getConfiguredMinResource(p); - if (!minEffRes1.equals(Resources.none()) - && !minEffRes2.equals(Resources.none())) { + Resource minEffRes1 = + q1.getQueueResourceQuotas().getConfiguredMinResource(partition); + Resource minEffRes2 = + q2.getQueueResourceQuotas().getConfiguredMinResource(partition); + if (!minEffRes1.equals(Resources.none()) && !minEffRes2.equals( + Resources.none())) { return minEffRes2.compareTo(minEffRes1); } - float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(p); - float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(p); + float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(partition); + float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(partition); return Float.compare(abs2, abs1); } return rc; } - private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2, String partition) { + private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2, + String partition) { // Everybody has access to default partition if (StringUtils.equals(partition, RMNodeLabelsManager.NO_LABEL)) { return 0; @@ -190,9 +225,11 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli @Override public String getConfigName() { if (respectPriority) { - return CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY; + return CapacitySchedulerConfiguration. + QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY; } else{ - return CapacitySchedulerConfiguration.QUEUE_UTILIZATION_ORDERING_POLICY; + return CapacitySchedulerConfiguration. + QUEUE_UTILIZATION_ORDERING_POLICY; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8316df8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java index b9d5b82..ca9a84b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java @@ -36,7 +36,7 @@ import static org.mockito.Mockito.when; public class TestPriorityUtilizationQueueOrderingPolicy { private List<CSQueue> mockCSQueues(String[] queueNames, int[] priorities, - float[] utilizations, String partition) { + float[] utilizations, float[] absCapacities, String partition) { // sanity check assert queueNames != null && priorities != null && utilizations != null && queueNames.length > 0 && queueNames.length == priorities.length @@ -48,6 +48,7 @@ public class TestPriorityUtilizationQueueOrderingPolicy { when(q.getQueueName()).thenReturn(queueNames[i]); QueueCapacities qc = new QueueCapacities(false); + qc.setAbsoluteCapacity(partition, absCapacities[i]); qc.setUsedCapacity(partition, utilizations[i]); when(q.getQueueCapacities()).thenReturn(qc); @@ -81,41 +82,45 @@ public class TestPriorityUtilizationQueueOrderingPolicy { // Case 1, one queue policy.setQueues(mockCSQueues(new String[] { "a" }, new int[] { 0 }, - new float[] { 0.1f }, "")); + new float[] { 0.1f }, new float[] {0.2f}, "")); verifyOrder(policy, "", new String[] { "a" }); // Case 2, 2 queues policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 0, 0 }, - new float[] { 0.1f, 0.0f }, "")); + new float[] { 0.1f, 0.0f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "b", "a" }); // Case 3, 3 queues policy.setQueues( mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 0, 0, 0 }, - new float[] { 0.1f, 0.0f, 0.2f }, "")); + new float[] { 0.1f, 0.0f, 0.2f }, new float[] {0.2f, 0.3f, 0.4f}, + "")); verifyOrder(policy, "", new String[] { "b", "a", "c" }); // Case 4, 3 queues, ignore priority policy.setQueues( mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 }, - new float[] { 0.1f, 0.0f, 0.2f }, "")); + new float[] { 0.1f, 0.0f, 0.2f }, new float[] {0.2f, 0.3f, 0.4f}, + "")); verifyOrder(policy, "", new String[] { "b", "a", "c" }); // Case 5, 3 queues, look at partition (default) policy.setQueues( mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 }, - new float[] { 0.1f, 0.0f, 0.2f }, "x")); + new float[] { 0.1f, 0.0f, 0.2f }, new float[] {0.2f, 0.3f, 0.4f}, + "x")); verifyOrder(policy, "", new String[] { "a", "b", "c" }); // Case 5, 3 queues, look at partition (x) policy.setQueues( mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 }, - new float[] { 0.1f, 0.0f, 0.2f }, "x")); + new float[] { 0.1f, 0.0f, 0.2f }, new float[] {0.2f, 0.3f, 0.4f}, + "x")); verifyOrder(policy, "x", new String[] { "b", "a", "c" }); // Case 6, 3 queues, with different accessibility to partition List<CSQueue> queues = mockCSQueues(new String[] { "a", "b", "c" }, new int[] { 2, 1, 0 }, - new float[] { 0.1f, 0.0f, 0.2f }, "x"); + new float[] { 0.1f, 0.0f, 0.2f }, new float[] {0.2f, 0.3f, 0.4f}, "x"); // a can access "x" when(queues.get(0).getAccessibleNodeLabels()).thenReturn(ImmutableSet.of("x", "y")); // c can access "x" @@ -131,89 +136,94 @@ public class TestPriorityUtilizationQueueOrderingPolicy { // Case 1, one queue policy.setQueues(mockCSQueues(new String[] { "a" }, new int[] { 1 }, - new float[] { 0.1f }, "")); + new float[] { 0.1f }, new float[] {0.2f}, "")); verifyOrder(policy, "", new String[] { "a" }); // Case 2, 2 queues, both under utilized, same priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 }, - new float[] { 0.2f, 0.1f }, "")); + new float[] { 0.2f, 0.1f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "b", "a" }); // Case 3, 2 queues, both over utilized, same priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 }, - new float[] { 1.1f, 1.2f }, "")); + new float[] { 1.1f, 1.2f },new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 4, 2 queues, one under and one over, same priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 }, - new float[] { 0.1f, 1.2f }, "")); + new float[] { 0.1f, 1.2f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 5, 2 queues, both over utilized, different priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 }, - new float[] { 1.1f, 1.2f }, "")); + new float[] { 1.1f, 1.2f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "b", "a" }); // Case 6, 2 queues, both under utilized, different priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 }, - new float[] { 0.1f, 0.2f }, "")); + new float[] { 0.1f, 0.2f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "b", "a" }); // Case 7, 2 queues, one under utilized and one over utilized, // different priority (1) policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 }, - new float[] { 0.1f, 1.2f }, "")); + new float[] { 0.1f, 1.2f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 8, 2 queues, one under utilized and one over utilized, // different priority (1) policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 2, 1 }, - new float[] { 0.1f, 1.2f }, "")); + new float[] { 0.1f, 1.2f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 9, 2 queues, one under utilized and one meet, different priority (1) policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 }, - new float[] { 0.1f, 1.0f }, "")); + new float[] { 0.1f, 1.0f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 10, 2 queues, one under utilized and one meet, different priority (2) policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 2, 1 }, - new float[] { 0.1f, 1.0f }, "")); + new float[] { 0.1f, 1.0f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 11, 2 queues, one under utilized and one meet, same priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 }, - new float[] { 0.1f, 1.0f }, "")); + new float[] { 0.1f, 1.0f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "a", "b" }); // Case 12, 2 queues, both meet, different priority policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 2 }, - new float[] { 1.0f, 1.0f }, "")); + new float[] { 1.0f, 1.0f }, new float[] {0.2f, 0.3f}, "")); verifyOrder(policy, "", new String[] { "b", "a" }); // Case 13, 5 queues, different priority policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" }, new int[] { 1, 2, 0, 0, 3 }, - new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "")); + new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, + new float[] { 0.2f, 0.1f, 0.1f, 0.3f, 0.3f }, "")); verifyOrder(policy, "", new String[] { "e", "c", "b", "a", "d" }); - // Case 14, 5 queues, different priority, partition default; + // Case 14, 5 queues, different priority, + // partition default - abs capacity is 0; policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" }, new int[] { 1, 2, 0, 0, 3 }, - new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x")); + new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, + new float[] { 0.2f, 0.1f, 0.1f, 0.3f, 0.3f }, "x")); verifyOrder(policy, "", new String[] { "e", "b", "a", "c", "d" }); // Case 15, 5 queues, different priority, partition x; policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" }, new int[] { 1, 2, 0, 0, 3 }, - new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x")); + new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, + new float[] { 0.2f, 0.1f, 0.1f, 0.3f, 0.3f }, "x")); verifyOrder(policy, "x", new String[] { "e", "c", "b", "a", "d" }); // Case 16, 5 queues, different priority, partition x; and different // accessibility List<CSQueue> queues = mockCSQueues(new String[] { "a", "b", "c", "d", "e" }, new int[] { 1, 2, 0, 0, 3 }, - new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, "x"); + new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, + new float[] { 0.2f, 0.1f, 0.1f, 0.3f, 0.3f }, "x"); // Only a/d has access to x when(queues.get(0).getAccessibleNodeLabels()).thenReturn( ImmutableSet.of("x")); @@ -221,5 +231,23 @@ public class TestPriorityUtilizationQueueOrderingPolicy { ImmutableSet.of("x")); policy.setQueues(queues); verifyOrder(policy, "x", new String[] { "a", "d", "e", "c", "b" }); + + // Case 17, 2 queues, one's abs capacity is 0 + policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 }, + new float[] { 0.1f, 1.2f }, new float[] {0.0f, 0.3f}, "")); + verifyOrder(policy, "", new String[] { "b", "a" }); + + // Case 18, 2 queues, one's abs capacity is 0 + policy.setQueues(mockCSQueues(new String[] { "a", "b" }, new int[] { 1, 1 }, + new float[] { 0.1f, 1.2f }, new float[] {0.3f, 0.0f}, "")); + verifyOrder(policy, "", new String[] { "a", "b" }); + + //Case 19, 5 queues with 2 having abs capacity 0 are prioritized last + policy.setQueues(mockCSQueues(new String[] { "a", "b", "c", "d", "e" }, + new int[] { 1, 2, 0, 0, 3 }, + new float[] { 1.2f, 1.0f, 0.2f, 1.1f, 0.2f }, + new float[] { 0.0f, 0.0f, 0.1f, 0.3f, 0.3f }, "x")); + verifyOrder(policy, "x", new String[] { "e", "c", "d", "b", "a" }); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
