YARN-4781. Support intra-queue preemption for fairness ordering policy. Contributed by Eric Payne.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d966c766 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d966c766 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d966c766 Branch: refs/heads/HDDS-48 Commit: d966c766f420fb4fb9b66d55c9266b8cb363e1ab Parents: e4767d4 Author: Sunil G <sun...@apache.org> Authored: Mon May 28 16:32:53 2018 +0530 Committer: Hanisha Koneru <hanishakon...@apache.org> Committed: Wed May 30 14:00:25 2018 -0700 ---------------------------------------------------------------------- .../FifoIntraQueuePreemptionPlugin.java | 37 ++- .../capacity/IntraQueueCandidatesSelector.java | 40 +++ .../monitor/capacity/TempAppPerPartition.java | 9 + .../AbstractComparatorOrderingPolicy.java | 2 - ...alCapacityPreemptionPolicyMockFramework.java | 12 +- ...yPreemptionPolicyIntraQueueFairOrdering.java | 276 +++++++++++++++++++ 6 files changed, 366 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d966c766/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 40f333f..12c178c 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -34,6 +34,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAFairOrderingComparator; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -41,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -263,8 +266,17 @@ public class FifoIntraQueuePreemptionPlugin Resource queueReassignableResource, PriorityQueue<TempAppPerPartition> orderedByPriority) { - Comparator<TempAppPerPartition> reverseComp = Collections - .reverseOrder(new 
TAPriorityComparator()); + Comparator<TempAppPerPartition> reverseComp; + OrderingPolicy<FiCaSchedulerApp> queueOrderingPolicy = + tq.leafQueue.getOrderingPolicy(); + if (queueOrderingPolicy instanceof FairOrderingPolicy + && (context.getIntraQueuePreemptionOrderPolicy() + == IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) { + reverseComp = Collections.reverseOrder( + new TAFairOrderingComparator(this.rc, clusterResource)); + } else { + reverseComp = Collections.reverseOrder(new TAPriorityComparator()); + } TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp); String partition = tq.partition; @@ -355,7 +367,16 @@ public class FifoIntraQueuePreemptionPlugin TempQueuePerPartition tq, Collection<FiCaSchedulerApp> apps, Resource clusterResource, Map<String, Resource> perUserAMUsed) { - TAPriorityComparator taComparator = new TAPriorityComparator(); + Comparator<TempAppPerPartition> taComparator; + OrderingPolicy<FiCaSchedulerApp> orderingPolicy = + tq.leafQueue.getOrderingPolicy(); + if (orderingPolicy instanceof FairOrderingPolicy + && (context.getIntraQueuePreemptionOrderPolicy() + == IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) { + taComparator = new TAFairOrderingComparator(this.rc, clusterResource); + } else { + taComparator = new TAPriorityComparator(); + } PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>( 100, taComparator); @@ -393,13 +414,12 @@ public class FifoIntraQueuePreemptionPlugin // Set ideal allocation of app as 0. tmpApp.idealAssigned = Resources.createResource(0, 0); - orderedByPriority.add(tmpApp); - // Create a TempUserPerPartition structure to hold more information // regarding each user's entities such as UserLimit etc. This could // be kept in a user to TempUserPerPartition map for further reference. 
String userName = app.getUser(); - if (!usersPerPartition.containsKey(userName)) { + TempUserPerPartition tmpUser = usersPerPartition.get(userName); + if (tmpUser == null) { ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName) .getResourceUsage(); @@ -409,7 +429,7 @@ public class FifoIntraQueuePreemptionPlugin amUsed = (userSpecificAmUsed == null) ? Resources.none() : userSpecificAmUsed; - TempUserPerPartition tmpUser = new TempUserPerPartition( + tmpUser = new TempUserPerPartition( tq.leafQueue.getUser(userName), tq.queueName, Resources.clone(userResourceUsage.getUsed(partition)), Resources.clone(amUsed), @@ -432,7 +452,10 @@ public class FifoIntraQueuePreemptionPlugin tmpUser.idealAssigned = Resources.createResource(0, 0); tq.addUserPerPartition(userName, tmpUser); } + tmpApp.setTempUserPerPartition(tmpUser); + orderedByPriority.add(tmpApp); } + return orderedByPriority; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d966c766/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index a91fac7..8ab9507 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.AbstractComparatorOrderingPolicy; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.io.Serializable; @@ -64,6 +66,44 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector { } } + /* + * Order first by amount used from least to most. Then order from oldest to + * youngest if amount used is the same. 
+ */ + static class TAFairOrderingComparator + implements Comparator<TempAppPerPartition> { + + private ResourceCalculator rc; + private Resource clusterRes; + + TAFairOrderingComparator(ResourceCalculator rc, Resource clusterRes) { + this.rc = rc; + this.clusterRes = clusterRes; + } + + @Override + public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { + if (ta1.getUser().equals(ta2.getUser())) { + AbstractComparatorOrderingPolicy<FiCaSchedulerApp> acop = + (AbstractComparatorOrderingPolicy<FiCaSchedulerApp>) + ta1.getFiCaSchedulerApp().getCSLeafQueue().getOrderingPolicy(); + return acop.getComparator() + .compare(ta1.getFiCaSchedulerApp(), ta2.getFiCaSchedulerApp()); + } else { + Resource usedByUser1 = ta1.getTempUserPerPartition().getUsedDeductAM(); + Resource usedByUser2 = ta2.getTempUserPerPartition().getUsedDeductAM(); + if (Resources.equals(usedByUser1, usedByUser2)) { + return ta1.getApplicationId().compareTo(ta2.getApplicationId()); + } + if (Resources.lessThan(rc, clusterRes, usedByUser1, usedByUser2)) { + return -1; + } else { + return 1; + } + } + } + } + IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null; final CapacitySchedulerPreemptionContext context; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d966c766/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java index e9a934b..05d8096 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java @@ -34,6 +34,7 @@ public class TempAppPerPartition extends AbstractPreemptionEntity { // Following fields are settled and used by candidate selection policies private final int priority; private final ApplicationId applicationId; + private TempUserPerPartition tempUser; FiCaSchedulerApp app; @@ -102,4 +103,12 @@ public class TempAppPerPartition extends AbstractPreemptionEntity { Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct); } } + + public void setTempUserPerPartition(TempUserPerPartition tu) { + tempUser = tu; + } + + public TempUserPerPartition getTempUserPerPartition() { + return tempUser; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d966c766/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java index b7cb1bf..09dd3bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; -import com.google.common.annotations.VisibleForTesting; /** @@ -89,7 +88,6 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti } } - @VisibleForTesting public Comparator<SchedulableEntity> getComparator() { return comparator; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d966c766/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index a972584..64b56fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preempti import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -64,6 +65,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -337,9 +339,11 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { .thenReturn(pendingForDefaultPartition); // need to set pending resource in resource usage as well - ResourceUsage ru = new ResourceUsage(); + ResourceUsage ru = Mockito.spy(new ResourceUsage()); ru.setUsed(label, used); + when(ru.getCachedUsed(anyString())).thenReturn(used); when(app.getAppAttemptResourceUsage()).thenReturn(ru); + when(app.getSchedulingResourceUsage()).thenReturn(ru); start = end + 1; } @@ -637,6 +641,12 @@ public class ProportionalCapacityPreemptionPolicyMockFramework { when(leafQueue.getApplications()).thenReturn(apps); when(leafQueue.getAllApplications()).thenReturn(apps); OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class); + String opName = conf.get(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + "." 
+ getQueueName(q) + + ".ordering-policy", "fifo"); + if (opName.equals("fair")) { + so = Mockito.spy(new FairOrderingPolicy<FiCaSchedulerApp>()); + } when(so.getPreemptionIterator()).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { return apps.descendingIterator(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d966c766/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java new file mode 100644 index 0000000..1678651 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.junit.Before; +import org.junit.Test; + +/* + * Test class for testing intra-queue preemption when the fair ordering policy + * is enabled on a capacity queue. + */ +public class TestProportionalCapacityPreemptionPolicyIntraQueueFairOrdering + extends ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + conf.setBoolean( + CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + /* + * When the capacity scheduler fair ordering policy is enabled, preempt first + * from the application owned by the user that is the farthest over their + * user limit. + */ + @Test + public void testIntraQueuePreemptionFairOrderingPolicyEnabledOneAppPerUser() + throws IOException { + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // user1/app1 has 60 resources in queue a + // user2/app2 has 40 resources in queue a + // user3/app3 is requesting 20 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. + // With FairOrderingPolicy enabled on queue a, all 20 resources should be + // preempted from app1 + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + + "(1,1,n1,,60,false,0,user1);" + + "a\t" // app2, user2 in a + + "(1,1,n1,,40,false,0,user2);" + + "a\t" // app3, user3 in a + + "(1,1,n1,,0,false,20,user3)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(20)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + /* + * When the capacity scheduler fifo ordering policy is enabled, preempt first + * from the youngest application until reduced to user limit, then preempt + * from next youngest app. + */ + @Test + public void testIntraQueuePreemptionFifoOrderingPolicyEnabled() + throws IOException { + // Enable FifoOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fifo"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. 
+ INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // user1/app1 has 60 resources in queue a + // user2/app2 has 40 resources in queue a + // user3/app3 is requesting 20 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. + // With FifoOrderingPolicy enabled on queue a, the first 5 should come from + // the youngest app, app2, until app2 is reduced to the user limit of 35. + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + + "(1,1,n1,,60,false,0,user1);" + + "a\t" // app2, user2 in a + + "(1,1,n1,,40,false,0,user2);" + + "a\t" // app3, user3 in a + + "(1,1,n1,,0,false,5,user3)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + + // user1/app1 has 60 resources in queue a + // user2/app2 has 35 resources in queue a + // user3/app3 has 5 resources and is requesting 15 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. + // The next 15 should come from app1 even though app2 is younger since app2 + // has already been reduced to its user limit. 
+ appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + + "(1,1,n1,,60,false,0,user1);" + + "a\t" // app2, user2 in a + + "(1,1,n1,,35,false,0,user2);" + + "a\t" // app3, user3 in a + + "(1,1,n1,,5,false,15,user3)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + /* + * When the capacity scheduler fair ordering policy is enabled, preempt first + * from the youngest application from the user that is the farthest over their + * user limit. + */ + @Test + public void testIntraQueuePreemptionFairOrderingPolicyMulitipleAppsPerUser() + throws IOException { + // Enable FairOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fair"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // user1/app1 has 35 resources in queue a + // user1/app2 has 25 resources in queue a + // user2/app3 has 40 resources in queue a + // user3/app4 is requesting 20 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. 
+ // With FairOrderingPolicy enabled on queue a, all 20 resources should be + // preempted from app1 since it's the most over served app from the most + // over served user + String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1 and app2, user1 in a + "(1,1,n1,,35,false,0,user1);" + + "a\t" + "(1,1,n1,,25,false,0,user1);" + + "a\t" // app3, user2 in a + "(1,1,n1,,40,false,0,user2);" + + "a\t" // app4, user3 in a + "(1,1,n1,,0,false,20,user3)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(20)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + + /* + * When the capacity scheduler fifo ordering policy is enabled and a user has + * multiple apps, preempt first from the youngest application. + */ + @Test + public void testIntraQueuePreemptionFifoOrderingPolicyMultipleAppsPerUser() + throws IOException { + // Enable FifoOrderingPolicy for yarn.scheduler.capacity.root.a + conf.set(CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + ".a.ordering-policy", "fifo"); + // Make sure all containers will be preempted in a single round. + conf.setFloat(CapacitySchedulerConfiguration. + INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, + (float) 1.0); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 1 0]);" + // root + "-a(=[100 100 100 1 0])"; // a + + // user1/app1 has 40 resources in queue a + // user1/app2 has 20 resources in queue a + // user3/app3 has 40 resources in queue a + // user4/app4 is requesting 25 resources in queue a + // With 3 users, preemptable user limit should be around 35 resources each. 
+ String appsConfig = + // queueName\t(prio,resource,host,expression,#repeat,reserved,pending,user) + "a\t" // app1, user1 in a + "(1,1,n1,,40,false,0,user1);" + + "a\t" // app2, user1 in a + "(1,1,n1,,20,false,0,user1);" + + "a\t" // app3, user3 in a + "(1,1,n1,,40,false,0,user3);" + + "a\t" // app4, user4 in a + "(1,1,n1,,0,false,25,user4)" + ; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // app3 is the youngest and also over its user limit. 5 should be preempted + // from app3 until it comes down to user3's user limit. + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + + // User1's app2 is its youngest. 19 should be preempted from app2, leaving + // only the AM + verify(mDisp, times(19)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + + // Preempt the remaining resource from User1's oldest app1. + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org