Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 a7f1dc8aa -> 564d9e610
YARN-5540. Scheduler spends too much time looking at empty priorities. Contributed by Jason Lowe

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/564d9e61
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/564d9e61
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/564d9e61

Branch: refs/heads/branch-2.8
Commit: 564d9e6101d9c6f3542f804380ff8d81cc5aa2b1
Parents: a7f1dc8
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Sep 19 20:34:46 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Sep 19 20:34:46 2016 +0000

----------------------------------------------------------------------
 .../scheduler/AppSchedulingInfo.java | 71 +++++++++++--------
 .../scheduler/TestAppSchedulingInfo.java | 65 ++++++++++++++++++
 2 files changed, 104 insertions(+), 32 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/564d9e61/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index cbd6f79..8d54966 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -27,8 +27,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -82,7 +82,8 @@ public class AppSchedulingInfo { private Set<String> requestedPartitions = new HashSet<>(); - final Set<Priority> priorities = new TreeSet<>(COMPARATOR); + private final ConcurrentSkipListMap<Priority, Integer> priorities = + new ConcurrentSkipListMap<>(COMPARATOR); final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap = new ConcurrentHashMap<>(); final Map<NodeId, Map<Priority, Map<ContainerId, @@ -233,6 +234,7 @@ public class AppSchedulingInfo { if (null == requestsOnNodeWithPriority) { requestsOnNodeWithPriority = new TreeMap<>(); requestsOnNode.put(priority, requestsOnNodeWithPriority); + incrementPriorityReference(priority); } requestsOnNodeWithPriority.put(containerId, request); @@ -247,11 +249,28 @@ public class AppSchedulingInfo { LOG.debug("Added increase request:" + request.getContainerId() + " delta=" + delta); } - - // update priorities - priorities.add(priority); } + private void incrementPriorityReference(Priority priority) { + Integer priorityCount = priorities.get(priority); + if (priorityCount == null) { + priorities.put(priority, 1); + } else { + priorities.put(priority, priorityCount + 1); + } + } + + private void decrementPriorityReference(Priority priority) { + Integer priorityCount = priorities.get(priority); + if (priorityCount != null) { + if (priorityCount > 1) { + priorities.put(priority, priorityCount - 1); + } else { + priorities.remove(priority); + } + } + } + public synchronized boolean 
removeIncreaseRequest(NodeId nodeId, Priority priority, ContainerId containerId) { Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = @@ -272,6 +291,7 @@ public class AppSchedulingInfo { // remove hierarchies if it becomes empty if (requestsOnNodeWithPriority.isEmpty()) { requestsOnNode.remove(priority); + decrementPriorityReference(priority); } if (requestsOnNode.isEmpty()) { containerIncreaseRequestMap.remove(nodeId); @@ -337,7 +357,6 @@ public class AppSchedulingInfo { if (asks == null) { asks = new ConcurrentHashMap<>(); this.resourceRequestMap.put(priority, asks); - this.priorities.add(priority); } // Increment number of containers if recovering preempted resources @@ -356,12 +375,6 @@ public class AppSchedulingInfo { anyResourcesUpdated = true; - // Activate application. Metrics activation is done here. - // TODO: Shouldn't we activate even if numContainers = 0? - if (request.getNumContainers() > 0) { - activeUsersManager.activateApplication(user, applicationId); - } - // Update pendingResources updatePendingResources(lastRequest, request, queue.getMetrics()); } @@ -371,14 +384,23 @@ public class AppSchedulingInfo { private void updatePendingResources(ResourceRequest lastRequest, ResourceRequest request, QueueMetrics metrics) { + int lastRequestContainers = + (lastRequest != null) ? lastRequest.getNumContainers() : 0; if (request.getNumContainers() <= 0) { + if (lastRequestContainers >= 0) { + decrementPriorityReference(request.getPriority()); + } LOG.info("checking for deactivate of application :" + this.applicationId); checkForDeactivation(); + } else { + // Activate application. Metrics activation is done here. + if (lastRequestContainers <= 0) { + incrementPriorityReference(request.getPriority()); + activeUsersManager.activateApplication(user, applicationId); + } } - int lastRequestContainers = - (lastRequest != null) ? lastRequest.getNumContainers() : 0; Resource lastRequestCapability = lastRequest != null ? 
lastRequest.getCapability() : Resources.none(); metrics.incrPendingResources(user, @@ -501,7 +523,7 @@ public class AppSchedulingInfo { } public synchronized Collection<Priority> getPriorities() { - return priorities; + return priorities.keySet(); } public synchronized Map<String, ResourceRequest> getResourceRequests( @@ -700,6 +722,7 @@ public class AppSchedulingInfo { // Do we have any outstanding requests? // If there is nothing, we need to deactivate this application if (numOffSwitchContainers == 0) { + decrementPriorityReference(offSwitchRequest.getPriority()); checkForDeactivation(); } @@ -710,23 +733,7 @@ public class AppSchedulingInfo { } private synchronized void checkForDeactivation() { - boolean deactivate = true; - for (Priority priority : getPriorities()) { - ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY); - if (request != null) { - if (request.getNumContainers() > 0) { - deactivate = false; - break; - } - } - } - - // also we need to check increase request - if (!deactivate) { - deactivate = containerIncreaseRequestMap.isEmpty(); - } - - if (deactivate) { + if (priorities.isEmpty()) { activeUsersManager.deactivateApplication(user, applicationId); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/564d9e61/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index a1c6294..6981f2b 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -20,10 +20,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.doReturn; + import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.junit.Assert; import org.junit.Test; @@ -70,4 +75,64 @@ public class TestAppSchedulingInfo { blacklistRemovals); Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged()); } + + @Test + public void testPriorityAccounting() { + ApplicationId appIdImpl = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appIdImpl, 1); + + Queue queue = mock(Queue.class); + doReturn(mock(QueueMetrics.class)).when(queue).getMetrics(); + AppSchedulingInfo info = new AppSchedulingInfo( + appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0, + new ResourceUsage()); + Assert.assertEquals(0, info.getPriorities().size()); + + Priority pri1 = Priority.newInstance(1); + ResourceRequest req1 = ResourceRequest.newInstance(pri1, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 1); + Priority pri2 = Priority.newInstance(2); + ResourceRequest req2 = ResourceRequest.newInstance(pri2, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 2); + 
List<ResourceRequest> reqs = new ArrayList<>(); + reqs.add(req1); + reqs.add(req2); + info.updateResourceRequests(reqs, false); + ArrayList<Priority> priorities = new ArrayList<>(info.getPriorities()); + Assert.assertEquals(2, priorities.size()); + Assert.assertEquals(req1.getPriority(), priorities.get(0)); + Assert.assertEquals(req2.getPriority(), priorities.get(1)); + + // iterate to verify no ConcurrentModificationException + for (Priority priority: info.getPriorities()) { + info.allocate(NodeType.OFF_SWITCH, null, priority, req1, null); + } + Assert.assertEquals(1, info.getPriorities().size()); + Assert.assertEquals(req2.getPriority(), + info.getPriorities().iterator().next()); + + req2 = ResourceRequest.newInstance(pri2, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 1); + reqs.clear(); + reqs.add(req2); + info.updateResourceRequests(reqs, false); + info.allocate(NodeType.OFF_SWITCH, null, req2.getPriority(), req2, null); + Assert.assertEquals(0, info.getPriorities().size()); + + req1 = ResourceRequest.newInstance(pri1, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 5); + reqs.clear(); + reqs.add(req1); + info.updateResourceRequests(reqs, false); + Assert.assertEquals(1, info.getPriorities().size()); + Assert.assertEquals(req1.getPriority(), + info.getPriorities().iterator().next()); + req1 = ResourceRequest.newInstance(pri1, + ResourceRequest.ANY, Resource.newInstance(1024, 1), 0); + reqs.clear(); + reqs.add(req1); + info.updateResourceRequests(reqs, false); + Assert.assertEquals(0, info.getPriorities().size()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org