Repository: hadoop
Updated Branches:
  refs/heads/apache-trunk [created] ac4d2b108


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
new file mode 100644
index 0000000..7f89435
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This is an implementation of the {@link AppPlacementAllocator} that takes
+ * into account locality preferences (node, rack, any) when allocating
+ * containers.
+ */
+public class LocalityAppPlacementAllocator<N extends SchedulerNode>
+    implements AppPlacementAllocator<N> {
+  private static final Log LOG =
+      LogFactory.getLog(LocalityAppPlacementAllocator.class);
+
+  private final Map<String, ResourceRequest> resourceRequestMap =
+      new ConcurrentHashMap<>();
+  private AppSchedulingInfo appSchedulingInfo;
+  private volatile String primaryRequestedPartition =
+      RMNodeLabelsManager.NO_LABEL;
+
+  private final ReentrantReadWriteLock.ReadLock readLock;
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+
+  public LocalityAppPlacementAllocator(AppSchedulingInfo info) {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+    this.appSchedulingInfo = info;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Iterator<N> getPreferredNodeIterator(
+      CandidateNodeSet<N> candidateNodeSet) {
+    // Now only handle the case that single node in the candidateNodeSet
+    // TODO, Add support to multi-hosts inside candidateNodeSet which is passed
+    // in.
+
+    N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
+    if (null != singleNode) {
+      return IteratorUtils.singletonIterator(singleNode);
+    }
+
+    return IteratorUtils.emptyIterator();
+  }
+
+  private boolean hasRequestLabelChanged(ResourceRequest requestOne,
+      ResourceRequest requestTwo) {
+    String requestOneLabelExp = requestOne.getNodeLabelExpression();
+    String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
+    // First request label expression can be null and second request
+    // is not null then we have to consider it as changed.
+    if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) {
+      return true;
+    }
+    // If the label is not matching between both request when
+    // requestOneLabelExp is not null.
+    return ((null != requestOneLabelExp) && !(requestOneLabelExp
+        .equals(requestTwoLabelExp)));
+  }
+
+  private void updateNodeLabels(ResourceRequest request) {
+    String resourceName = request.getResourceName();
+    if (resourceName.equals(ResourceRequest.ANY)) {
+      ResourceRequest previousAnyRequest =
+          getResourceRequest(resourceName);
+
+      // When there is change in ANY request label expression, we should
+      // update label for all resource requests already added of same
+      // priority as ANY resource request.
+      if ((null == previousAnyRequest) || hasRequestLabelChanged(
+          previousAnyRequest, request)) {
+        for (ResourceRequest r : resourceRequestMap.values()) {
+          if (!r.getResourceName().equals(ResourceRequest.ANY)) {
+            r.setNodeLabelExpression(request.getNodeLabelExpression());
+          }
+        }
+      }
+    } else{
+      ResourceRequest anyRequest = getResourceRequest(ResourceRequest.ANY);
+      if (anyRequest != null) {
+        request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
+      }
+    }
+  }
+
+  @Override
+  public ResourceRequestUpdateResult updateResourceRequests(
+      Collection<ResourceRequest> requests,
+      boolean recoverPreemptedRequestForAContainer) {
+    try {
+      this.writeLock.lock();
+
+      ResourceRequestUpdateResult updateResult = null;
+
+      // Update resource requests
+      for (ResourceRequest request : requests) {
+        String resourceName = request.getResourceName();
+
+        // Update node labels if required
+        updateNodeLabels(request);
+
+        // Increment number of containers if recovering preempted resources
+        ResourceRequest lastRequest = resourceRequestMap.get(resourceName);
+        if (recoverPreemptedRequestForAContainer && lastRequest != null) {
+          request.setNumContainers(lastRequest.getNumContainers() + 1);
+        }
+
+        // Update asks
+        resourceRequestMap.put(resourceName, request);
+
+        if (resourceName.equals(ResourceRequest.ANY)) {
+          String partition = request.getNodeLabelExpression() == null ?
+              RMNodeLabelsManager.NO_LABEL :
+              request.getNodeLabelExpression();
+
+          this.primaryRequestedPartition = partition;
+
+          //update the applications requested labels set
+          appSchedulingInfo.addRequestedPartition(partition);
+
+          updateResult = new ResourceRequestUpdateResult(lastRequest, request);
+        }
+      }
+      return updateResult;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  @Override
+  public Map<String, ResourceRequest> getResourceRequests() {
+    return resourceRequestMap;
+  }
+
+  private ResourceRequest getResourceRequest(String resourceName) {
+    return resourceRequestMap.get(resourceName);
+  }
+
+  @Override
+  public PendingAsk getPendingAsk(String resourceName) {
+    try {
+      readLock.lock();
+      ResourceRequest request = getResourceRequest(resourceName);
+      if (null == request) {
+        return PendingAsk.ZERO;
+      } else{
+        return new PendingAsk(request.getCapability(),
+            request.getNumContainers());
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
+  @Override
+  public int getOutstandingAsksCount(String resourceName) {
+    try {
+      readLock.lock();
+      ResourceRequest request = getResourceRequest(resourceName);
+      if (null == request) {
+        return 0;
+      } else{
+        return request.getNumContainers();
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
+  private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey,
+      ResourceRequest offSwitchRequest) {
+    int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
+    offSwitchRequest.setNumContainers(numOffSwitchContainers);
+
+    // Do we have any outstanding requests?
+    // If there is nothing, we need to deactivate this application
+    if (numOffSwitchContainers == 0) {
+      appSchedulingInfo.getSchedulerKeys().remove(schedulerRequestKey);
+      appSchedulingInfo.checkForDeactivation();
+      resourceRequestMap.remove(ResourceRequest.ANY);
+      if (resourceRequestMap.isEmpty()) {
+        appSchedulingInfo.removeAppPlacement(schedulerRequestKey);
+      }
+    }
+
+    appSchedulingInfo.decPendingResource(
+        offSwitchRequest.getNodeLabelExpression(),
+        offSwitchRequest.getCapability());
+  }
+
+  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    ResourceRequest newRequest = ResourceRequest.newBuilder()
+        .priority(request.getPriority())
+        .allocationRequestId(request.getAllocationRequestId())
+        .resourceName(request.getResourceName())
+        .capability(request.getCapability())
+        .numContainers(1)
+        .relaxLocality(request.getRelaxLocality())
+        .nodeLabelExpression(request.getNodeLabelExpression()).build();
+    return newRequest;
+  }
+
+  /**
+   * The {@link ResourceScheduler} is allocating data-local resources to the
+   * application.
+   */
+  private void allocateRackLocal(SchedulerRequestKey schedulerKey,
+      SchedulerNode node, ResourceRequest rackLocalRequest,
+      List<ResourceRequest> resourceRequests) {
+    // Update future requirements
+    decResourceRequest(node.getRackName(), rackLocalRequest);
+
+    ResourceRequest offRackRequest = resourceRequestMap.get(
+        ResourceRequest.ANY);
+    decrementOutstanding(schedulerKey, offRackRequest);
+
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
+  }
+
+  /**
+   * The {@link ResourceScheduler} is allocating data-local resources to the
+   * application.
+   */
+  private void allocateOffSwitch(SchedulerRequestKey schedulerKey,
+      ResourceRequest offSwitchRequest,
+      List<ResourceRequest> resourceRequests) {
+    // Update future requirements
+    decrementOutstanding(schedulerKey, offSwitchRequest);
+    // Update cloned OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
+  }
+
+
+  /**
+   * The {@link ResourceScheduler} is allocating data-local resources to the
+   * application.
+   */
+  private void allocateNodeLocal(SchedulerRequestKey schedulerKey,
+      SchedulerNode node, ResourceRequest nodeLocalRequest,
+      List<ResourceRequest> resourceRequests) {
+    // Update future requirements
+    decResourceRequest(node.getNodeName(), nodeLocalRequest);
+
+    ResourceRequest rackLocalRequest = resourceRequestMap.get(
+        node.getRackName());
+    decResourceRequest(node.getRackName(), rackLocalRequest);
+
+    ResourceRequest offRackRequest = resourceRequestMap.get(
+        ResourceRequest.ANY);
+    decrementOutstanding(schedulerKey, offRackRequest);
+
+    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
+  }
+
+  private void decResourceRequest(String resourceName,
+      ResourceRequest request) {
+    request.setNumContainers(request.getNumContainers() - 1);
+    if (request.getNumContainers() == 0) {
+      resourceRequestMap.remove(resourceName);
+    }
+  }
+
+  @Override
+  public boolean canAllocate(NodeType type, SchedulerNode node) {
+    try {
+      readLock.lock();
+      ResourceRequest r = resourceRequestMap.get(
+          ResourceRequest.ANY);
+      if (r == null || r.getNumContainers() <= 0) {
+        return false;
+      }
+      if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) {
+        r = resourceRequestMap.get(node.getRackName());
+        if (r == null || r.getNumContainers() <= 0) {
+          return false;
+        }
+        if (type == NodeType.NODE_LOCAL) {
+          r = resourceRequestMap.get(node.getNodeName());
+          if (r == null || r.getNumContainers() <= 0) {
+            return false;
+          }
+        }
+      }
+
+      return true;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean canDelayTo(String resourceName) {
+    try {
+      readLock.lock();
+      ResourceRequest request = getResourceRequest(resourceName);
+      return request == null || request.getRelaxLocality();
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
+  @Override
+  public boolean acceptNodePartition(String nodePartition,
+      SchedulingMode schedulingMode) {
+    // We will only look at node label = nodeLabelToLookAt according to
+    // schedulingMode and partition of node.
+    String nodePartitionToLookAt;
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      nodePartitionToLookAt = nodePartition;
+    } else {
+      nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
+    }
+
+    return primaryRequestedPartition.equals(nodePartitionToLookAt);
+  }
+
+  @Override
+  public String getPrimaryRequestedNodePartition() {
+    return primaryRequestedPartition;
+  }
+
+  @Override
+  public int getUniqueLocationAsks() {
+    return resourceRequestMap.size();
+  }
+
+  @Override
+  public void showRequests() {
+    for (ResourceRequest request : resourceRequestMap.values()) {
+      if (request.getNumContainers() > 0) {
+        LOG.debug("\tRequest=" + request);
+      }
+    }
+  }
+
+  @Override
+  public List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
+      NodeType type, SchedulerNode node) {
+    try {
+      writeLock.lock();
+
+      List<ResourceRequest> resourceRequests = new ArrayList<>();
+
+      ResourceRequest request;
+      if (type == NodeType.NODE_LOCAL) {
+        request = resourceRequestMap.get(node.getNodeName());
+      } else if (type == NodeType.RACK_LOCAL) {
+        request = resourceRequestMap.get(node.getRackName());
+      } else{
+        request = resourceRequestMap.get(ResourceRequest.ANY);
+      }
+
+      if (type == NodeType.NODE_LOCAL) {
+        allocateNodeLocal(schedulerKey, node, request, resourceRequests);
+      } else if (type == NodeType.RACK_LOCAL) {
+        allocateRackLocal(schedulerKey, node, request, resourceRequests);
+      } else{
+        allocateOffSwitch(schedulerKey, request, resourceRequests);
+      }
+
+      return resourceRequests;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public Iterator<String> getAcceptedResouceNames() {
+    try {
+      readLock.lock();
+      return resourceRequestMap.keySet().iterator();
+    } finally {
+      readLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
deleted file mode 100644
index 6cc8cc7..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
-
-import org.apache.commons.collections.IteratorUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
-    implements SchedulingPlacementSet<N> {
-  private static final Log LOG =
-      LogFactory.getLog(LocalitySchedulingPlacementSet.class);
-
-  private final Map<String, ResourceRequest> resourceRequestMap =
-      new ConcurrentHashMap<>();
-  private AppSchedulingInfo appSchedulingInfo;
-  private volatile String primaryRequestedPartition =
-      RMNodeLabelsManager.NO_LABEL;
-
-  private final ReentrantReadWriteLock.ReadLock readLock;
-  private final ReentrantReadWriteLock.WriteLock writeLock;
-
-  public LocalitySchedulingPlacementSet(AppSchedulingInfo info) {
-    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    readLock = lock.readLock();
-    writeLock = lock.writeLock();
-    this.appSchedulingInfo = info;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public Iterator<N> getPreferredNodeIterator(
-      PlacementSet<N> clusterPlacementSet) {
-    // Now only handle the case that single node in placementSet
-    // TODO, Add support to multi-hosts inside placement-set which is passed 
in.
-
-    N singleNode = PlacementSetUtils.getSingleNode(clusterPlacementSet);
-    if (null != singleNode) {
-      return IteratorUtils.singletonIterator(singleNode);
-    }
-
-    return IteratorUtils.emptyIterator();
-  }
-
-  private boolean hasRequestLabelChanged(ResourceRequest requestOne,
-      ResourceRequest requestTwo) {
-    String requestOneLabelExp = requestOne.getNodeLabelExpression();
-    String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
-    // First request label expression can be null and second request
-    // is not null then we have to consider it as changed.
-    if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) {
-      return true;
-    }
-    // If the label is not matching between both request when
-    // requestOneLabelExp is not null.
-    return ((null != requestOneLabelExp) && !(requestOneLabelExp
-        .equals(requestTwoLabelExp)));
-  }
-
-  private void updateNodeLabels(ResourceRequest request) {
-    String resourceName = request.getResourceName();
-    if (resourceName.equals(ResourceRequest.ANY)) {
-      ResourceRequest previousAnyRequest =
-          getResourceRequest(resourceName);
-
-      // When there is change in ANY request label expression, we should
-      // update label for all resource requests already added of same
-      // priority as ANY resource request.
-      if ((null == previousAnyRequest) || hasRequestLabelChanged(
-          previousAnyRequest, request)) {
-        for (ResourceRequest r : resourceRequestMap.values()) {
-          if (!r.getResourceName().equals(ResourceRequest.ANY)) {
-            r.setNodeLabelExpression(request.getNodeLabelExpression());
-          }
-        }
-      }
-    } else{
-      ResourceRequest anyRequest = getResourceRequest(ResourceRequest.ANY);
-      if (anyRequest != null) {
-        request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
-      }
-    }
-  }
-
-  @Override
-  public ResourceRequestUpdateResult updateResourceRequests(
-      Collection<ResourceRequest> requests,
-      boolean recoverPreemptedRequestForAContainer) {
-    try {
-      this.writeLock.lock();
-
-      ResourceRequestUpdateResult updateResult = null;
-
-      // Update resource requests
-      for (ResourceRequest request : requests) {
-        String resourceName = request.getResourceName();
-
-        // Update node labels if required
-        updateNodeLabels(request);
-
-        // Increment number of containers if recovering preempted resources
-        ResourceRequest lastRequest = resourceRequestMap.get(resourceName);
-        if (recoverPreemptedRequestForAContainer && lastRequest != null) {
-          request.setNumContainers(lastRequest.getNumContainers() + 1);
-        }
-
-        // Update asks
-        resourceRequestMap.put(resourceName, request);
-
-        if (resourceName.equals(ResourceRequest.ANY)) {
-          String partition = request.getNodeLabelExpression() == null ?
-              RMNodeLabelsManager.NO_LABEL :
-              request.getNodeLabelExpression();
-
-          this.primaryRequestedPartition = partition;
-
-          //update the applications requested labels set
-          appSchedulingInfo.addRequestedPartition(partition);
-
-          updateResult = new ResourceRequestUpdateResult(lastRequest, request);
-        }
-      }
-      return updateResult;
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-
-  @Override
-  public Map<String, ResourceRequest> getResourceRequests() {
-    return resourceRequestMap;
-  }
-
-  private ResourceRequest getResourceRequest(String resourceName) {
-    return resourceRequestMap.get(resourceName);
-  }
-
-  @Override
-  public PendingAsk getPendingAsk(String resourceName) {
-    try {
-      readLock.lock();
-      ResourceRequest request = getResourceRequest(resourceName);
-      if (null == request) {
-        return PendingAsk.ZERO;
-      } else{
-        return new PendingAsk(request.getCapability(),
-            request.getNumContainers());
-      }
-    } finally {
-      readLock.unlock();
-    }
-
-  }
-
-  @Override
-  public int getOutstandingAsksCount(String resourceName) {
-    try {
-      readLock.lock();
-      ResourceRequest request = getResourceRequest(resourceName);
-      if (null == request) {
-        return 0;
-      } else{
-        return request.getNumContainers();
-      }
-    } finally {
-      readLock.unlock();
-    }
-
-  }
-
-  private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey,
-      ResourceRequest offSwitchRequest) {
-    int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
-    offSwitchRequest.setNumContainers(numOffSwitchContainers);
-
-    // Do we have any outstanding requests?
-    // If there is nothing, we need to deactivate this application
-    if (numOffSwitchContainers == 0) {
-      appSchedulingInfo.getSchedulerKeys().remove(schedulerRequestKey);
-      appSchedulingInfo.checkForDeactivation();
-      resourceRequestMap.remove(ResourceRequest.ANY);
-      if (resourceRequestMap.isEmpty()) {
-        appSchedulingInfo.removePlacementSets(schedulerRequestKey);
-      }
-    }
-
-    appSchedulingInfo.decPendingResource(
-        offSwitchRequest.getNodeLabelExpression(),
-        offSwitchRequest.getCapability());
-  }
-
-  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
-    ResourceRequest newRequest = ResourceRequest.newBuilder()
-        .priority(request.getPriority())
-        .allocationRequestId(request.getAllocationRequestId())
-        .resourceName(request.getResourceName())
-        .capability(request.getCapability())
-        .numContainers(1)
-        .relaxLocality(request.getRelaxLocality())
-        .nodeLabelExpression(request.getNodeLabelExpression()).build();
-    return newRequest;
-  }
-
-  /**
-   * The {@link ResourceScheduler} is allocating data-local resources to the
-   * application.
-   */
-  private void allocateRackLocal(SchedulerRequestKey schedulerKey,
-      SchedulerNode node, ResourceRequest rackLocalRequest,
-      List<ResourceRequest> resourceRequests) {
-    // Update future requirements
-    decResourceRequest(node.getRackName(), rackLocalRequest);
-
-    ResourceRequest offRackRequest = resourceRequestMap.get(
-        ResourceRequest.ANY);
-    decrementOutstanding(schedulerKey, offRackRequest);
-
-    // Update cloned RackLocal and OffRack requests for recovery
-    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
-    resourceRequests.add(cloneResourceRequest(offRackRequest));
-  }
-
-  /**
-   * The {@link ResourceScheduler} is allocating data-local resources to the
-   * application.
-   */
-  private void allocateOffSwitch(SchedulerRequestKey schedulerKey,
-      ResourceRequest offSwitchRequest,
-      List<ResourceRequest> resourceRequests) {
-    // Update future requirements
-    decrementOutstanding(schedulerKey, offSwitchRequest);
-    // Update cloned OffRack requests for recovery
-    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
-  }
-
-
-  /**
-   * The {@link ResourceScheduler} is allocating data-local resources to the
-   * application.
-   */
-  private void allocateNodeLocal(SchedulerRequestKey schedulerKey,
-      SchedulerNode node, ResourceRequest nodeLocalRequest,
-      List<ResourceRequest> resourceRequests) {
-    // Update future requirements
-    decResourceRequest(node.getNodeName(), nodeLocalRequest);
-
-    ResourceRequest rackLocalRequest = resourceRequestMap.get(
-        node.getRackName());
-    decResourceRequest(node.getRackName(), rackLocalRequest);
-
-    ResourceRequest offRackRequest = resourceRequestMap.get(
-        ResourceRequest.ANY);
-    decrementOutstanding(schedulerKey, offRackRequest);
-
-    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
-    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
-    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
-    resourceRequests.add(cloneResourceRequest(offRackRequest));
-  }
-
-  private void decResourceRequest(String resourceName,
-      ResourceRequest request) {
-    request.setNumContainers(request.getNumContainers() - 1);
-    if (request.getNumContainers() == 0) {
-      resourceRequestMap.remove(resourceName);
-    }
-  }
-
-  @Override
-  public boolean canAllocate(NodeType type, SchedulerNode node) {
-    try {
-      readLock.lock();
-      ResourceRequest r = resourceRequestMap.get(
-          ResourceRequest.ANY);
-      if (r == null || r.getNumContainers() <= 0) {
-        return false;
-      }
-      if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) {
-        r = resourceRequestMap.get(node.getRackName());
-        if (r == null || r.getNumContainers() <= 0) {
-          return false;
-        }
-        if (type == NodeType.NODE_LOCAL) {
-          r = resourceRequestMap.get(node.getNodeName());
-          if (r == null || r.getNumContainers() <= 0) {
-            return false;
-          }
-        }
-      }
-
-      return true;
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  @Override
-  public boolean canDelayTo(String resourceName) {
-    try {
-      readLock.lock();
-      ResourceRequest request = getResourceRequest(resourceName);
-      return request == null || request.getRelaxLocality();
-    } finally {
-      readLock.unlock();
-    }
-
-  }
-
-  @Override
-  public boolean acceptNodePartition(String nodePartition,
-      SchedulingMode schedulingMode) {
-    // We will only look at node label = nodeLabelToLookAt according to
-    // schedulingMode and partition of node.
-    String nodePartitionToLookAt;
-    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
-      nodePartitionToLookAt = nodePartition;
-    } else {
-      nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
-    }
-
-    return primaryRequestedPartition.equals(nodePartitionToLookAt);
-  }
-
-  @Override
-  public String getPrimaryRequestedNodePartition() {
-    return primaryRequestedPartition;
-  }
-
-  @Override
-  public int getUniqueLocationAsks() {
-    return resourceRequestMap.size();
-  }
-
-  @Override
-  public void showRequests() {
-    for (ResourceRequest request : resourceRequestMap.values()) {
-      if (request.getNumContainers() > 0) {
-        LOG.debug("\tRequest=" + request);
-      }
-    }
-  }
-
-  @Override
-  public List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
-      NodeType type, SchedulerNode node) {
-    try {
-      writeLock.lock();
-
-      List<ResourceRequest> resourceRequests = new ArrayList<>();
-
-      ResourceRequest request;
-      if (type == NodeType.NODE_LOCAL) {
-        request = resourceRequestMap.get(node.getNodeName());
-      } else if (type == NodeType.RACK_LOCAL) {
-        request = resourceRequestMap.get(node.getRackName());
-      } else{
-        request = resourceRequestMap.get(ResourceRequest.ANY);
-      }
-
-      if (type == NodeType.NODE_LOCAL) {
-        allocateNodeLocal(schedulerKey, node, request, resourceRequests);
-      } else if (type == NodeType.RACK_LOCAL) {
-        allocateRackLocal(schedulerKey, node, request, resourceRequests);
-      } else{
-        allocateOffSwitch(schedulerKey, request, resourceRequests);
-      }
-
-      return resourceRequests;
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  @Override
-  public Iterator<String> getAcceptedResouceNames() {
-    try {
-      readLock.lock();
-      return resourceRequestMap.keySet().iterator();
-    } finally {
-      readLock.unlock();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java
deleted file mode 100644
index 2e6c3ca..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * <p>
- * PlacementSet is the central place that decide the order of node to fit
- * asks by application.
- * </p>
- *
- * <p>
- * Also, PlacementSet can cache results (for example, ordered list) for
- * better performance.
- * </p>
- *
- * <p>
- * PlacementSet can depend on one or more other PlacementSets.
- * </p>
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public interface PlacementSet<N extends SchedulerNode> {
-  /**
-   * Get all nodes for this PlacementSet
-   * @return all nodes for this PlacementSet
-   */
-  Map<NodeId, N> getAllNodes();
-
-  /**
-   * Version of the PlacementSet, can help other PlacementSet with dependencies
-   * deciding if update is required
-   * @return version
-   */
-  long getVersion();
-
-  /**
-   * Partition of the PlacementSet.
-   * @return node partition
-   */
-  String getPartition();
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java
deleted file mode 100644
index 405122b..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-
-public class PlacementSetUtils {
-  /*
-   * If the {@link PlacementSet} only has one entry, return it. otherwise
-   * return null
-   */
-  public static <N extends SchedulerNode> N getSingleNode(PlacementSet<N> ps) {
-    N node = null;
-    if (1 == ps.getAllNodes().size()) {
-      node = ps.getAllNodes().values().iterator().next();
-    }
-
-    return node;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
deleted file mode 100644
index 3e0620e..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
-
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * <p>
- * Comparing to {@link PlacementSet}, this also maintains
- * pending ResourceRequests:
- * - When new ResourceRequest(s) added to scheduler, or,
- * - Or new container allocated, scheduler can notify corresponding
- * PlacementSet.
- * </p>
- *
- * <p>
- * Different set of resource requests (E.g., resource requests with the
- * same schedulerKey) can have one instance of PlacementSet, each PlacementSet
- * can have different ways to order nodes depends on requests.
- * </p>
- */
-public interface SchedulingPlacementSet<N extends SchedulerNode> {
-  /**
-   * Get iterator of preferred node depends on requirement and/or availability
-   * @param clusterPlacementSet input cluster PlacementSet
-   * @return iterator of preferred node
-   */
-  Iterator<N> getPreferredNodeIterator(PlacementSet<N> clusterPlacementSet);
-
-  /**
-   * Replace existing ResourceRequest by the new requests
-   *
-   * @param requests new ResourceRequests
-   * @param recoverPreemptedRequestForAContainer if we're recovering resource
-   * requests for preempted container
-   * @return true if total pending resource changed
-   */
-  ResourceRequestUpdateResult updateResourceRequests(
-      Collection<ResourceRequest> requests,
-      boolean recoverPreemptedRequestForAContainer);
-
-  /**
-   * Get pending ResourceRequests by given schedulerRequestKey
-   * @return Map of resourceName to ResourceRequest
-   */
-  Map<String, ResourceRequest> getResourceRequests();
-
-  /**
-   * Get pending ask for given resourceName. If there's no such pendingAsk,
-   * returns {@link PendingAsk#ZERO}
-   *
-   * @param resourceName resourceName
-   * @return PendingAsk
-   */
-  PendingAsk getPendingAsk(String resourceName);
-
-  /**
-   * Get #pending-allocations for given resourceName. If there's no such
-   * pendingAsk, returns 0
-   *
-   * @param resourceName resourceName
-   * @return #pending-allocations
-   */
-  int getOutstandingAsksCount(String resourceName);
-
-  /**
-   * Notify container allocated.
-   * @param schedulerKey SchedulerRequestKey for this ResourceRequest
-   * @param type Type of the allocation
-   * @param node Which node this container allocated on
-   * @return list of ResourceRequests deducted
-   */
-  List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
-      NodeType type, SchedulerNode node);
-
-  /**
-   * Returns list of accepted resourceNames.
-   * @return Iterator of accepted resourceNames
-   */
-  Iterator<String> getAcceptedResouceNames();
-
-  /**
-   * We can still have pending requirement for a given NodeType and node
-   * @param type Locality Type
-   * @param node which node we will allocate on
-   * @return true if we has pending requirement
-   */
-  boolean canAllocate(NodeType type, SchedulerNode node);
-
-  /**
-   * Can delay to give locality?
-   * TODO (wangda): This should be moved out of SchedulingPlacementSet
-   * and should belong to specific delay scheduling policy impl.
-   *
-   * @param resourceName resourceName
-   * @return can/cannot
-   */
-  boolean canDelayTo(String resourceName);
-
-  /**
-   * Does this {@link SchedulingPlacementSet} accept resources on 
nodePartition?
-   *
-   * @param nodePartition nodePartition
-   * @param schedulingMode schedulingMode
-   * @return accepted/not
-   */
-  boolean acceptNodePartition(String nodePartition,
-      SchedulingMode schedulingMode);
-
-  /**
-   * It is possible that one request can accept multiple node partition,
-   * So this method returns primary node partition for pending resource /
-   * headroom calculation.
-   *
-   * @return primary requested node partition
-   */
-  String getPrimaryRequestedNodePartition();
-
-  /**
-   * @return number of unique location asks with #pending greater than 0,
-   * (like /rack1, host1, etc.).
-   *
-   * TODO (wangda): This should be moved out of SchedulingPlacementSet
-   * and should belong to specific delay scheduling policy impl.
-   */
-  int getUniqueLocationAsks();
-
-  /**
-   * Print human-readable requests to LOG debug.
-   */
-  void showRequests();
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java
new file mode 100644
index 0000000..31a2170
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A simple CandidateNodeSet which keeps an unordered map
+ */
+public class SimpleCandidateNodeSet<N extends SchedulerNode>
+    implements CandidateNodeSet<N> {
+
+  private Map<NodeId, N> map;
+  private String partition;
+
+  public SimpleCandidateNodeSet(N node) {
+    if (null != node) {
+      // Only one node in the initial CandidateNodeSet
+      this.map = ImmutableMap.of(node.getNodeID(), node);
+      this.partition = node.getPartition();
+    } else {
+      this.map = Collections.emptyMap();
+      this.partition = NodeLabel.DEFAULT_NODE_LABEL_PARTITION;
+    }
+  }
+
+  public SimpleCandidateNodeSet(Map<NodeId, N> map, String partition) {
+    this.map = map;
+    this.partition = partition;
+  }
+
+  @Override
+  public Map<NodeId, N> getAllNodes() {
+    return map;
+  }
+
+  @Override
+  public long getVersion() {
+    return 0L;
+  }
+
+  @Override
+  public String getPartition() {
+    return partition;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java
deleted file mode 100644
index 48efaa1..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeLabel;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * A simple PlacementSet which keeps an unordered map
- */
-public class SimplePlacementSet<N extends SchedulerNode>
-    implements PlacementSet<N> {
-
-  private Map<NodeId, N> map;
-  private String partition;
-
-  public SimplePlacementSet(N node) {
-    if (null != node) {
-      // Only one node in the initial PlacementSet
-      this.map = ImmutableMap.of(node.getNodeID(), node);
-      this.partition = node.getPartition();
-    } else {
-      this.map = Collections.emptyMap();
-      this.partition = NodeLabel.DEFAULT_NODE_LABEL_PARTITION;
-    }
-  }
-
-  public SimplePlacementSet(Map<NodeId, N> map, String partition) {
-    this.map = map;
-    this.partition = partition;
-  }
-
-  @Override
-  public Map<NodeId, N> getAllNodes() {
-    return map;
-  }
-
-  @Override
-  public long getVersion() {
-    return 0L;
-  }
-
-  @Override
-  public String getPartition() {
-    return partition;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java
new file mode 100644
index 0000000..e8268f8
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
+ * contains classes related to application monitor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 1dea4ee..800d023 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -140,7 +140,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -4199,7 +4199,8 @@ public class TestCapacityScheduler {
     scheduler.handle(new NodeRemovedSchedulerEvent(
         rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
     // schedulerNode is removed, try allocate a container
-    scheduler.allocateContainersToNode(new SimplePlacementSet<>(node), true);
+    scheduler.allocateContainersToNode(new SimpleCandidateNodeSet<>(node),
+        true);
 
     AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
         new AppAttemptRemovedSchedulerEvent(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index e34665d..0fcc86d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -55,7 +55,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -147,9 +147,9 @@ public class TestChildQueueOrder {
         // Next call - nothing
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).
-          when(queue)
-              .assignContainers(eq(clusterResource), any(PlacementSet.class),
-                  any(ResourceLimits.class), any(SchedulingMode.class));
+              when(queue).assignContainers(eq(clusterResource),
+              any(CandidateNodeSet.class), any(ResourceLimits.class),
+              any(SchedulingMode.class));
 
           // Mock the node's resource availability
           Resource available = node.getUnallocatedResource();
@@ -159,9 +159,9 @@ public class TestChildQueueOrder {
 
         return new CSAssignment(allocatedResource, type);
       }
-    }).
-    when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class),
-        any(ResourceLimits.class), any(SchedulingMode.class));
+    }).when(queue).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), any(ResourceLimits.class),
+        any(SchedulingMode.class));
     doNothing().when(node).releaseContainer(any(ContainerId.class),
         anyBoolean());
   }
@@ -425,10 +425,10 @@ public class TestChildQueueOrder {
         clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(d,b);
     allocationOrder.verify(d).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), any(ResourceLimits.class),
+        any(CandidateNodeSet.class), any(ResourceLimits.class),
         any(SchedulingMode.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), any(ResourceLimits.class),
+        any(CandidateNodeSet.class), any(ResourceLimits.class),
         any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index 541539d..eacbf6e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -60,7 +60,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
     .FiCaSchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
@@ -88,13 +88,14 @@ public class TestContainerResizing {
 
     @Override
     public CSAssignment allocateContainersToNode(
-        PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
+        CandidateNodeSet<FiCaSchedulerNode> candidates,
+        boolean withNodeHeartbeat) {
       try {
         Thread.sleep(1000);
       } catch(InterruptedException e) {
         LOG.debug("Thread interrupted.");
       }
-      return super.allocateContainersToNode(ps, withNodeHeartbeat);
+      return super.allocateContainersToNode(candidates, withNodeHeartbeat);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 740ef33..71fddfc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -656,7 +656,7 @@ public class TestNodeLabelContainerAllocation {
       if (key.getPriority().getPriority() == priority) {
         Assert.assertEquals("Expected partition is " + expectedPartition,
             expectedPartition,
-            info.getSchedulingPlacementSet(key)
+            info.getAppPlacementAllocator(key)
                 .getPrimaryRequestedNodePartition());
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index cdbbc51..a9196d1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -29,7 +29,6 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
@@ -54,7 +53,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -181,8 +180,9 @@ public class TestParentQueue {
         // Next call - nothing
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).when(queue)
-              .assignContainers(eq(clusterResource), any(PlacementSet.class),
-                  any(ResourceLimits.class), any(SchedulingMode.class));
+              .assignContainers(eq(clusterResource),
+                  any(CandidateNodeSet.class), any(ResourceLimits.class),
+                  any(SchedulingMode.class));
 
           // Mock the node's resource availability
           Resource available = node.getUnallocatedResource();
@@ -192,8 +192,9 @@ public class TestParentQueue {
 
         return new CSAssignment(allocatedResource, type);
       }
-    }).when(queue).assignContainers(eq(clusterResource), 
any(PlacementSet.class),
-        any(ResourceLimits.class), any(SchedulingMode.class));
+    }).when(queue).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), any(ResourceLimits.class),
+        any(SchedulingMode.class));
   }
   
   private float computeQueueAbsoluteUsedCapacity(CSQueue queue, 
@@ -274,13 +275,14 @@ public class TestParentQueue {
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(),
+        any(CandidateNodeSet.class), anyResourceLimits(),
         any(SchedulingMode.class));
     root.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
 
@@ -293,10 +295,12 @@ public class TestParentQueue {
     root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), 
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -307,10 +311,12 @@ public class TestParentQueue {
     root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource), 
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
 
@@ -325,10 +331,12 @@ public class TestParentQueue {
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(a, b);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 4*GB, clusterResource);
     verifyQueueMetrics(b, 9*GB, clusterResource);
   }
@@ -547,22 +555,25 @@ public class TestParentQueue {
     root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource), 
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, c, b);
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
-    applyAllocationToQueue(clusterResource, 1*GB, a);
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    applyAllocationToQueue(clusterResource, 1 * GB, a);
 
     root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
-    applyAllocationToQueue(clusterResource, 2*GB, root);
+    allocationOrder.verify(c).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    applyAllocationToQueue(clusterResource, 2 * GB, root);
 
     root.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     applyAllocationToQueue(clusterResource, 2*GB, b);
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 6*GB, clusterResource);
@@ -586,24 +597,28 @@ public class TestParentQueue {
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(a, a2, a1, b, c);
-    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
-    allocationOrder.verify(a2).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(a2).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     applyAllocationToQueue(clusterResource, 2*GB, a);
 
     root.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     applyAllocationToQueue(clusterResource, 2*GB, b);
 
     root.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+    allocationOrder.verify(c).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
     verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -720,12 +735,14 @@ public class TestParentQueue {
         new ResourceLimits(clusterResource), 
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a);
     allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     root.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource), 
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b);
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
 
@@ -738,9 +755,11 @@ public class TestParentQueue {
         new ResourceLimits(clusterResource), 
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource),
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -800,10 +819,12 @@ public class TestParentQueue {
     root.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource), 
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(b2, b3);
-    allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
-    allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+    allocationOrder.verify(b2).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(b3).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 2*GB, clusterResource);
     
@@ -815,10 +836,12 @@ public class TestParentQueue {
     root.assignContainers(clusterResource, node_0, 
         new ResourceLimits(clusterResource), 
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b3, b2);
-    allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
-    allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(PlacementSet.class), anyResourceLimits(), 
any(SchedulingMode.class));
+    allocationOrder.verify(b3).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
+    allocationOrder.verify(b2).assignContainers(eq(clusterResource),
+        any(CandidateNodeSet.class), anyResourceLimits(),
+        any(SchedulingMode.class));
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 3*GB, clusterResource);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to