Repository: hadoop Updated Branches: refs/heads/apache-trunk [created] ac4d2b108
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java new file mode 100644 index 0000000..7f89435 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -0,0 +1,422 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.commons.collections.IteratorUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * This is an implementation of the {@link AppPlacementAllocator} that takes + * into account locality preferences (node, rack, any) when allocating + * containers. 
+ */ +public class LocalityAppPlacementAllocator<N extends SchedulerNode> + implements AppPlacementAllocator<N> { + private static final Log LOG = + LogFactory.getLog(LocalityAppPlacementAllocator.class); + + private final Map<String, ResourceRequest> resourceRequestMap = + new ConcurrentHashMap<>(); + private AppSchedulingInfo appSchedulingInfo; + private volatile String primaryRequestedPartition = + RMNodeLabelsManager.NO_LABEL; + + private final ReentrantReadWriteLock.ReadLock readLock; + private final ReentrantReadWriteLock.WriteLock writeLock; + + public LocalityAppPlacementAllocator(AppSchedulingInfo info) { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + this.appSchedulingInfo = info; + } + + @Override + @SuppressWarnings("unchecked") + public Iterator<N> getPreferredNodeIterator( + CandidateNodeSet<N> candidateNodeSet) { + // Now only handle the case that single node in the candidateNodeSet + // TODO, Add support to multi-hosts inside candidateNodeSet which is passed + // in. + + N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); + if (null != singleNode) { + return IteratorUtils.singletonIterator(singleNode); + } + + return IteratorUtils.emptyIterator(); + } + + private boolean hasRequestLabelChanged(ResourceRequest requestOne, + ResourceRequest requestTwo) { + String requestOneLabelExp = requestOne.getNodeLabelExpression(); + String requestTwoLabelExp = requestTwo.getNodeLabelExpression(); + // First request label expression can be null and second request + // is not null then we have to consider it as changed. + if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) { + return true; + } + // If the label is not matching between both request when + // requestOneLabelExp is not null. 
+ return ((null != requestOneLabelExp) && !(requestOneLabelExp + .equals(requestTwoLabelExp))); + } + + private void updateNodeLabels(ResourceRequest request) { + String resourceName = request.getResourceName(); + if (resourceName.equals(ResourceRequest.ANY)) { + ResourceRequest previousAnyRequest = + getResourceRequest(resourceName); + + // When there is change in ANY request label expression, we should + // update label for all resource requests already added of same + // priority as ANY resource request. + if ((null == previousAnyRequest) || hasRequestLabelChanged( + previousAnyRequest, request)) { + for (ResourceRequest r : resourceRequestMap.values()) { + if (!r.getResourceName().equals(ResourceRequest.ANY)) { + r.setNodeLabelExpression(request.getNodeLabelExpression()); + } + } + } + } else{ + ResourceRequest anyRequest = getResourceRequest(ResourceRequest.ANY); + if (anyRequest != null) { + request.setNodeLabelExpression(anyRequest.getNodeLabelExpression()); + } + } + } + + @Override + public ResourceRequestUpdateResult updateResourceRequests( + Collection<ResourceRequest> requests, + boolean recoverPreemptedRequestForAContainer) { + try { + this.writeLock.lock(); + + ResourceRequestUpdateResult updateResult = null; + + // Update resource requests + for (ResourceRequest request : requests) { + String resourceName = request.getResourceName(); + + // Update node labels if required + updateNodeLabels(request); + + // Increment number of containers if recovering preempted resources + ResourceRequest lastRequest = resourceRequestMap.get(resourceName); + if (recoverPreemptedRequestForAContainer && lastRequest != null) { + request.setNumContainers(lastRequest.getNumContainers() + 1); + } + + // Update asks + resourceRequestMap.put(resourceName, request); + + if (resourceName.equals(ResourceRequest.ANY)) { + String partition = request.getNodeLabelExpression() == null ? 
+ RMNodeLabelsManager.NO_LABEL : + request.getNodeLabelExpression(); + + this.primaryRequestedPartition = partition; + + //update the applications requested labels set + appSchedulingInfo.addRequestedPartition(partition); + + updateResult = new ResourceRequestUpdateResult(lastRequest, request); + } + } + return updateResult; + } finally { + this.writeLock.unlock(); + } + } + + @Override + public Map<String, ResourceRequest> getResourceRequests() { + return resourceRequestMap; + } + + private ResourceRequest getResourceRequest(String resourceName) { + return resourceRequestMap.get(resourceName); + } + + @Override + public PendingAsk getPendingAsk(String resourceName) { + try { + readLock.lock(); + ResourceRequest request = getResourceRequest(resourceName); + if (null == request) { + return PendingAsk.ZERO; + } else{ + return new PendingAsk(request.getCapability(), + request.getNumContainers()); + } + } finally { + readLock.unlock(); + } + + } + + @Override + public int getOutstandingAsksCount(String resourceName) { + try { + readLock.lock(); + ResourceRequest request = getResourceRequest(resourceName); + if (null == request) { + return 0; + } else{ + return request.getNumContainers(); + } + } finally { + readLock.unlock(); + } + + } + + private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey, + ResourceRequest offSwitchRequest) { + int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; + offSwitchRequest.setNumContainers(numOffSwitchContainers); + + // Do we have any outstanding requests? 
+ // If there is nothing, we need to deactivate this application + if (numOffSwitchContainers == 0) { + appSchedulingInfo.getSchedulerKeys().remove(schedulerRequestKey); + appSchedulingInfo.checkForDeactivation(); + resourceRequestMap.remove(ResourceRequest.ANY); + if (resourceRequestMap.isEmpty()) { + appSchedulingInfo.removeAppPlacement(schedulerRequestKey); + } + } + + appSchedulingInfo.decPendingResource( + offSwitchRequest.getNodeLabelExpression(), + offSwitchRequest.getCapability()); + } + + public ResourceRequest cloneResourceRequest(ResourceRequest request) { + ResourceRequest newRequest = ResourceRequest.newBuilder() + .priority(request.getPriority()) + .allocationRequestId(request.getAllocationRequestId()) + .resourceName(request.getResourceName()) + .capability(request.getCapability()) + .numContainers(1) + .relaxLocality(request.getRelaxLocality()) + .nodeLabelExpression(request.getNodeLabelExpression()).build(); + return newRequest; + } + + /** + * The {@link ResourceScheduler} is allocating data-local resources to the + * application. + */ + private void allocateRackLocal(SchedulerRequestKey schedulerKey, + SchedulerNode node, ResourceRequest rackLocalRequest, + List<ResourceRequest> resourceRequests) { + // Update future requirements + decResourceRequest(node.getRackName(), rackLocalRequest); + + ResourceRequest offRackRequest = resourceRequestMap.get( + ResourceRequest.ANY); + decrementOutstanding(schedulerKey, offRackRequest); + + // Update cloned RackLocal and OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(rackLocalRequest)); + resourceRequests.add(cloneResourceRequest(offRackRequest)); + } + + /** + * The {@link ResourceScheduler} is allocating data-local resources to the + * application. 
+ */ + private void allocateOffSwitch(SchedulerRequestKey schedulerKey, + ResourceRequest offSwitchRequest, + List<ResourceRequest> resourceRequests) { + // Update future requirements + decrementOutstanding(schedulerKey, offSwitchRequest); + // Update cloned OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(offSwitchRequest)); + } + + + /** + * The {@link ResourceScheduler} is allocating data-local resources to the + * application. + */ + private void allocateNodeLocal(SchedulerRequestKey schedulerKey, + SchedulerNode node, ResourceRequest nodeLocalRequest, + List<ResourceRequest> resourceRequests) { + // Update future requirements + decResourceRequest(node.getNodeName(), nodeLocalRequest); + + ResourceRequest rackLocalRequest = resourceRequestMap.get( + node.getRackName()); + decResourceRequest(node.getRackName(), rackLocalRequest); + + ResourceRequest offRackRequest = resourceRequestMap.get( + ResourceRequest.ANY); + decrementOutstanding(schedulerKey, offRackRequest); + + // Update cloned NodeLocal, RackLocal and OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); + resourceRequests.add(cloneResourceRequest(rackLocalRequest)); + resourceRequests.add(cloneResourceRequest(offRackRequest)); + } + + private void decResourceRequest(String resourceName, + ResourceRequest request) { + request.setNumContainers(request.getNumContainers() - 1); + if (request.getNumContainers() == 0) { + resourceRequestMap.remove(resourceName); + } + } + + @Override + public boolean canAllocate(NodeType type, SchedulerNode node) { + try { + readLock.lock(); + ResourceRequest r = resourceRequestMap.get( + ResourceRequest.ANY); + if (r == null || r.getNumContainers() <= 0) { + return false; + } + if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) { + r = resourceRequestMap.get(node.getRackName()); + if (r == null || r.getNumContainers() <= 0) { + return false; + } + if (type == NodeType.NODE_LOCAL) { + r = 
resourceRequestMap.get(node.getNodeName()); + if (r == null || r.getNumContainers() <= 0) { + return false; + } + } + } + + return true; + } finally { + readLock.unlock(); + } + } + + @Override + public boolean canDelayTo(String resourceName) { + try { + readLock.lock(); + ResourceRequest request = getResourceRequest(resourceName); + return request == null || request.getRelaxLocality(); + } finally { + readLock.unlock(); + } + + } + + @Override + public boolean acceptNodePartition(String nodePartition, + SchedulingMode schedulingMode) { + // We will only look at node label = nodeLabelToLookAt according to + // schedulingMode and partition of node. + String nodePartitionToLookAt; + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { + nodePartitionToLookAt = nodePartition; + } else { + nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL; + } + + return primaryRequestedPartition.equals(nodePartitionToLookAt); + } + + @Override + public String getPrimaryRequestedNodePartition() { + return primaryRequestedPartition; + } + + @Override + public int getUniqueLocationAsks() { + return resourceRequestMap.size(); + } + + @Override + public void showRequests() { + for (ResourceRequest request : resourceRequestMap.values()) { + if (request.getNumContainers() > 0) { + LOG.debug("\tRequest=" + request); + } + } + } + + @Override + public List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey, + NodeType type, SchedulerNode node) { + try { + writeLock.lock(); + + List<ResourceRequest> resourceRequests = new ArrayList<>(); + + ResourceRequest request; + if (type == NodeType.NODE_LOCAL) { + request = resourceRequestMap.get(node.getNodeName()); + } else if (type == NodeType.RACK_LOCAL) { + request = resourceRequestMap.get(node.getRackName()); + } else{ + request = resourceRequestMap.get(ResourceRequest.ANY); + } + + if (type == NodeType.NODE_LOCAL) { + allocateNodeLocal(schedulerKey, node, request, resourceRequests); + } else if (type == 
NodeType.RACK_LOCAL) { + allocateRackLocal(schedulerKey, node, request, resourceRequests); + } else{ + allocateOffSwitch(schedulerKey, request, resourceRequests); + } + + return resourceRequests; + } finally { + writeLock.unlock(); + } + } + + @Override + public Iterator<String> getAcceptedResouceNames() { + try { + readLock.lock(); + return resourceRequestMap.keySet().iterator(); + } finally { + readLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java deleted file mode 100644 index 6cc8cc7..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java +++ /dev/null @@ -1,416 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; - -import org.apache.commons.collections.IteratorUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; -import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -public class LocalitySchedulingPlacementSet<N extends SchedulerNode> - implements SchedulingPlacementSet<N> { - private static final Log LOG = - LogFactory.getLog(LocalitySchedulingPlacementSet.class); - - private final Map<String, ResourceRequest> resourceRequestMap = - new ConcurrentHashMap<>(); - private AppSchedulingInfo appSchedulingInfo; - private volatile String primaryRequestedPartition = 
- RMNodeLabelsManager.NO_LABEL; - - private final ReentrantReadWriteLock.ReadLock readLock; - private final ReentrantReadWriteLock.WriteLock writeLock; - - public LocalitySchedulingPlacementSet(AppSchedulingInfo info) { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - readLock = lock.readLock(); - writeLock = lock.writeLock(); - this.appSchedulingInfo = info; - } - - @Override - @SuppressWarnings("unchecked") - public Iterator<N> getPreferredNodeIterator( - PlacementSet<N> clusterPlacementSet) { - // Now only handle the case that single node in placementSet - // TODO, Add support to multi-hosts inside placement-set which is passed in. - - N singleNode = PlacementSetUtils.getSingleNode(clusterPlacementSet); - if (null != singleNode) { - return IteratorUtils.singletonIterator(singleNode); - } - - return IteratorUtils.emptyIterator(); - } - - private boolean hasRequestLabelChanged(ResourceRequest requestOne, - ResourceRequest requestTwo) { - String requestOneLabelExp = requestOne.getNodeLabelExpression(); - String requestTwoLabelExp = requestTwo.getNodeLabelExpression(); - // First request label expression can be null and second request - // is not null then we have to consider it as changed. - if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) { - return true; - } - // If the label is not matching between both request when - // requestOneLabelExp is not null. - return ((null != requestOneLabelExp) && !(requestOneLabelExp - .equals(requestTwoLabelExp))); - } - - private void updateNodeLabels(ResourceRequest request) { - String resourceName = request.getResourceName(); - if (resourceName.equals(ResourceRequest.ANY)) { - ResourceRequest previousAnyRequest = - getResourceRequest(resourceName); - - // When there is change in ANY request label expression, we should - // update label for all resource requests already added of same - // priority as ANY resource request. 
- if ((null == previousAnyRequest) || hasRequestLabelChanged( - previousAnyRequest, request)) { - for (ResourceRequest r : resourceRequestMap.values()) { - if (!r.getResourceName().equals(ResourceRequest.ANY)) { - r.setNodeLabelExpression(request.getNodeLabelExpression()); - } - } - } - } else{ - ResourceRequest anyRequest = getResourceRequest(ResourceRequest.ANY); - if (anyRequest != null) { - request.setNodeLabelExpression(anyRequest.getNodeLabelExpression()); - } - } - } - - @Override - public ResourceRequestUpdateResult updateResourceRequests( - Collection<ResourceRequest> requests, - boolean recoverPreemptedRequestForAContainer) { - try { - this.writeLock.lock(); - - ResourceRequestUpdateResult updateResult = null; - - // Update resource requests - for (ResourceRequest request : requests) { - String resourceName = request.getResourceName(); - - // Update node labels if required - updateNodeLabels(request); - - // Increment number of containers if recovering preempted resources - ResourceRequest lastRequest = resourceRequestMap.get(resourceName); - if (recoverPreemptedRequestForAContainer && lastRequest != null) { - request.setNumContainers(lastRequest.getNumContainers() + 1); - } - - // Update asks - resourceRequestMap.put(resourceName, request); - - if (resourceName.equals(ResourceRequest.ANY)) { - String partition = request.getNodeLabelExpression() == null ? 
- RMNodeLabelsManager.NO_LABEL : - request.getNodeLabelExpression(); - - this.primaryRequestedPartition = partition; - - //update the applications requested labels set - appSchedulingInfo.addRequestedPartition(partition); - - updateResult = new ResourceRequestUpdateResult(lastRequest, request); - } - } - return updateResult; - } finally { - this.writeLock.unlock(); - } - } - - @Override - public Map<String, ResourceRequest> getResourceRequests() { - return resourceRequestMap; - } - - private ResourceRequest getResourceRequest(String resourceName) { - return resourceRequestMap.get(resourceName); - } - - @Override - public PendingAsk getPendingAsk(String resourceName) { - try { - readLock.lock(); - ResourceRequest request = getResourceRequest(resourceName); - if (null == request) { - return PendingAsk.ZERO; - } else{ - return new PendingAsk(request.getCapability(), - request.getNumContainers()); - } - } finally { - readLock.unlock(); - } - - } - - @Override - public int getOutstandingAsksCount(String resourceName) { - try { - readLock.lock(); - ResourceRequest request = getResourceRequest(resourceName); - if (null == request) { - return 0; - } else{ - return request.getNumContainers(); - } - } finally { - readLock.unlock(); - } - - } - - private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey, - ResourceRequest offSwitchRequest) { - int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; - offSwitchRequest.setNumContainers(numOffSwitchContainers); - - // Do we have any outstanding requests? 
- // If there is nothing, we need to deactivate this application - if (numOffSwitchContainers == 0) { - appSchedulingInfo.getSchedulerKeys().remove(schedulerRequestKey); - appSchedulingInfo.checkForDeactivation(); - resourceRequestMap.remove(ResourceRequest.ANY); - if (resourceRequestMap.isEmpty()) { - appSchedulingInfo.removePlacementSets(schedulerRequestKey); - } - } - - appSchedulingInfo.decPendingResource( - offSwitchRequest.getNodeLabelExpression(), - offSwitchRequest.getCapability()); - } - - public ResourceRequest cloneResourceRequest(ResourceRequest request) { - ResourceRequest newRequest = ResourceRequest.newBuilder() - .priority(request.getPriority()) - .allocationRequestId(request.getAllocationRequestId()) - .resourceName(request.getResourceName()) - .capability(request.getCapability()) - .numContainers(1) - .relaxLocality(request.getRelaxLocality()) - .nodeLabelExpression(request.getNodeLabelExpression()).build(); - return newRequest; - } - - /** - * The {@link ResourceScheduler} is allocating data-local resources to the - * application. - */ - private void allocateRackLocal(SchedulerRequestKey schedulerKey, - SchedulerNode node, ResourceRequest rackLocalRequest, - List<ResourceRequest> resourceRequests) { - // Update future requirements - decResourceRequest(node.getRackName(), rackLocalRequest); - - ResourceRequest offRackRequest = resourceRequestMap.get( - ResourceRequest.ANY); - decrementOutstanding(schedulerKey, offRackRequest); - - // Update cloned RackLocal and OffRack requests for recovery - resourceRequests.add(cloneResourceRequest(rackLocalRequest)); - resourceRequests.add(cloneResourceRequest(offRackRequest)); - } - - /** - * The {@link ResourceScheduler} is allocating data-local resources to the - * application. 
- */ - private void allocateOffSwitch(SchedulerRequestKey schedulerKey, - ResourceRequest offSwitchRequest, - List<ResourceRequest> resourceRequests) { - // Update future requirements - decrementOutstanding(schedulerKey, offSwitchRequest); - // Update cloned OffRack requests for recovery - resourceRequests.add(cloneResourceRequest(offSwitchRequest)); - } - - - /** - * The {@link ResourceScheduler} is allocating data-local resources to the - * application. - */ - private void allocateNodeLocal(SchedulerRequestKey schedulerKey, - SchedulerNode node, ResourceRequest nodeLocalRequest, - List<ResourceRequest> resourceRequests) { - // Update future requirements - decResourceRequest(node.getNodeName(), nodeLocalRequest); - - ResourceRequest rackLocalRequest = resourceRequestMap.get( - node.getRackName()); - decResourceRequest(node.getRackName(), rackLocalRequest); - - ResourceRequest offRackRequest = resourceRequestMap.get( - ResourceRequest.ANY); - decrementOutstanding(schedulerKey, offRackRequest); - - // Update cloned NodeLocal, RackLocal and OffRack requests for recovery - resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); - resourceRequests.add(cloneResourceRequest(rackLocalRequest)); - resourceRequests.add(cloneResourceRequest(offRackRequest)); - } - - private void decResourceRequest(String resourceName, - ResourceRequest request) { - request.setNumContainers(request.getNumContainers() - 1); - if (request.getNumContainers() == 0) { - resourceRequestMap.remove(resourceName); - } - } - - @Override - public boolean canAllocate(NodeType type, SchedulerNode node) { - try { - readLock.lock(); - ResourceRequest r = resourceRequestMap.get( - ResourceRequest.ANY); - if (r == null || r.getNumContainers() <= 0) { - return false; - } - if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) { - r = resourceRequestMap.get(node.getRackName()); - if (r == null || r.getNumContainers() <= 0) { - return false; - } - if (type == NodeType.NODE_LOCAL) { - r = 
resourceRequestMap.get(node.getNodeName()); - if (r == null || r.getNumContainers() <= 0) { - return false; - } - } - } - - return true; - } finally { - readLock.unlock(); - } - } - - @Override - public boolean canDelayTo(String resourceName) { - try { - readLock.lock(); - ResourceRequest request = getResourceRequest(resourceName); - return request == null || request.getRelaxLocality(); - } finally { - readLock.unlock(); - } - - } - - @Override - public boolean acceptNodePartition(String nodePartition, - SchedulingMode schedulingMode) { - // We will only look at node label = nodeLabelToLookAt according to - // schedulingMode and partition of node. - String nodePartitionToLookAt; - if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { - nodePartitionToLookAt = nodePartition; - } else { - nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL; - } - - return primaryRequestedPartition.equals(nodePartitionToLookAt); - } - - @Override - public String getPrimaryRequestedNodePartition() { - return primaryRequestedPartition; - } - - @Override - public int getUniqueLocationAsks() { - return resourceRequestMap.size(); - } - - @Override - public void showRequests() { - for (ResourceRequest request : resourceRequestMap.values()) { - if (request.getNumContainers() > 0) { - LOG.debug("\tRequest=" + request); - } - } - } - - @Override - public List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey, - NodeType type, SchedulerNode node) { - try { - writeLock.lock(); - - List<ResourceRequest> resourceRequests = new ArrayList<>(); - - ResourceRequest request; - if (type == NodeType.NODE_LOCAL) { - request = resourceRequestMap.get(node.getNodeName()); - } else if (type == NodeType.RACK_LOCAL) { - request = resourceRequestMap.get(node.getRackName()); - } else{ - request = resourceRequestMap.get(ResourceRequest.ANY); - } - - if (type == NodeType.NODE_LOCAL) { - allocateNodeLocal(schedulerKey, node, request, resourceRequests); - } else if (type == 
NodeType.RACK_LOCAL) { - allocateRackLocal(schedulerKey, node, request, resourceRequests); - } else{ - allocateOffSwitch(schedulerKey, request, resourceRequests); - } - - return resourceRequests; - } finally { - writeLock.unlock(); - } - } - - @Override - public Iterator<String> getAcceptedResouceNames() { - try { - readLock.lock(); - return resourceRequestMap.keySet().iterator(); - } finally { - readLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java deleted file mode 100644 index 2e6c3ca..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; - -import java.util.Iterator; -import java.util.Map; - -/** - * <p> - * PlacementSet is the central place that decide the order of node to fit - * asks by application. - * </p> - * - * <p> - * Also, PlacementSet can cache results (for example, ordered list) for - * better performance. - * </p> - * - * <p> - * PlacementSet can depend on one or more other PlacementSets. - * </p> - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public interface PlacementSet<N extends SchedulerNode> { - /** - * Get all nodes for this PlacementSet - * @return all nodes for this PlacementSet - */ - Map<NodeId, N> getAllNodes(); - - /** - * Version of the PlacementSet, can help other PlacementSet with dependencies - * deciding if update is required - * @return version - */ - long getVersion(); - - /** - * Partition of the PlacementSet. 
- * @return node partition - */ - String getPartition(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java deleted file mode 100644 index 405122b..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; - -public class PlacementSetUtils { - /* - * If the {@link PlacementSet} only has one entry, return it. otherwise - * return null - */ - public static <N extends SchedulerNode> N getSingleNode(PlacementSet<N> ps) { - N node = null; - if (1 == ps.getAllNodes().size()) { - node = ps.getAllNodes().values().iterator().next(); - } - - return node; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java deleted file mode 100644 index 3e0620e..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; - -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; -import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; - -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * <p> - * Comparing to {@link PlacementSet}, this also maintains - * pending ResourceRequests: - * - When new ResourceRequest(s) added to scheduler, or, - * - Or new container allocated, scheduler can notify corresponding - * PlacementSet. - * </p> - * - * <p> - * Different set of resource requests (E.g., resource requests with the - * same schedulerKey) can have one instance of PlacementSet, each PlacementSet - * can have different ways to order nodes depends on requests. 
- * </p> - */ -public interface SchedulingPlacementSet<N extends SchedulerNode> { - /** - * Get iterator of preferred node depends on requirement and/or availability - * @param clusterPlacementSet input cluster PlacementSet - * @return iterator of preferred node - */ - Iterator<N> getPreferredNodeIterator(PlacementSet<N> clusterPlacementSet); - - /** - * Replace existing ResourceRequest by the new requests - * - * @param requests new ResourceRequests - * @param recoverPreemptedRequestForAContainer if we're recovering resource - * requests for preempted container - * @return true if total pending resource changed - */ - ResourceRequestUpdateResult updateResourceRequests( - Collection<ResourceRequest> requests, - boolean recoverPreemptedRequestForAContainer); - - /** - * Get pending ResourceRequests by given schedulerRequestKey - * @return Map of resourceName to ResourceRequest - */ - Map<String, ResourceRequest> getResourceRequests(); - - /** - * Get pending ask for given resourceName. If there's no such pendingAsk, - * returns {@link PendingAsk#ZERO} - * - * @param resourceName resourceName - * @return PendingAsk - */ - PendingAsk getPendingAsk(String resourceName); - - /** - * Get #pending-allocations for given resourceName. If there's no such - * pendingAsk, returns 0 - * - * @param resourceName resourceName - * @return #pending-allocations - */ - int getOutstandingAsksCount(String resourceName); - - /** - * Notify container allocated. - * @param schedulerKey SchedulerRequestKey for this ResourceRequest - * @param type Type of the allocation - * @param node Which node this container allocated on - * @return list of ResourceRequests deducted - */ - List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey, - NodeType type, SchedulerNode node); - - /** - * Returns list of accepted resourceNames. 
- * @return Iterator of accepted resourceNames - */ - Iterator<String> getAcceptedResouceNames(); - - /** - * We can still have pending requirement for a given NodeType and node - * @param type Locality Type - * @param node which node we will allocate on - * @return true if we has pending requirement - */ - boolean canAllocate(NodeType type, SchedulerNode node); - - /** - * Can delay to give locality? - * TODO (wangda): This should be moved out of SchedulingPlacementSet - * and should belong to specific delay scheduling policy impl. - * - * @param resourceName resourceName - * @return can/cannot - */ - boolean canDelayTo(String resourceName); - - /** - * Does this {@link SchedulingPlacementSet} accept resources on nodePartition? - * - * @param nodePartition nodePartition - * @param schedulingMode schedulingMode - * @return accepted/not - */ - boolean acceptNodePartition(String nodePartition, - SchedulingMode schedulingMode); - - /** - * It is possible that one request can accept multiple node partition, - * So this method returns primary node partition for pending resource / - * headroom calculation. - * - * @return primary requested node partition - */ - String getPrimaryRequestedNodePartition(); - - /** - * @return number of unique location asks with #pending greater than 0, - * (like /rack1, host1, etc.). - * - * TODO (wangda): This should be moved out of SchedulingPlacementSet - * and should belong to specific delay scheduling policy impl. - */ - int getUniqueLocationAsks(); - - /** - * Print human-readable requests to LOG debug. 
- */ - void showRequests(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java new file mode 100644 index 0000000..31a2170 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimpleCandidateNodeSet.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Collections; +import java.util.Map; + +/** + * A simple CandidateNodeSet which keeps an unordered map + */ +public class SimpleCandidateNodeSet<N extends SchedulerNode> + implements CandidateNodeSet<N> { + + private Map<NodeId, N> map; + private String partition; + + public SimpleCandidateNodeSet(N node) { + if (null != node) { + // Only one node in the initial CandidateNodeSet + this.map = ImmutableMap.of(node.getNodeID(), node); + this.partition = node.getPartition(); + } else { + this.map = Collections.emptyMap(); + this.partition = NodeLabel.DEFAULT_NODE_LABEL_PARTITION; + } + } + + public SimpleCandidateNodeSet(Map<NodeId, N> map, String partition) { + this.map = map; + this.partition = partition; + } + + @Override + public Map<NodeId, N> getAllNodes() { + return map; + } + + @Override + public long getVersion() { + return 0L; + } + + @Override + public String getPartition() { + return partition; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java deleted file mode 100644 
index 48efaa1..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; - -import com.google.common.collect.ImmutableMap; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeLabel; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; - -import java.util.Collections; -import java.util.Iterator; -import java.util.Map; - -/** - * A simple PlacementSet which keeps an unordered map - */ -public class SimplePlacementSet<N extends SchedulerNode> - implements PlacementSet<N> { - - private Map<NodeId, N> map; - private String partition; - - public SimplePlacementSet(N node) { - if (null != node) { - // Only one node in the initial PlacementSet - this.map = ImmutableMap.of(node.getNodeID(), node); - this.partition = node.getPartition(); - } else { - this.map = Collections.emptyMap(); - this.partition = NodeLabel.DEFAULT_NODE_LABEL_PARTITION; - } - } - - public SimplePlacementSet(Map<NodeId, N> map, String partition) { - this.map = map; - this.partition = partition; - } - - @Override - public Map<NodeId, N> getAllNodes() { - return map; - } - - @Override - public long getVersion() { - return 0L; - } - - @Override - public String getPartition() { - return partition; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java new file mode 100644 index 0000000..e8268f8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement + * contains classes related to application placement on candidate nodes.
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 1dea4ee..800d023 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -140,7 +140,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; @@ -4199,7 +4199,8 @@ public class TestCapacityScheduler { scheduler.handle(new NodeRemovedSchedulerEvent( rm.getRMContext().getRMNodes().get(nm2.getNodeId()))); // schedulerNode is removed, try allocate a container - scheduler.allocateContainersToNode(new SimplePlacementSet<>(node), true); + scheduler.allocateContainersToNode(new SimpleCandidateNodeSet<>(node), + true); AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index e34665d..0fcc86d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -147,9 +147,9 @@ public class TestChildQueueOrder { // Next call - nothing if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)). - when(queue) - .assignContainers(eq(clusterResource), any(PlacementSet.class), - any(ResourceLimits.class), any(SchedulingMode.class)); + when(queue).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), any(ResourceLimits.class), + any(SchedulingMode.class)); // Mock the node's resource availability Resource available = node.getUnallocatedResource(); @@ -159,9 +159,9 @@ public class TestChildQueueOrder { return new CSAssignment(allocatedResource, type); } - }). 
- when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class), - any(ResourceLimits.class), any(SchedulingMode.class)); + }).when(queue).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), any(ResourceLimits.class), + any(SchedulingMode.class)); doNothing().when(node).releaseContainer(any(ContainerId.class), anyBoolean()); } @@ -425,10 +425,10 @@ public class TestChildQueueOrder { clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(d,b); allocationOrder.verify(d).assignContainers(eq(clusterResource), - any(PlacementSet.class), any(ResourceLimits.class), + any(CandidateNodeSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), any(ResourceLimits.class), + any(CandidateNodeSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index 541539d..eacbf6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica .FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; @@ -88,13 +88,14 @@ public class TestContainerResizing { @Override public CSAssignment allocateContainersToNode( - PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) { + CandidateNodeSet<FiCaSchedulerNode> candidates, + boolean withNodeHeartbeat) { try { Thread.sleep(1000); } catch(InterruptedException e) { LOG.debug("Thread interrupted."); } - return super.allocateContainersToNode(ps, withNodeHeartbeat); + return super.allocateContainersToNode(candidates, withNodeHeartbeat); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 740ef33..71fddfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -656,7 +656,7 @@ public class TestNodeLabelContainerAllocation { if (key.getPriority().getPriority() == priority) { Assert.assertEquals("Expected partition is " + expectedPartition, expectedPartition, - info.getSchedulingPlacementSet(key) + info.getAppPlacementAllocator(key) .getPrimaryRequestedNodePartition()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index cdbbc51..a9196d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -29,7 +29,6 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; import java.util.HashMap; @@ -54,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -181,8 +180,9 @@ public class TestParentQueue { // Next call - nothing if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)).when(queue) - .assignContainers(eq(clusterResource), any(PlacementSet.class), - any(ResourceLimits.class), any(SchedulingMode.class)); + .assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), any(ResourceLimits.class), + any(SchedulingMode.class)); // Mock the node's resource availability Resource available = node.getUnallocatedResource(); @@ -192,8 +192,9 @@ public class TestParentQueue { return new CSAssignment(allocatedResource, type); } - }).when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class), - any(ResourceLimits.class), 
any(SchedulingMode.class)); + }).when(queue).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), any(ResourceLimits.class), + any(SchedulingMode.class)); } private float computeQueueAbsoluteUsedCapacity(CSQueue queue, @@ -274,13 +275,14 @@ public class TestParentQueue { SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), + any(CandidateNodeSet.class), anyResourceLimits(), any(SchedulingMode.class)); root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -293,10 +295,12 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); - allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -307,10 +311,12 @@ public 
class TestParentQueue { root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); - allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); @@ -325,10 +331,12 @@ public class TestParentQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(a, b); - allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(a, 4*GB, clusterResource); verifyQueueMetrics(b, 9*GB, clusterResource); } @@ -547,22 +555,25 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(a, c, b); - allocationOrder.verify(a).assignContainers(eq(clusterResource), - 
any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - applyAllocationToQueue(clusterResource, 1*GB, a); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + applyAllocationToQueue(clusterResource, 1 * GB, a); root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - applyAllocationToQueue(clusterResource, 2*GB, root); + allocationOrder.verify(c).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + applyAllocationToQueue(clusterResource, 2 * GB, root); root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); applyAllocationToQueue(clusterResource, 2*GB, b); verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 6*GB, clusterResource); @@ -586,24 +597,28 @@ public class TestParentQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(a, a2, a1, b, c); - allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(a2).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + 
allocationOrder.verify(a2).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); applyAllocationToQueue(clusterResource, 2*GB, a); root.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); applyAllocationToQueue(clusterResource, 2*GB, b); root.assignContainers(clusterResource, node_2, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(c).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); verifyQueueMetrics(c, 4*GB, clusterResource); @@ -720,12 +735,14 @@ public class TestParentQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(a); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + any(CandidateNodeSet.class), anyResourceLimits(), + 
any(SchedulingMode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -738,9 +755,11 @@ public class TestParentQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -800,10 +819,12 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(b2, b3); - allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b2).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + allocationOrder.verify(b3).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 2*GB, clusterResource); @@ -815,10 +836,12 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b3, b2); - 
allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(b3).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + allocationOrder.verify(b2).assignContainers(eq(clusterResource), + any(CandidateNodeSet.class), anyResourceLimits(), + any(SchedulingMode.class)); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 3*GB, clusterResource); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org