Repository: hadoop
Updated Branches:
  refs/heads/branch-2 9942ca2bf -> 035f5f8f1


YARN-5540. Scheduler spends too much time looking at empty priorities. Contributed by Jason Lowe
(cherry picked from commit 7558dbbb481eab055e794beb3603bbe5671a4b4c)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/035f5f8f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/035f5f8f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/035f5f8f

Branch: refs/heads/branch-2
Commit: 035f5f8f1dee396612776fbe935cd3371d87a57e
Parents: 9942ca2
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Sep 19 20:31:35 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Sep 19 20:33:59 2016 +0000

----------------------------------------------------------------------
 .../scheduler/AppSchedulingInfo.java            | 96 +++++++++++---------
 .../scheduler/TestAppSchedulingInfo.java        | 65 +++++++++++++
 2 files changed, 118 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/035f5f8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index c677345..39820f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -26,8 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 public class AppSchedulingInfo {
   
   private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
-  private static final int EPOCH_BIT_SHIFT = 40;
 
   private final ApplicationId applicationId;
   private final ApplicationAttemptId applicationAttemptId;
@@ -79,7 +78,8 @@ public class AppSchedulingInfo {
 
   private Set<String> requestedPartitions = new HashSet<>();
 
-  final Set<SchedulerRequestKey> schedulerKeys = new TreeSet<>();
+  private final ConcurrentSkipListMap<SchedulerRequestKey, Integer>
+      schedulerKeys = new ConcurrentSkipListMap<>();
   final Map<SchedulerRequestKey, Map<String, ResourceRequest>>
       resourceRequestMap = new ConcurrentHashMap<>();
   final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
@@ -236,6 +236,7 @@ public class AppSchedulingInfo {
     if (null == requestsOnNodeWithPriority) {
       requestsOnNodeWithPriority = new TreeMap<>();
       requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority);
+      incrementSchedulerKeyReference(schedulerKey);
     }
 
     requestsOnNodeWithPriority.put(containerId, request);
@@ -250,11 +251,30 @@ public class AppSchedulingInfo {
       LOG.debug("Added increase request:" + request.getContainerId()
           + " delta=" + delta);
     }
-    
-    // update Scheduler Keys
-    schedulerKeys.add(schedulerKey);
   }
-  
+
+  private void incrementSchedulerKeyReference(
+      SchedulerRequestKey schedulerKey) {
+    Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
+    if (schedulerKeyCount == null) {
+      schedulerKeys.put(schedulerKey, 1);
+    } else {
+      schedulerKeys.put(schedulerKey, schedulerKeyCount + 1);
+    }
+  }
+
+  private void decrementSchedulerKeyReference(
+      SchedulerRequestKey schedulerKey) {
+    Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
+    if (schedulerKeyCount != null) {
+      if (schedulerKeyCount > 1) {
+        schedulerKeys.put(schedulerKey, schedulerKeyCount - 1);
+      } else {
+        schedulerKeys.remove(schedulerKey);
+      }
+    }
+  }
+
   public synchronized boolean removeIncreaseRequest(NodeId nodeId,
       SchedulerRequestKey schedulerKey, ContainerId containerId) {
     Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
@@ -275,6 +295,7 @@ public class AppSchedulingInfo {
     // remove hierarchies if it becomes empty
     if (requestsOnNodeWithPriority.isEmpty()) {
       requestsOnNode.remove(schedulerKey);
+      decrementSchedulerKeyReference(schedulerKey);
     }
     if (requestsOnNode.isEmpty()) {
       containerIncreaseRequestMap.remove(nodeId);
@@ -341,7 +362,6 @@ public class AppSchedulingInfo {
       if (asks == null) {
         asks = new ConcurrentHashMap<>();
         this.resourceRequestMap.put(schedulerKey, asks);
-        this.schedulerKeys.add(schedulerKey);
       }
 
       // Increment number of containers if recovering preempted resources
@@ -360,29 +380,34 @@ public class AppSchedulingInfo {
 
         anyResourcesUpdated = true;
 
-        // Activate application. Metrics activation is done here.
-        // TODO: Shouldn't we activate even if numContainers = 0?
-        if (request.getNumContainers() > 0) {
-          activeUsersManager.activateApplication(user, applicationId);
-        }
-
         // Update pendingResources
-        updatePendingResources(lastRequest, request, queue.getMetrics());
+        updatePendingResources(lastRequest, request, schedulerKey,
+            queue.getMetrics());
       }
     }
     return anyResourcesUpdated;
   }
 
   private void updatePendingResources(ResourceRequest lastRequest,
-      ResourceRequest request, QueueMetrics metrics) {
+      ResourceRequest request, SchedulerRequestKey schedulerKey,
+      QueueMetrics metrics) {
+    int lastRequestContainers =
+        (lastRequest != null) ? lastRequest.getNumContainers() : 0;
     if (request.getNumContainers() <= 0) {
+      if (lastRequestContainers >= 0) {
+        decrementSchedulerKeyReference(schedulerKey);
+      }
       LOG.info("checking for deactivate of application :"
           + this.applicationId);
       checkForDeactivation();
+    } else {
+      // Activate application. Metrics activation is done here.
+      if (lastRequestContainers <= 0) {
+        incrementSchedulerKeyReference(schedulerKey);
+        activeUsersManager.activateApplication(user, applicationId);
+      }
     }
 
-    int lastRequestContainers =
-        (lastRequest != null) ? lastRequest.getNumContainers() : 0;
     Resource lastRequestCapability =
         lastRequest != null ? lastRequest.getCapability() : Resources.none();
     metrics.incrPendingResources(user,
@@ -505,7 +530,7 @@ public class AppSchedulingInfo {
   }
 
   public synchronized Collection<SchedulerRequestKey> getSchedulerKeys() {
-    return schedulerKeys;
+    return schedulerKeys.keySet();
   }
 
   public synchronized Map<String, ResourceRequest> getResourceRequests(
@@ -617,7 +642,7 @@ public class AppSchedulingInfo {
     } else if (type == NodeType.RACK_LOCAL) {
       allocateRackLocal(node, schedulerKey, request, resourceRequests);
     } else {
-      allocateOffSwitch(request, resourceRequests);
+      allocateOffSwitch(request, resourceRequests, schedulerKey);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -656,7 +681,7 @@ public class AppSchedulingInfo {
 
     ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
         ResourceRequest.ANY);
-    decrementOutstanding(offRackRequest);
+    decrementOutstanding(offRackRequest, schedulerKey);
 
     // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
@@ -684,7 +709,7 @@ public class AppSchedulingInfo {
     
     ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
         ResourceRequest.ANY);
-    decrementOutstanding(offRackRequest);
+    decrementOutstanding(offRackRequest, schedulerKey);
 
     // Update cloned RackLocal and OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(rackLocalRequest));
@@ -696,15 +721,16 @@ public class AppSchedulingInfo {
    * application.
    */
   private synchronized void allocateOffSwitch(
-      ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests) {
+      ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests,
+      SchedulerRequestKey schedulerKey) {
     // Update future requirements
-    decrementOutstanding(offSwitchRequest);
+    decrementOutstanding(offSwitchRequest, schedulerKey);
     // Update cloned OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
   private synchronized void decrementOutstanding(
-      ResourceRequest offSwitchRequest) {
+      ResourceRequest offSwitchRequest, SchedulerRequestKey schedulerKey) {
     int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
 
     // Do not remove ANY
@@ -713,6 +739,7 @@ public class AppSchedulingInfo {
     // Do we have any outstanding requests?
     // If there is nothing, we need to deactivate this application
     if (numOffSwitchContainers == 0) {
+      decrementSchedulerKeyReference(schedulerKey);
       checkForDeactivation();
     }
     
@@ -723,24 +750,7 @@ public class AppSchedulingInfo {
   }
   
   private synchronized void checkForDeactivation() {
-    boolean deactivate = true;
-    for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
-      ResourceRequest request =
-          getResourceRequest(schedulerKey, ResourceRequest.ANY);
-      if (request != null) {
-        if (request.getNumContainers() > 0) {
-          deactivate = false;
-          break;
-        }
-      }
-    }
-    
-    // also we need to check increase request
-    if (!deactivate) {
-      deactivate = containerIncreaseRequestMap.isEmpty();
-    }
-
-    if (deactivate) {
+    if (schedulerKeys.isEmpty()) {
       activeUsersManager.deactivateApplication(user, applicationId);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/035f5f8f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index 503ea34..7f9c719 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -23,11 +23,14 @@ import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.TreeSet;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
 import org.junit.Assert;
@@ -105,4 +108,66 @@ public class TestAppSchedulingInfo {
     Assert.assertEquals(2, sk.getPriority().getPriority());
     Assert.assertEquals(6, sk.getAllocationRequestId());
   }
+
+  @Test
+  public void testSchedulerKeyAccounting() {
+    ApplicationId appIdImpl = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appIdImpl, 1);
+
+    Queue queue = mock(Queue.class);
+    doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
+    AppSchedulingInfo  info = new AppSchedulingInfo(
+        appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
+        new ResourceUsage());
+    Assert.assertEquals(0, info.getSchedulerKeys().size());
+
+    Priority pri1 = Priority.newInstance(1);
+    ResourceRequest req1 = ResourceRequest.newInstance(pri1,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
+    Priority pri2 = Priority.newInstance(2);
+    ResourceRequest req2 = ResourceRequest.newInstance(pri2,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 2);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(req1);
+    reqs.add(req2);
+    info.updateResourceRequests(reqs, false);
+    ArrayList<SchedulerRequestKey> keys =
+        new ArrayList<>(info.getSchedulerKeys());
+    Assert.assertEquals(2, keys.size());
+    Assert.assertEquals(SchedulerRequestKey.create(req1), keys.get(0));
+    Assert.assertEquals(SchedulerRequestKey.create(req2), keys.get(1));
+
+    // iterate to verify no ConcurrentModificationException
+    for (SchedulerRequestKey schedulerKey : info.getSchedulerKeys()) {
+      info.allocate(NodeType.OFF_SWITCH, null, schedulerKey, req1, null);
+    }
+    Assert.assertEquals(1, info.getSchedulerKeys().size());
+    Assert.assertEquals(SchedulerRequestKey.create(req2),
+        info.getSchedulerKeys().iterator().next());
+
+    req2 = ResourceRequest.newInstance(pri2,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
+    reqs.clear();
+    reqs.add(req2);
+    info.updateResourceRequests(reqs, false);
+    info.allocate(NodeType.OFF_SWITCH, null, SchedulerRequestKey.create(req2),
+        req2, null);
+    Assert.assertEquals(0, info.getSchedulerKeys().size());
+
+    req1 = ResourceRequest.newInstance(pri1,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 5);
+    reqs.clear();
+    reqs.add(req1);
+    info.updateResourceRequests(reqs, false);
+    Assert.assertEquals(1, info.getSchedulerKeys().size());
+    Assert.assertEquals(SchedulerRequestKey.create(req1),
+        info.getSchedulerKeys().iterator().next());
+    req1 = ResourceRequest.newInstance(pri1,
+        ResourceRequest.ANY, Resource.newInstance(1024, 1), 0);
+    reqs.clear();
+    reqs.add(req1);
+    info.updateResourceRequests(reqs, false);
+    Assert.assertEquals(0, info.getSchedulerKeys().size());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to