Repository: hadoop
Updated Branches:
  refs/heads/branch-2 6f35700f0 -> 65e7ae5dc


YARN-5906. Update AppSchedulingInfo to use SchedulingPlacementSet. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/65e7ae5d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/65e7ae5d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/65e7ae5d

Branch: refs/heads/branch-2
Commit: 65e7ae5dcf5b5141446b28cfc6a0f828f4eb833a
Parents: 6f35700
Author: Sunil G <sun...@apache.org>
Authored: Fri Jan 6 21:30:52 2017 +0530
Committer: Sunil G <sun...@apache.org>
Committed: Fri Jan 6 21:30:52 2017 +0530

----------------------------------------------------------------------
 .../scheduler/AppSchedulingInfo.java            | 427 +++++--------------
 .../LocalitySchedulingPlacementSet.java         | 311 ++++++++++++++
 .../placement/SchedulingPlacementSet.java       |  22 +-
 .../TestApplicationLimitsByPartition.java       |   6 +
 4 files changed, 445 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e7ae5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 80811b1..b9deb6c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -18,22 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -48,14 +32,28 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * This class keeps track of all the consumption of an application. This also
  * keeps track of current running/completed containers for the application.
@@ -87,8 +85,8 @@ public class AppSchedulingInfo {
 
   private final ConcurrentSkipListMap<SchedulerRequestKey, Integer>
       schedulerKeys = new ConcurrentSkipListMap<>();
-  final Map<SchedulerRequestKey, Map<String, ResourceRequest>>
-      resourceRequestMap = new ConcurrentHashMap<>();
+  final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
+      schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
   final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
       SchedContainerChangeRequest>>> containerIncreaseRequestMap =
       new ConcurrentHashMap<>();
@@ -151,7 +149,7 @@ public class AppSchedulingInfo {
    */
   private void clearRequests() {
     schedulerKeys.clear();
-    resourceRequestMap.clear();
+    schedulerKeyToPlacementSets.clear();
     LOG.info("Application " + applicationId + " requests cleared");
   }
 
@@ -297,7 +295,7 @@ public class AppSchedulingInfo {
     }
   }
 
-  private void decrementSchedulerKeyReference(
+  public void decrementSchedulerKeyReference(
       SchedulerRequestKey schedulerKey) {
     Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
     if (schedulerKeyCount != null) {
@@ -389,49 +387,56 @@ public class AppSchedulingInfo {
    */
   public boolean updateResourceRequests(List<ResourceRequest> requests,
       boolean recoverPreemptedRequestForAContainer) {
+    if (null == requests || requests.isEmpty()) {
+      return false;
+    }
+
     // Flag to track if any incoming requests update "ANY" requests
-    boolean anyResourcesUpdated = false;
+    boolean offswitchResourcesUpdated = false;
 
     try {
       this.writeLock.lock();
-      // Update resource requests
-      for (ResourceRequest request : requests) {
-        SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
-        String resourceName = request.getResourceName();
 
-        // Update node labels if required
-        updateNodeLabels(request);
-
-        Map<String, ResourceRequest> asks =
-            this.resourceRequestMap.get(schedulerKey);
-        if (asks == null) {
-          asks = new ConcurrentHashMap<>();
-          this.resourceRequestMap.put(schedulerKey, asks);
-        }
+      // A map to group resource requests and dedup
+      Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests =
+          new HashMap<>();
 
-        // Increment number of containers if recovering preempted resources
-        ResourceRequest lastRequest = asks.get(resourceName);
-        if (recoverPreemptedRequestForAContainer && lastRequest != null) {
-          request.setNumContainers(lastRequest.getNumContainers() + 1);
+      // Group resource request by schedulerRequestKey and resourceName
+      for (ResourceRequest request : requests) {
+        SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
+        if (!dedupRequests.containsKey(schedulerKey)) {
+          dedupRequests.put(schedulerKey,
+              new HashMap<String, ResourceRequest>());
         }
+        dedupRequests.get(schedulerKey).put(request.getResourceName(), 
request);
+      }
 
-        // Update asks
-        asks.put(resourceName, request);
-
-        if (resourceName.equals(ResourceRequest.ANY)) {
-          //update the applications requested labels set
-          requestedPartitions.add(request.getNodeLabelExpression() == null
-              ? RMNodeLabelsManager.NO_LABEL :
-                  request.getNodeLabelExpression());
+      // Update scheduling placement set
+      for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry 
: dedupRequests.entrySet()) {
+        SchedulerRequestKey schedulerRequestKey = entry.getKey();
 
-          anyResourcesUpdated = true;
+        if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) {
+          schedulerKeyToPlacementSets.put(schedulerRequestKey,
+              new LocalitySchedulingPlacementSet<>(this));
+        }
 
-          // Update pendingResources
-          updatePendingResources(lastRequest, request, schedulerKey,
+        // Update placement set
+        ResourceRequestUpdateResult pendingAmountChanges =
+            schedulerKeyToPlacementSets.get(schedulerRequestKey)
+                .updateResourceRequests(
+                    entry.getValue().values(),
+                    recoverPreemptedRequestForAContainer);
+
+        if (null != pendingAmountChanges) {
+          updatePendingResources(
+              pendingAmountChanges.getLastAnyResourceRequest(),
+              pendingAmountChanges.getNewResourceRequest(), 
schedulerRequestKey,
               queue.getMetrics());
+          offswitchResourcesUpdated = true;
         }
       }
-      return anyResourcesUpdated;
+
+      return offswitchResourcesUpdated;
     } finally {
       this.writeLock.unlock();
     }
@@ -481,35 +486,13 @@ public class AppSchedulingInfo {
     }
   }
 
-  private void updateNodeLabels(ResourceRequest request) {
-    SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
-    String resourceName = request.getResourceName();
-    if (resourceName.equals(ResourceRequest.ANY)) {
-      ResourceRequest previousAnyRequest =
-          getResourceRequest(schedulerKey, resourceName);
-
-      // When there is change in ANY request label expression, we should
-      // update label for all resource requests already added of same
-      // priority as ANY resource request.
-      if ((null == previousAnyRequest)
-          || hasRequestLabelChanged(previousAnyRequest, request)) {
-        Map<String, ResourceRequest> resourceRequest =
-            getResourceRequests(schedulerKey);
-        if (resourceRequest != null) {
-          for (ResourceRequest r : resourceRequest.values()) {
-            if (!r.getResourceName().equals(ResourceRequest.ANY)) {
-              r.setNodeLabelExpression(request.getNodeLabelExpression());
-            }
-          }
-        }
-      }
-    } else {
-      ResourceRequest anyRequest =
-          getResourceRequest(schedulerKey, ResourceRequest.ANY);
-      if (anyRequest != null) {
-        request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
-      }
-    }
+  public void addRequestedPartition(String partition) {
+    requestedPartitions.add(partition);
+  }
+
+  public void decPendingResource(String partition, Resource toDecrease) {
+    queue.decPendingResource(partition, toDecrease);
+    appResourceUsage.decPending(partition, toDecrease);
   }
 
   private boolean hasRequestLabelChanged(ResourceRequest requestOne,
@@ -582,17 +565,22 @@ public class AppSchedulingInfo {
     return schedulerKeys.keySet();
   }
 
+  @SuppressWarnings("unchecked")
   public Map<String, ResourceRequest> getResourceRequests(
       SchedulerRequestKey schedulerKey) {
-    return resourceRequestMap.get(schedulerKey);
+    SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey);
+    if (null != ps) {
+      return ps.getResourceRequests();
+    }
+    return Collections.emptyMap();
   }
 
   public List<ResourceRequest> getAllResourceRequests() {
     List<ResourceRequest> ret = new ArrayList<>();
     try {
       this.readLock.lock();
-      for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
-        ret.addAll(r.values());
+      for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
+        ret.addAll(ps.getResourceRequests().values());
       }
     } finally {
       this.readLock.unlock();
@@ -604,9 +592,9 @@ public class AppSchedulingInfo {
       String resourceName) {
     try {
       this.readLock.lock();
-      Map<String, ResourceRequest> nodeRequests =
-          resourceRequestMap.get(schedulerKey);
-      return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
+      SchedulingPlacementSet ps =
+          schedulerKeyToPlacementSets.get(schedulerKey);
+      return (ps == null) ? null : ps.getResourceRequest(resourceName);
     } finally {
       this.readLock.unlock();
     }
@@ -698,141 +686,29 @@ public class AppSchedulingInfo {
 
   public List<ResourceRequest> allocate(NodeType type,
       SchedulerNode node, SchedulerRequestKey schedulerKey,
+      ResourceRequest request,
       Container containerAllocated) {
     try {
       writeLock.lock();
-      ResourceRequest request;
-      if (type == NodeType.NODE_LOCAL) {
-        request = resourceRequestMap.get(schedulerKey).get(node.getNodeName());
-      } else if (type == NodeType.RACK_LOCAL) {
-        request = resourceRequestMap.get(schedulerKey).get(node.getRackName());
-      } else{
-        request = 
resourceRequestMap.get(schedulerKey).get(ResourceRequest.ANY);
-      }
-      return allocate(type, node, schedulerKey, request, containerAllocated);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  /**
-   * Resources have been allocated to this application by the resource
-   * scheduler. Track them.
-   * @param type Node Type
-   * @param node SchedulerNode
-   * @param schedulerKey SchedulerRequestKey
-   * @param request ResourceRequest
-   * @param containerAllocated Container Allocated
-   * @return List of ResourceRequests
-   */
-  public List<ResourceRequest> allocate(NodeType type,
-      SchedulerNode node, SchedulerRequestKey schedulerKey,
-      ResourceRequest request, Container containerAllocated) {
-    try {
-      writeLock.lock();
-      List<ResourceRequest> resourceRequests = new ArrayList<>();
-      if (type == NodeType.NODE_LOCAL) {
-        allocateNodeLocal(node, schedulerKey, request, resourceRequests);
-      } else if (type == NodeType.RACK_LOCAL) {
-        allocateRackLocal(node, schedulerKey, request, resourceRequests);
-      } else{
-        allocateOffSwitch(request, resourceRequests, schedulerKey);
-      }
 
       if (null != containerAllocated) {
-        updateMetricsForAllocatedContainer(request, type, containerAllocated);
+        updateMetricsForAllocatedContainer(type, containerAllocated);
       }
-      return resourceRequests;
+
+      return schedulerKeyToPlacementSets.get(schedulerKey).allocate(type, node,
+          request);
     } finally {
       writeLock.unlock();
     }
   }
 
-  /**
-   * The {@link ResourceScheduler} is allocating data-local resources to the
-   * application.
-   */
-  private void allocateNodeLocal(SchedulerNode node,
-      SchedulerRequestKey schedulerKey, ResourceRequest nodeLocalRequest,
-      List<ResourceRequest> resourceRequests) {
-    // Update future requirements
-    decResourceRequest(node.getNodeName(), schedulerKey, nodeLocalRequest);
-
-    ResourceRequest rackLocalRequest = 
resourceRequestMap.get(schedulerKey).get(
-        node.getRackName());
-    decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest);
-
-    ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
-        ResourceRequest.ANY);
-    decrementOutstanding(offRackRequest, schedulerKey);
-
-    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
-    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
-    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
-    resourceRequests.add(cloneResourceRequest(offRackRequest));
-  }
-
-  private void decResourceRequest(String resourceName,
-      SchedulerRequestKey schedulerKey, ResourceRequest request) {
-    request.setNumContainers(request.getNumContainers() - 1);
-    if (request.getNumContainers() == 0) {
-      resourceRequestMap.get(schedulerKey).remove(resourceName);
-    }
-  }
-
-  /**
-   * The {@link ResourceScheduler} is allocating data-local resources to the
-   * application.
-   */
-  private void allocateRackLocal(SchedulerNode node,
-      SchedulerRequestKey schedulerKey, ResourceRequest rackLocalRequest,
-      List<ResourceRequest> resourceRequests) {
-    // Update future requirements
-    decResourceRequest(node.getRackName(), schedulerKey, rackLocalRequest);
-    
-    ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
-        ResourceRequest.ANY);
-    decrementOutstanding(offRackRequest, schedulerKey);
-
-    // Update cloned RackLocal and OffRack requests for recovery
-    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
-    resourceRequests.add(cloneResourceRequest(offRackRequest));
-  }
-
-  /**
-   * The {@link ResourceScheduler} is allocating data-local resources to the
-   * application.
-   */
-  private void allocateOffSwitch(ResourceRequest offSwitchRequest,
-      List<ResourceRequest> resourceRequests,
-      SchedulerRequestKey schedulerKey) {
-    // Update future requirements
-    decrementOutstanding(offSwitchRequest, schedulerKey);
-    // Update cloned OffRack requests for recovery
-    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
-  }
-
-  private void decrementOutstanding(ResourceRequest offSwitchRequest,
-      SchedulerRequestKey schedulerKey) {
-    int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
-
-    // Do not remove ANY
-    offSwitchRequest.setNumContainers(numOffSwitchContainers);
-    
-    // Do we have any outstanding requests?
-    // If there is nothing, we need to deactivate this application
-    if (numOffSwitchContainers == 0) {
-      decrementSchedulerKeyReference(schedulerKey);
-      checkForDeactivation();
-    }
-    
-    appResourceUsage.decPending(offSwitchRequest.getNodeLabelExpression(),
-        offSwitchRequest.getCapability());
-    queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(),
-        offSwitchRequest.getCapability());
+  public List<ResourceRequest> allocate(NodeType type,
+      SchedulerNode node, SchedulerRequestKey schedulerKey,
+      Container containerAllocated) {
+    return allocate(type, node, schedulerKey, null, containerAllocated);
   }
 
-  private void checkForDeactivation() {
+  public void checkForDeactivation() {
     if (schedulerKeys.isEmpty()) {
       activeUsersManager.deactivateApplication(user, applicationId);
     }
@@ -843,9 +719,9 @@ public class AppSchedulingInfo {
       this.writeLock.lock();
       QueueMetrics oldMetrics = queue.getMetrics();
       QueueMetrics newMetrics = newQueue.getMetrics();
-      for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
-        ResourceRequest request = asks.get(ResourceRequest.ANY);
-        if (request != null) {
+      for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
+        ResourceRequest request = ps.getResourceRequest(ResourceRequest.ANY);
+        if (request != null && request.getNumContainers() > 0) {
           oldMetrics.decrPendingResources(user, request.getNumContainers(),
               request.getCapability());
           newMetrics.incrPendingResources(user, request.getNumContainers(),
@@ -874,9 +750,9 @@ public class AppSchedulingInfo {
     try {
       this.writeLock.lock();
       QueueMetrics metrics = queue.getMetrics();
-      for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
-        ResourceRequest request = asks.get(ResourceRequest.ANY);
-        if (request != null) {
+      for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) {
+        ResourceRequest request = ps.getResourceRequest(ResourceRequest.ANY);
+        if (request != null && request.getNumContainers() > 0) {
           metrics.decrPendingResources(user, request.getNumContainers(),
               request.getCapability());
 
@@ -945,17 +821,6 @@ public class AppSchedulingInfo {
     }
   }
 
-  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
-    ResourceRequest newRequest = ResourceRequest.newBuilder()
-        .priority(request.getPriority())
-        .resourceName(request.getResourceName())
-        .capability(request.getCapability())
-        .numContainers(1)
-        .relaxLocality(request.getRelaxLocality())
-        .nodeLabelExpression(request.getNodeLabelExpression()).build();
-    return newRequest;
-  }
-
   /*
    * In async environment, pending resource request could be updated during
    * scheduling, this method checks pending request before allocating
@@ -964,107 +829,43 @@ public class AppSchedulingInfo {
       SchedulerRequestKey schedulerKey) {
     try {
       readLock.lock();
-      ResourceRequest r = resourceRequestMap.get(schedulerKey).get(
-          ResourceRequest.ANY);
-      if (r == null || r.getNumContainers() <= 0) {
+      SchedulingPlacementSet ps = 
schedulerKeyToPlacementSets.get(schedulerKey);
+      if (null == ps) {
         return false;
       }
-      if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) {
-        r = resourceRequestMap.get(schedulerKey).get(node.getRackName());
-        if (r == null || r.getNumContainers() <= 0) {
-          return false;
-        }
-        if (type == NodeType.NODE_LOCAL) {
-          r = resourceRequestMap.get(schedulerKey).get(node.getNodeName());
-          if (r == null || r.getNumContainers() <= 0) {
-            return false;
-          }
-        }
-      }
-
-      return true;
+      return ps.canAllocate(type, node);
     } finally {
       readLock.unlock();
     }
   }
 
-  public void updateMetricsForAllocatedContainer(
-      ResourceRequest request, NodeType type, Container containerAllocated) {
-    try {
-      writeLock.lock();
-      QueueMetrics metrics = queue.getMetrics();
-      if (pending) {
-        // once an allocation is done we assume the application is
-        // running from scheduler's POV.
-        pending = false;
-        metrics.runAppAttempt(applicationId, user);
-      }
+  private void updateMetricsForAllocatedContainer(
+    NodeType type, Container containerAllocated) {
+    QueueMetrics metrics = queue.getMetrics();
+    if (pending) {
+      // once an allocation is done we assume the application is
+      // running from scheduler's POV.
+      pending = false;
+      metrics.runAppAttempt(applicationId, user);
+    }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("allocate: applicationId=" + applicationId + " container="
-            + containerAllocated.getId() + " host=" + containerAllocated
-            .getNodeId().toString() + " user=" + user + " resource=" + request
-            .getCapability() + " type=" + type);
-      }
-      metrics.allocateResources(user, 1, request.getCapability(), true);
-      metrics.incrNodeTypeAggregations(user, type);
-    } finally {
-      writeLock.unlock();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("allocate: applicationId=" + applicationId + " container="
+          + containerAllocated.getId() + " host=" + containerAllocated
+          .getNodeId().toString() + " user=" + user + " resource="
+          + containerAllocated.getResource() + " type="
+          + type);
     }
+    metrics.allocateResources(user, 1, containerAllocated.getResource(),
+        true);
+    metrics.incrNodeTypeAggregations(user, type);
   }
 
   // Get placement-set by specified schedulerKey
   // Now simply return all node of the input clusterPlacementSet
-  // TODO, need update this when we support global scheduling
   public <N extends SchedulerNode> SchedulingPlacementSet<N> 
getSchedulingPlacementSet(
       SchedulerRequestKey schedulerkey) {
-    return new SchedulingPlacementSet<N>() {
-      @Override
-      @SuppressWarnings("unchecked")
-      public Iterator<N> getPreferredNodeIterator(
-          PlacementSet<N> clusterPlacementSet) {
-        return IteratorUtils.singletonIterator(
-            clusterPlacementSet.getAllNodes().values().iterator().next());
-      }
-
-      @Override
-      public ResourceRequestUpdateResult updateResourceRequests(
-          List<ResourceRequest> requests,
-          boolean recoverPreemptedRequestForAContainer) {
-        return null;
-      }
-
-      @Override
-      public Map<String, ResourceRequest> getResourceRequests() {
-        return null;
-      }
-
-      @Override
-      public ResourceRequest getResourceRequest(String resourceName,
-          SchedulerRequestKey requestKey) {
-        return null;
-      }
-
-      @Override
-      public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
-          ResourceRequest request) {
-        return null;
-      }
-
-      @Override
-      public Map<NodeId, N> getAllNodes() {
-        return null;
-      }
-
-      @Override
-      public long getVersion() {
-        return 0;
-      }
-
-      @Override
-      public String getPartition() {
-        return null;
-      }
-    };
+    return (SchedulingPlacementSet<N>) schedulerKeyToPlacementSets.get(
+        schedulerkey);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e7ae5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
new file mode 100644
index 0000000..ffaad58
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A {@link SchedulingPlacementSet} that keeps pending {@link ResourceRequest}s
+ * keyed by resource name (a node name, a rack name or
+ * {@link ResourceRequest#ANY}) and deducts node-local, rack-local and
+ * off-switch allocations from them. {@link AppSchedulingInfo} holds one
+ * instance per scheduler key.
+ *
+ * <p>Request updates and allocations run under the write lock and
+ * {@link #canAllocate} under the read lock; the request map itself is a
+ * {@link ConcurrentHashMap}, so plain lookups do not take the lock.</p>
+ *
+ * @param <N> type of the scheduler node
+ */
+public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
+    implements SchedulingPlacementSet<N> {
+  /** Pending requests of this placement set, keyed by resource name. */
+  private final Map<String, ResourceRequest> resourceRequestMap =
+      new ConcurrentHashMap<>();
+  /** Owning application; notified of requested partitions and pending
+   *  resource / scheduler-key changes. */
+  private final AppSchedulingInfo appSchedulingInfo;
+
+  private final ReentrantReadWriteLock.ReadLock readLock;
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+
+  /**
+   * @param info scheduling info of the application owning these requests
+   */
+  public LocalitySchedulingPlacementSet(AppSchedulingInfo info) {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+    this.appSchedulingInfo = info;
+  }
+
+  /**
+   * Only placement sets that resolve to a single node are handled for now:
+   * that node is returned, otherwise the iterator is empty.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public Iterator<N> getPreferredNodeIterator(
+      PlacementSet<N> clusterPlacementSet) {
+    // TODO: add support for multiple hosts inside the placement set that is
+    // passed in.
+    N singleNode = PlacementSetUtils.getSingleNode(clusterPlacementSet);
+    if (null != singleNode) {
+      return IteratorUtils.singletonIterator(singleNode);
+    }
+
+    return IteratorUtils.emptyIterator();
+  }
+
+  /**
+   * Whether the node label expression differs between two requests.
+   * Two {@code null} expressions are treated as equal.
+   */
+  private boolean hasRequestLabelChanged(ResourceRequest requestOne,
+      ResourceRequest requestTwo) {
+    String requestOneLabelExp = requestOne.getNodeLabelExpression();
+    String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
+    // First request label expression can be null and second request
+    // is not null then we have to consider it as changed.
+    if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) {
+      return true;
+    }
+    // If the label is not matching between both request when
+    // requestOneLabelExp is not null.
+    return ((null != requestOneLabelExp) && !(requestOneLabelExp
+        .equals(requestTwoLabelExp)));
+  }
+
+  /**
+   * Keeps node label expressions consistent with the ANY request: a new or
+   * relabelled ANY request pushes its label onto every non-ANY request
+   * already stored, while a non-ANY request inherits the stored ANY label.
+   */
+  private void updateNodeLabels(ResourceRequest request) {
+    String resourceName = request.getResourceName();
+    if (resourceName.equals(ResourceRequest.ANY)) {
+      ResourceRequest previousAnyRequest =
+          getResourceRequest(resourceName);
+
+      // When there is change in ANY request label expression, we should
+      // update label for all resource requests already added of same
+      // priority as ANY resource request.
+      if ((null == previousAnyRequest) || hasRequestLabelChanged(
+          previousAnyRequest, request)) {
+        for (ResourceRequest r : resourceRequestMap.values()) {
+          if (!r.getResourceName().equals(ResourceRequest.ANY)) {
+            r.setNodeLabelExpression(request.getNodeLabelExpression());
+          }
+        }
+      }
+    } else {
+      ResourceRequest anyRequest = getResourceRequest(ResourceRequest.ANY);
+      if (anyRequest != null) {
+        request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
+      }
+    }
+  }
+
+  /**
+   * Stores the given requests, replacing any previous request for the same
+   * resource name. When {@code recoverPreemptedRequestForAContainer} is set,
+   * the stored count becomes the previous count plus one.
+   *
+   * @return the update result for the ANY request in {@code requests}, or
+   *         {@code null} if the batch contained no ANY request
+   */
+  @Override
+  public ResourceRequestUpdateResult updateResourceRequests(
+      Collection<ResourceRequest> requests,
+      boolean recoverPreemptedRequestForAContainer) {
+    // Lock before entering try: if lock() itself fails, finally must not
+    // call unlock() on a lock this thread does not hold.
+    this.writeLock.lock();
+    try {
+      ResourceRequestUpdateResult updateResult = null;
+
+      // Update resource requests
+      for (ResourceRequest request : requests) {
+        String resourceName = request.getResourceName();
+
+        // Update node labels if required
+        updateNodeLabels(request);
+
+        // Increment number of containers if recovering preempted resources
+        ResourceRequest lastRequest = resourceRequestMap.get(resourceName);
+        if (recoverPreemptedRequestForAContainer && lastRequest != null) {
+          request.setNumContainers(lastRequest.getNumContainers() + 1);
+        }
+
+        // Update asks
+        resourceRequestMap.put(resourceName, request);
+
+        if (resourceName.equals(ResourceRequest.ANY)) {
+          //update the applications requested labels set
+          appSchedulingInfo.addRequestedPartition(
+              request.getNodeLabelExpression() == null ?
+                  RMNodeLabelsManager.NO_LABEL :
+                  request.getNodeLabelExpression());
+
+          updateResult = new ResourceRequestUpdateResult(lastRequest, request);
+        }
+      }
+      return updateResult;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  /** Returns the live internal request map (not a copy). */
+  @Override
+  public Map<String, ResourceRequest> getResourceRequests() {
+    return resourceRequestMap;
+  }
+
+  @Override
+  public ResourceRequest getResourceRequest(String resourceName) {
+    return resourceRequestMap.get(resourceName);
+  }
+
+  /**
+   * Decrements the ANY request (which is never removed from the map). When
+   * it reaches zero, the scheduler key reference is released and the
+   * application is checked for deactivation. The application's pending
+   * resource for the request's partition is reduced by one container.
+   */
+  private void decrementOutstanding(ResourceRequest offSwitchRequest) {
+    int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
+
+    // Do not remove ANY
+    offSwitchRequest.setNumContainers(numOffSwitchContainers);
+
+    // Do we have any outstanding requests?
+    // If there is nothing, we need to deactivate this application
+    if (numOffSwitchContainers == 0) {
+      SchedulerRequestKey schedulerRequestKey = SchedulerRequestKey.create(
+          offSwitchRequest);
+      appSchedulingInfo.decrementSchedulerKeyReference(schedulerRequestKey);
+      appSchedulingInfo.checkForDeactivation();
+    }
+
+    appSchedulingInfo.decPendingResource(
+        offSwitchRequest.getNodeLabelExpression(),
+        offSwitchRequest.getCapability());
+  }
+
+  /** Copy of {@code request} with its container count set to one. */
+  private ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    ResourceRequest newRequest =
+        ResourceRequest.newInstance(request.getPriority(),
+            request.getResourceName(), request.getCapability(), 1,
+            request.getRelaxLocality(), request.getNodeLabelExpression());
+    return newRequest;
+  }
+
+  /**
+   * The {@link ResourceScheduler} is allocating rack-local resources to the
+   * application: the rack and ANY requests are each reduced by one.
+   */
+  private void allocateRackLocal(SchedulerNode node,
+      ResourceRequest rackLocalRequest,
+      List<ResourceRequest> resourceRequests) {
+    // Update future requirements
+    decResourceRequest(node.getRackName(), rackLocalRequest);
+
+    ResourceRequest offRackRequest = resourceRequestMap.get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
+  }
+
+  /**
+   * The {@link ResourceScheduler} is allocating off-switch resources to the
+   * application: only the ANY request is reduced by one.
+   */
+  private void allocateOffSwitch(ResourceRequest offSwitchRequest,
+      List<ResourceRequest> resourceRequests) {
+    // Update future requirements
+    decrementOutstanding(offSwitchRequest);
+    // Update cloned OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
+  }
+
+  /**
+   * The {@link ResourceScheduler} is allocating data-local resources to the
+   * application: the node, rack and ANY requests are each reduced by one.
+   */
+  private void allocateNodeLocal(SchedulerNode node,
+      ResourceRequest nodeLocalRequest,
+      List<ResourceRequest> resourceRequests) {
+    // Update future requirements
+    decResourceRequest(node.getNodeName(), nodeLocalRequest);
+
+    ResourceRequest rackLocalRequest = resourceRequestMap.get(
+        node.getRackName());
+    decResourceRequest(node.getRackName(), rackLocalRequest);
+
+    ResourceRequest offRackRequest = resourceRequestMap.get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
+  }
+
+  /**
+   * Decrements a node or rack request and drops it from the map once its
+   * container count reaches zero.
+   */
+  private void decResourceRequest(String resourceName,
+      ResourceRequest request) {
+    request.setNumContainers(request.getNumContainers() - 1);
+    if (request.getNumContainers() == 0) {
+      resourceRequestMap.remove(resourceName);
+    }
+  }
+
+  /**
+   * Checks that ANY, and for RACK_LOCAL / NODE_LOCAL also the node's rack
+   * and (for NODE_LOCAL) the node itself, still have a positive count.
+   */
+  @Override
+  public boolean canAllocate(NodeType type, SchedulerNode node) {
+    // Lock before entering try so finally only unlocks a held lock.
+    readLock.lock();
+    try {
+      ResourceRequest r = resourceRequestMap.get(
+          ResourceRequest.ANY);
+      if (r == null || r.getNumContainers() <= 0) {
+        return false;
+      }
+      if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) {
+        r = resourceRequestMap.get(node.getRackName());
+        if (r == null || r.getNumContainers() <= 0) {
+          return false;
+        }
+        if (type == NodeType.NODE_LOCAL) {
+          r = resourceRequestMap.get(node.getNodeName());
+          if (r == null || r.getNumContainers() <= 0) {
+            return false;
+          }
+        }
+      }
+
+      return true;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Deducts one container of the given locality. A {@code null}
+   * {@code request} is looked up by node name, rack name or ANY according
+   * to {@code type}.
+   *
+   * @return single-container copies of every request that was decremented
+   */
+  @Override
+  public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
+      ResourceRequest request) {
+    // Lock before entering try so finally only unlocks a held lock.
+    writeLock.lock();
+    try {
+      List<ResourceRequest> resourceRequests = new ArrayList<>();
+
+      if (null == request) {
+        if (type == NodeType.NODE_LOCAL) {
+          request = resourceRequestMap.get(node.getNodeName());
+        } else if (type == NodeType.RACK_LOCAL) {
+          request = resourceRequestMap.get(node.getRackName());
+        } else {
+          request = resourceRequestMap.get(ResourceRequest.ANY);
+        }
+      }
+
+      if (type == NodeType.NODE_LOCAL) {
+        allocateNodeLocal(node, request, resourceRequests);
+      } else if (type == NodeType.RACK_LOCAL) {
+        allocateRackLocal(node, request, resourceRequests);
+      } else {
+        allocateOffSwitch(request, resourceRequests);
+      }
+
+      return resourceRequests;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e7ae5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
index f87f764..d78e710 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
@@ -23,13 +23,14 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 /**
  * <p>
- * In addition to {@link PlacementSet}, this also maintains
+ * Compared to {@link PlacementSet}, this also maintains
  * pending ResourceRequests:
  * - When new ResourceRequest(s) added to scheduler, or,
  * - Or new container allocated, scheduler can notify corresponding
@@ -42,8 +43,7 @@ import java.util.Map;
  * can have different ways to order nodes depends on requests.
  * </p>
  */
-public interface SchedulingPlacementSet<N extends SchedulerNode>
-    extends PlacementSet<N> {
+public interface SchedulingPlacementSet<N extends SchedulerNode> {
   /**
    * Get iterator of preferred node depends on requirement and/or availability
    * @param clusterPlacementSet input cluster PlacementSet
@@ -60,7 +60,7 @@ public interface SchedulingPlacementSet<N extends 
SchedulerNode>
    * @return true if total pending resource changed
    */
   ResourceRequestUpdateResult updateResourceRequests(
-      List<ResourceRequest> requests,
+      Collection<ResourceRequest> requests,
       boolean recoverPreemptedRequestForAContainer);
 
   /**
@@ -72,19 +72,25 @@ public interface SchedulingPlacementSet<N extends 
SchedulerNode>
   /**
    * Get ResourceRequest by given schedulerKey and resourceName
    * @param resourceName resourceName
-   * @param schedulerRequestKey schedulerRequestKey
    * @return ResourceRequest
    */
-  ResourceRequest getResourceRequest(String resourceName,
-      SchedulerRequestKey schedulerRequestKey);
+  ResourceRequest getResourceRequest(String resourceName);
 
   /**
    * Notify container allocated.
    * @param type Type of the allocation
    * @param node Which node this container allocated on
-   * @param request resource request
+   * @param request Which resource request to allocate
    * @return list of ResourceRequests deducted
    */
   List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
       ResourceRequest request);
+
+  /**
+   * Check whether a pending requirement remains for the given NodeType/node
+   * @param type Locality Type
+   * @param node which node we will allocate on
+   * @return true if there is still a pending requirement
+   */
+  boolean canAllocate(NodeType type, SchedulerNode node);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e7ae5d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
index 5c53fda..1f87c53 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
@@ -687,6 +687,9 @@ public class TestApplicationLimitsByPartition {
     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
     app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
         1 * GB, 2, true, priority_1, recordFactory));
+    app_0_1.updateResourceRequests(app_0_1_requests);
+
+    app_0_1_requests.clear();
     app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
         1 * GB, 2, true, priority_1, recordFactory, "y"));
     app_0_1.updateResourceRequests(app_0_1_requests);
@@ -715,6 +718,9 @@ public class TestApplicationLimitsByPartition {
     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
     app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
         1 * GB, 2, true, priority_1, recordFactory));
+    app_1_0.updateResourceRequests(app_1_0_requests);
+
+    app_1_0_requests.clear();
     app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
         1 * GB, 2, true, priority_1, recordFactory, "y"));
     app_1_0.updateResourceRequests(app_1_0_requests);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to