Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 a7f1dc8aa -> 564d9e610


YARN-5540. Scheduler spends too much time looking at empty priorities. 
Contributed by Jason Lowe


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/564d9e61
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/564d9e61
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/564d9e61

Branch: refs/heads/branch-2.8
Commit: 564d9e6101d9c6f3542f804380ff8d81cc5aa2b1
Parents: a7f1dc8
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Sep 19 20:34:46 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Sep 19 20:34:46 2016 +0000

----------------------------------------------------------------------
 .../scheduler/AppSchedulingInfo.java            | 71 +++++++++++---------
 .../scheduler/TestAppSchedulingInfo.java        | 65 ++++++++++++++++++
 2 files changed, 104 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/564d9e61/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index cbd6f79..8d54966 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -27,8 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -82,7 +82,8 @@ public class AppSchedulingInfo {
 
   private Set<String> requestedPartitions = new HashSet<>();
 
-  final Set<Priority> priorities = new TreeSet<>(COMPARATOR);
+  private final ConcurrentSkipListMap<Priority, Integer> priorities =
+      new ConcurrentSkipListMap<>(COMPARATOR);
   final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
       new ConcurrentHashMap<>();
   final Map<NodeId, Map<Priority, Map<ContainerId,
@@ -233,6 +234,7 @@ public class AppSchedulingInfo {
     if (null == requestsOnNodeWithPriority) {
       requestsOnNodeWithPriority = new TreeMap<>();
       requestsOnNode.put(priority, requestsOnNodeWithPriority);
+      incrementPriorityReference(priority);
     }
 
     requestsOnNodeWithPriority.put(containerId, request);
@@ -247,11 +249,28 @@ public class AppSchedulingInfo {
       LOG.debug("Added increase request:" + request.getContainerId()
           + " delta=" + delta);
     }
-    
-    // update priorities
-    priorities.add(priority);
   }
   
+  private void incrementPriorityReference(Priority priority) {
+    Integer priorityCount = priorities.get(priority);
+    if (priorityCount == null) {
+      priorities.put(priority, 1);
+    } else {
+      priorities.put(priority, priorityCount + 1);
+    }
+  }
+
+  private void decrementPriorityReference(Priority priority) {
+    Integer priorityCount = priorities.get(priority);
+    if (priorityCount != null) {
+      if (priorityCount > 1) {
+        priorities.put(priority, priorityCount - 1);
+      } else {
+        priorities.remove(priority);
+      }
+    }
+  }
+
   public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priority,
       ContainerId containerId) {
     Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
@@ -272,6 +291,7 @@ public class AppSchedulingInfo {
     // remove hierarchies if it becomes empty
     if (requestsOnNodeWithPriority.isEmpty()) {
       requestsOnNode.remove(priority);
+      decrementPriorityReference(priority);
     }
     if (requestsOnNode.isEmpty()) {
       containerIncreaseRequestMap.remove(nodeId);
@@ -337,7 +357,6 @@ public class AppSchedulingInfo {
       if (asks == null) {
         asks = new ConcurrentHashMap<>();
         this.resourceRequestMap.put(priority, asks);
-        this.priorities.add(priority);
       }
 
       // Increment number of containers if recovering preempted resources
@@ -356,12 +375,6 @@ public class AppSchedulingInfo {
 
         anyResourcesUpdated = true;
 
-        // Activate application. Metrics activation is done here.
-        // TODO: Shouldn't we activate even if numContainers = 0?
-        if (request.getNumContainers() > 0) {
-          activeUsersManager.activateApplication(user, applicationId);
-        }
-
         // Update pendingResources
         updatePendingResources(lastRequest, request, queue.getMetrics());
       }
@@ -371,14 +384,23 @@ public class AppSchedulingInfo {
 
   private void updatePendingResources(ResourceRequest lastRequest,
       ResourceRequest request, QueueMetrics metrics) {
+    int lastRequestContainers =
+        (lastRequest != null) ? lastRequest.getNumContainers() : 0;
     if (request.getNumContainers() <= 0) {
+      if (lastRequestContainers >= 0) {
+        decrementPriorityReference(request.getPriority());
+      }
       LOG.info("checking for deactivate of application :"
           + this.applicationId);
       checkForDeactivation();
+    } else {
+      // Activate application. Metrics activation is done here.
+      if (lastRequestContainers <= 0) {
+        incrementPriorityReference(request.getPriority());
+        activeUsersManager.activateApplication(user, applicationId);
+      }
     }
 
-    int lastRequestContainers =
-        (lastRequest != null) ? lastRequest.getNumContainers() : 0;
     Resource lastRequestCapability =
         lastRequest != null ? lastRequest.getCapability() : Resources.none();
     metrics.incrPendingResources(user,
@@ -501,7 +523,7 @@ public class AppSchedulingInfo {
   }
 
   public synchronized Collection<Priority> getPriorities() {
-    return priorities;
+    return priorities.keySet();
   }
 
   public synchronized Map<String, ResourceRequest> getResourceRequests(
@@ -700,6 +722,7 @@ public class AppSchedulingInfo {
     // Do we have any outstanding requests?
     // If there is nothing, we need to deactivate this application
     if (numOffSwitchContainers == 0) {
+      decrementPriorityReference(offSwitchRequest.getPriority());
       checkForDeactivation();
     }
     
@@ -710,23 +733,7 @@ public class AppSchedulingInfo {
   }
   
   private synchronized void checkForDeactivation() {
-    boolean deactivate = true;
-    for (Priority priority : getPriorities()) {
-      ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
-      if (request != null) {
-        if (request.getNumContainers() > 0) {
-          deactivate = false;
-          break;
-        }
-      }
-    }
-    
-    // also we need to check increase request
-    if (!deactivate) {
-      deactivate = containerIncreaseRequestMap.isEmpty();
-    }
-
-    if (deactivate) {
+    if (priorities.isEmpty()) {
       activeUsersManager.deactivateApplication(user, applicationId);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/564d9e61/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index a1c6294..6981f2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -20,10 +20,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.doReturn;
+
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
 import org.junit.Assert;
 import org.junit.Test;
@@ -70,4 +75,64 @@ public class TestAppSchedulingInfo {
         blacklistRemovals);
     Assert.assertFalse(appSchedulingInfo.getAndResetBlacklistChanged());
   }
+
+  @Test
+  public void testPriorityAccounting() {
+    ApplicationId appIdImpl = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appIdImpl, 1);
+
+    Queue queue = mock(Queue.class);
+    doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
+    AppSchedulingInfo  info = new AppSchedulingInfo(
+        appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
+        new ResourceUsage());
+    Assert.assertEquals(0, info.getPriorities().size());
+
+    Priority pri1 = Priority.newInstance(1);
+    ResourceRequest req1 = ResourceRequest.newInstance(pri1,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
+    Priority pri2 = Priority.newInstance(2);
+    ResourceRequest req2 = ResourceRequest.newInstance(pri2,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 2);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(req1);
+    reqs.add(req2);
+    info.updateResourceRequests(reqs, false);
+    ArrayList<Priority> priorities = new ArrayList<>(info.getPriorities());
+    Assert.assertEquals(2, priorities.size());
+    Assert.assertEquals(req1.getPriority(), priorities.get(0));
+    Assert.assertEquals(req2.getPriority(), priorities.get(1));
+
+    // iterate to verify no ConcurrentModificationException
+    for (Priority priority: info.getPriorities()) {
+      info.allocate(NodeType.OFF_SWITCH, null, priority, req1, null);
+    }
+    Assert.assertEquals(1, info.getPriorities().size());
+    Assert.assertEquals(req2.getPriority(),
+        info.getPriorities().iterator().next());
+
+    req2 = ResourceRequest.newInstance(pri2,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
+    reqs.clear();
+    reqs.add(req2);
+    info.updateResourceRequests(reqs, false);
+    info.allocate(NodeType.OFF_SWITCH, null, req2.getPriority(), req2, null);
+    Assert.assertEquals(0, info.getPriorities().size());
+
+    req1 = ResourceRequest.newInstance(pri1,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 5);
+    reqs.clear();
+    reqs.add(req1);
+    info.updateResourceRequests(reqs, false);
+    Assert.assertEquals(1, info.getPriorities().size());
+    Assert.assertEquals(req1.getPriority(),
+        info.getPriorities().iterator().next());
+    req1 = ResourceRequest.newInstance(pri1,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 0);
+    reqs.clear();
+    reqs.add(req1);
+    info.updateResourceRequests(reqs, false);
+    Assert.assertEquals(0, info.getPriorities().size());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to