YARN-7437. Rename PlacementSet and SchedulingPlacementSet. (Wangda Tan via kkaranasos)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ac4d2b10 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ac4d2b10 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ac4d2b10 Branch: refs/heads/YARN-6592 Commit: ac4d2b1081d8836a21bc70e77f4e6cd2071a9949 Parents: a2c150a Author: Konstantinos Karanasos <kkarana...@apache.org> Authored: Thu Nov 9 13:01:14 2017 -0800 Committer: Konstantinos Karanasos <kkarana...@apache.org> Committed: Thu Nov 9 13:01:24 2017 -0800 ---------------------------------------------------------------------- .../scheduler/AppSchedulingInfo.java | 93 ++-- .../scheduler/ContainerUpdateContext.java | 15 +- .../scheduler/SchedulerApplicationAttempt.java | 27 +- .../scheduler/activities/ActivitiesLogger.java | 2 +- .../scheduler/capacity/AbstractCSQueue.java | 4 +- .../scheduler/capacity/CSQueue.java | 12 +- .../scheduler/capacity/CapacityScheduler.java | 69 +-- .../scheduler/capacity/LeafQueue.java | 39 +- .../scheduler/capacity/ParentQueue.java | 37 +- .../allocator/AbstractContainerAllocator.java | 10 +- .../capacity/allocator/ContainerAllocator.java | 11 +- .../allocator/RegularContainerAllocator.java | 57 +-- .../scheduler/common/fica/FiCaSchedulerApp.java | 23 +- .../scheduler/fair/FSAppAttempt.java | 2 +- .../placement/AppPlacementAllocator.java | 163 +++++++ .../scheduler/placement/CandidateNodeSet.java | 61 +++ .../placement/CandidateNodeSetUtils.java | 44 ++ .../LocalityAppPlacementAllocator.java | 422 +++++++++++++++++++ .../LocalitySchedulingPlacementSet.java | 416 ------------------ .../scheduler/placement/PlacementSet.java | 65 --- .../scheduler/placement/PlacementSetUtils.java | 36 -- .../placement/SchedulingPlacementSet.java | 158 ------- .../placement/SimpleCandidateNodeSet.java | 68 +++ .../scheduler/placement/SimplePlacementSet.java | 70 --- .../scheduler/placement/package-info.java | 28 ++ .../capacity/TestCapacityScheduler.java | 5 +- 
.../scheduler/capacity/TestChildQueueOrder.java | 18 +- .../capacity/TestContainerResizing.java | 7 +- .../TestNodeLabelContainerAllocation.java | 2 +- .../scheduler/capacity/TestParentQueue.java | 119 +++--- 30 files changed, 1082 insertions(+), 1001 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 082ec14..9f49880 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -46,9 +46,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -82,8 +82,8 @@ public class AppSchedulingInfo { private final ConcurrentSkipListSet<SchedulerRequestKey> schedulerKeys = new ConcurrentSkipListSet<>(); - final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>> - schedulerKeyToPlacementSets = new ConcurrentHashMap<>(); + private final Map<SchedulerRequestKey, AppPlacementAllocator<SchedulerNode>> + schedulerKeyToAppPlacementAllocator = new ConcurrentHashMap<>(); private final ReentrantReadWriteLock.ReadLock readLock; private final ReentrantReadWriteLock.WriteLock writeLock; @@ -146,7 +146,7 @@ public class AppSchedulingInfo { */ private void clearRequests() { schedulerKeys.clear(); - schedulerKeyToPlacementSets.clear(); + schedulerKeyToAppPlacementAllocator.clear(); LOG.info("Application " + applicationId + " requests cleared"); } @@ -190,9 +190,9 @@ public class AppSchedulingInfo { dedupRequests.get(schedulerKey).put(request.getResourceName(), request); } - // Update scheduling placement set + // Update AppPlacementAllocator by dedup requests. 
offswitchResourcesUpdated = - addToPlacementSets( + addRequestToAppPlacement( recoverPreemptedRequestForAContainer, dedupRequests); return offswitchResourcesUpdated; @@ -201,11 +201,11 @@ public class AppSchedulingInfo { } } - public void removePlacementSets(SchedulerRequestKey schedulerRequestKey) { - schedulerKeyToPlacementSets.remove(schedulerRequestKey); + public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) { + schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey); } - boolean addToPlacementSets( + boolean addRequestToAppPlacement( boolean recoverPreemptedRequestForAContainer, Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) { boolean offswitchResourcesUpdated = false; @@ -213,14 +213,15 @@ public class AppSchedulingInfo { dedupRequests.entrySet()) { SchedulerRequestKey schedulerRequestKey = entry.getKey(); - if (!schedulerKeyToPlacementSets.containsKey(schedulerRequestKey)) { - schedulerKeyToPlacementSets.put(schedulerRequestKey, - new LocalitySchedulingPlacementSet<>(this)); + if (!schedulerKeyToAppPlacementAllocator.containsKey( + schedulerRequestKey)) { + schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey, + new LocalityAppPlacementAllocator<>(this)); } - // Update placement set + // Update AppPlacementAllocator ResourceRequestUpdateResult pendingAmountChanges = - schedulerKeyToPlacementSets.get(schedulerRequestKey) + schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey) .updateResourceRequests( entry.getValue().values(), recoverPreemptedRequestForAContainer); @@ -244,7 +245,7 @@ public class AppSchedulingInfo { if (request.getNumContainers() <= 0) { if (lastRequestContainers >= 0) { schedulerKeys.remove(schedulerKey); - schedulerKeyToPlacementSets.remove(schedulerKey); + schedulerKeyToAppPlacementAllocator.remove(schedulerKey); } LOG.info("checking for deactivate of application :" + this.applicationId); @@ -356,8 +357,9 @@ public class AppSchedulingInfo { List<ResourceRequest> ret = new 
ArrayList<>(); try { this.readLock.lock(); - for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { - ret.addAll(ps.getResourceRequests().values()); + for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator + .values()) { + ret.addAll(ap.getResourceRequests().values()); } } finally { this.readLock.unlock(); @@ -384,8 +386,9 @@ public class AppSchedulingInfo { String resourceName) { try { this.readLock.lock(); - SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey); - return (ps == null) ? PendingAsk.ZERO : ps.getPendingAsk(resourceName); + AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get( + schedulerKey); + return (ap == null) ? PendingAsk.ZERO : ap.getPendingAsk(resourceName); } finally { this.readLock.unlock(); } @@ -424,7 +427,7 @@ public class AppSchedulingInfo { updateMetricsForAllocatedContainer(type, node, containerAllocated); } - return schedulerKeyToPlacementSets.get(schedulerKey).allocate( + return schedulerKeyToAppPlacementAllocator.get(schedulerKey).allocate( schedulerKey, type, node); } finally { writeLock.unlock(); @@ -442,23 +445,24 @@ public class AppSchedulingInfo { this.writeLock.lock(); QueueMetrics oldMetrics = queue.getMetrics(); QueueMetrics newMetrics = newQueue.getMetrics(); - for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { - PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY); + for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator + .values()) { + PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY); if (ask.getCount() > 0) { oldMetrics.decrPendingResources( - ps.getPrimaryRequestedNodePartition(), + ap.getPrimaryRequestedNodePartition(), user, ask.getCount(), ask.getPerAllocationResource()); newMetrics.incrPendingResources( - ps.getPrimaryRequestedNodePartition(), + ap.getPrimaryRequestedNodePartition(), user, ask.getCount(), ask.getPerAllocationResource()); Resource delta = Resources.multiply(ask.getPerAllocationResource(), 
ask.getCount()); // Update Queue queue.decPendingResource( - ps.getPrimaryRequestedNodePartition(), delta); + ap.getPrimaryRequestedNodePartition(), delta); newQueue.incPendingResource( - ps.getPrimaryRequestedNodePartition(), delta); + ap.getPrimaryRequestedNodePartition(), delta); } } oldMetrics.moveAppFrom(this); @@ -477,15 +481,16 @@ public class AppSchedulingInfo { try { this.writeLock.lock(); QueueMetrics metrics = queue.getMetrics(); - for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { - PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY); + for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator + .values()) { + PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY); if (ask.getCount() > 0) { - metrics.decrPendingResources(ps.getPrimaryRequestedNodePartition(), + metrics.decrPendingResources(ap.getPrimaryRequestedNodePartition(), user, ask.getCount(), ask.getPerAllocationResource()); // Update Queue queue.decPendingResource( - ps.getPrimaryRequestedNodePartition(), + ap.getPrimaryRequestedNodePartition(), Resources.multiply(ask.getPerAllocationResource(), ask.getCount())); } @@ -559,11 +564,12 @@ public class AppSchedulingInfo { SchedulerRequestKey schedulerKey) { try { readLock.lock(); - SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(schedulerKey); - if (null == ps) { + AppPlacementAllocator ap = schedulerKeyToAppPlacementAllocator.get( + schedulerKey); + if (null == ap) { return false; } - return ps.canAllocate(type, node); + return ap.canAllocate(type, node); } finally { readLock.unlock(); } @@ -593,11 +599,10 @@ public class AppSchedulingInfo { metrics.incrNodeTypeAggregations(user, type); } - // Get placement-set by specified schedulerKey - // Now simply return all node of the input clusterPlacementSet - public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet( + // Get AppPlacementAllocator by specified schedulerKey + public <N extends SchedulerNode> AppPlacementAllocator<N> 
getAppPlacementAllocator( SchedulerRequestKey schedulerkey) { - return (SchedulingPlacementSet<N>) schedulerKeyToPlacementSets.get( + return (AppPlacementAllocator<N>) schedulerKeyToAppPlacementAllocator.get( schedulerkey); } @@ -614,9 +619,9 @@ public class AppSchedulingInfo { SchedulerRequestKey schedulerKey, String resourceName) { try { this.readLock.lock(); - SchedulingPlacementSet ps = - schedulerKeyToPlacementSets.get(schedulerKey); - return (ps == null) || ps.canDelayTo(resourceName); + AppPlacementAllocator ap = + schedulerKeyToAppPlacementAllocator.get(schedulerKey); + return (ap == null) || ap.canDelayTo(resourceName); } finally { this.readLock.unlock(); } @@ -626,9 +631,9 @@ public class AppSchedulingInfo { String nodePartition, SchedulingMode schedulingMode) { try { this.readLock.lock(); - SchedulingPlacementSet ps = - schedulerKeyToPlacementSets.get(schedulerKey); - return (ps != null) && ps.acceptNodePartition(nodePartition, + AppPlacementAllocator ap = + schedulerKeyToAppPlacementAllocator.get(schedulerKey); + return (ap != null) && ap.acceptNodePartition(nodePartition, schedulingMode); } finally { this.readLock.unlock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index 5ac2ac5..93995a1 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -34,8 +34,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer .RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement - .SchedulingPlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; @@ -146,17 +145,17 @@ public class ContainerUpdateContext { createResourceRequests(rmContainer, schedulerNode, schedulerKey, resToIncrease); updateResReqs.put(schedulerKey, resMap); - appSchedulingInfo.addToPlacementSets(false, updateResReqs); + appSchedulingInfo.addRequestToAppPlacement(false, updateResReqs); } return true; } private void cancelPreviousRequest(SchedulerNode schedulerNode, SchedulerRequestKey schedulerKey) { - SchedulingPlacementSet<SchedulerNode> schedulingPlacementSet = - appSchedulingInfo.getSchedulingPlacementSet(schedulerKey); - if (schedulingPlacementSet != null) { - Map<String, ResourceRequest> resourceRequests = schedulingPlacementSet + AppPlacementAllocator<SchedulerNode> appPlacementAllocator = + appSchedulingInfo.getAppPlacementAllocator(schedulerKey); + if (appPlacementAllocator != null) { + Map<String, ResourceRequest> resourceRequests = appPlacementAllocator .getResourceRequests(); ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY); // Decrement the pending using a dummy RR with @@ -290,7 
+289,7 @@ public class ContainerUpdateContext { (rmContainer, node, schedulerKey, rmContainer.getContainer().getResource()); reqsToUpdate.put(schedulerKey, resMap); - appSchedulingInfo.addToPlacementSets(true, reqsToUpdate); + appSchedulingInfo.addRequestToAppPlacement(true, reqsToUpdate); return UNDEFINED; } return retVal; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index ce71afa..346bd20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import com.google.common.collect.ConcurrentHashMultiset; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang.time.FastDateFormat; @@ -75,14 +74,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRese import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; - import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -91,6 +87,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ConcurrentHashMultiset; /** * Represents an application attempt from the viewpoint of the scheduler. @@ -316,9 +313,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { String resourceName) { try { readLock.lock(); - SchedulingPlacementSet ps = appSchedulingInfo.getSchedulingPlacementSet( + AppPlacementAllocator ap = appSchedulingInfo.getAppPlacementAllocator( schedulerKey); - return ps == null ? 0 : ps.getOutstandingAsksCount(resourceName); + return ap == null ? 
0 : ap.getOutstandingAsksCount(resourceName); } finally { readLock.unlock(); } @@ -617,13 +614,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { try { readLock.lock(); for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) { - SchedulingPlacementSet ps = getSchedulingPlacementSet(schedulerKey); - if (ps != null && - ps.getOutstandingAsksCount(ResourceRequest.ANY) > 0) { + AppPlacementAllocator ap = getAppPlacementAllocator(schedulerKey); + if (ap != null && + ap.getOutstandingAsksCount(ResourceRequest.ANY) > 0) { LOG.debug("showRequests:" + " application=" + getApplicationId() + " headRoom=" + getHeadroom() + " currentConsumption=" + attemptResourceUsage.getUsed().getMemorySize()); - ps.showRequests(); + ap.showRequests(); } } } finally { @@ -1334,14 +1331,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { this.isAttemptRecovering = isRecovering; } - public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet( + public <N extends SchedulerNode> AppPlacementAllocator<N> getAppPlacementAllocator( SchedulerRequestKey schedulerRequestKey) { - return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey); + return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey); } public Map<String, ResourceRequest> getResourceRequests( SchedulerRequestKey schedulerRequestKey) { - return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey) + return appSchedulingInfo.getAppPlacementAllocator(schedulerRequestKey) .getResourceRequests(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java index 12aff02..0c351b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; /** * Utility for logging scheduler activities */ -// FIXME: make sure PlacementSet works with this class +// FIXME: make sure CandidateNodeSet works with this class public class ActivitiesLogger { private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 250f4e6..183cb36 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -876,7 +876,7 @@ public abstract class AbstractCSQueue implements CSQueue { public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, SchedulingMode schedulingMode) { - return assignContainers(clusterResource, new SimplePlacementSet<>(node), + return assignContainers(clusterResource, new SimpleCandidateNodeSet<>(node), resourceLimits, schedulingMode); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 3a17d1b..43e7f53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -46,12 +45,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; /** * <code>CSQueue</code> represents a node in the tree of @@ -188,15 +185,16 @@ public interface CSQueue extends SchedulerQueue<CSQueue> { /** * Assign containers to applications in the queue or it's children (if any). * @param clusterResource the resource of the cluster. - * @param ps {@link PlacementSet} of nodes which resources are available + * @param candidates {@link CandidateNodeSet} the nodes that are considered + * for the current placement. * @param resourceLimits how much overall resource of this queue can use. * @param schedulingMode Type of exclusive check when assign container on a * NodeManager, see {@link SchedulingMode}. * @return the assignment */ public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits, - SchedulingMode schedulingMode); + CandidateNodeSet<FiCaSchedulerNode> candidates, + ResourceLimits resourceLimits, SchedulingMode schedulingMode); /** * A container assigned to the queue has completed. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 5e172b8..6f630f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -132,9 +132,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; @@ -1183,7 +1183,7 @@ public class CapacityScheduler extends /** * We need to make sure when doing allocation, Node should be existed - * And we will construct a {@link PlacementSet} before proceeding + * And we will construct a {@link CandidateNodeSet} before proceeding */ private void allocateContainersToNode(NodeId nodeId, boolean withNodeHeartbeat) { @@ -1192,8 +1192,10 @@ public class CapacityScheduler extends int offswitchCount = 0; int assignedContainers = 0; - PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node); - CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat); + CandidateNodeSet<FiCaSchedulerNode> candidates = + new SimpleCandidateNodeSet<>(node); + CSAssignment assignment = allocateContainersToNode(candidates, + withNodeHeartbeat); // Only check if we can allocate more container on the same node when // scheduling is triggered by node heartbeat if (null != assignment && withNodeHeartbeat) { @@ -1210,7 +1212,7 @@ public class CapacityScheduler extends assignedContainers)) { // Try to see if it is possible to allocate multiple container for // the same node heartbeat - assignment = allocateContainersToNode(ps, true); + assignment = allocateContainersToNode(candidates, true); if (null != assignment && assignment.getType() == NodeType.OFF_SWITCH) { @@ -1237,8 +1239,9 @@ public class CapacityScheduler extends /* * Logics of allocate container on a single node (Old behavior) */ - private CSAssignment allocateContainerOnSingleNode(PlacementSet<FiCaSchedulerNode> ps, - FiCaSchedulerNode node, boolean withNodeHeartbeat) { + private CSAssignment allocateContainerOnSingleNode( + 
CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node, + boolean withNodeHeartbeat) { // Backward compatible way to make sure previous behavior which allocation // driven by node heartbeat works. if (getNode(node.getNodeID()) != node) { @@ -1262,7 +1265,7 @@ public class CapacityScheduler extends .getApplicationId() + " on node: " + node.getNodeID()); LeafQueue queue = ((LeafQueue) reservedApplication.getQueue()); - assignment = queue.assignContainers(getClusterResource(), ps, + assignment = queue.assignContainers(getClusterResource(), candidates, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager @@ -1329,14 +1332,16 @@ public class CapacityScheduler extends + node.getUnallocatedResource()); } - return allocateOrReserveNewContainers(ps, withNodeHeartbeat); + return allocateOrReserveNewContainers(candidates, withNodeHeartbeat); } private CSAssignment allocateOrReserveNewContainers( - PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) { + CandidateNodeSet<FiCaSchedulerNode> candidates, + boolean withNodeHeartbeat) { CSAssignment assignment = getRootQueue().assignContainers( - getClusterResource(), ps, new ResourceLimits(labelManager - .getResourceByLabel(ps.getPartition(), getClusterResource())), + getClusterResource(), candidates, new ResourceLimits(labelManager + .getResourceByLabel(candidates.getPartition(), + getClusterResource())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assignment.setSchedulingMode(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); @@ -1346,30 +1351,34 @@ public class CapacityScheduler extends assignment.getResource(), Resources.none())) { if (withNodeHeartbeat) { updateSchedulerHealth(lastNodeUpdateTime, - PlacementSetUtils.getSingleNode(ps).getNodeID(), assignment); + CandidateNodeSetUtils.getSingleNode(candidates).getNodeID(), + assignment); } return assignment; } // Only do non-exclusive allocation when node 
has node-labels. - if (StringUtils.equals(ps.getPartition(), RMNodeLabelsManager.NO_LABEL)) { + if (StringUtils.equals(candidates.getPartition(), + RMNodeLabelsManager.NO_LABEL)) { return null; } // Only do non-exclusive allocation when the node-label supports that try { if (rmContext.getNodeLabelManager().isExclusiveNodeLabel( - ps.getPartition())) { + candidates.getPartition())) { return null; } } catch (IOException e) { - LOG.warn("Exception when trying to get exclusivity of node label=" + ps + LOG.warn( + "Exception when trying to get exclusivity of node label=" + candidates .getPartition(), e); return null; } // Try to use NON_EXCLUSIVE - assignment = getRootQueue().assignContainers(getClusterResource(), ps, + assignment = getRootQueue().assignContainers(getClusterResource(), + candidates, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager @@ -1386,13 +1395,14 @@ public class CapacityScheduler extends * New behavior, allocate containers considering multiple nodes */ private CSAssignment allocateContainersOnMultiNodes( - PlacementSet<FiCaSchedulerNode> ps) { + CandidateNodeSet<FiCaSchedulerNode> candidates) { // When this time look at multiple nodes, try schedule if the // partition has any available resource or killable resource if (getRootQueue().getQueueCapacities().getUsedCapacity( - ps.getPartition()) >= 1.0f && preemptionManager.getKillableResource( - CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources - .none()) { + candidates.getPartition()) >= 1.0f + && preemptionManager.getKillableResource( + CapacitySchedulerConfiguration.ROOT, candidates.getPartition()) + == Resources.none()) { if (LOG.isDebugEnabled()) { LOG.debug("This node or this node partition doesn't have available or" + "killable resource"); @@ -1400,11 +1410,12 @@ public class CapacityScheduler extends return null; } - return allocateOrReserveNewContainers(ps, false); + 
return allocateOrReserveNewContainers(candidates, false); } @VisibleForTesting - CSAssignment allocateContainersToNode(PlacementSet<FiCaSchedulerNode> ps, + CSAssignment allocateContainersToNode( + CandidateNodeSet<FiCaSchedulerNode> candidates, boolean withNodeHeartbeat) { if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext .isSchedulerReadyForAllocatingContainers()) { @@ -1413,14 +1424,14 @@ public class CapacityScheduler extends // Backward compatible way to make sure previous behavior which allocation // driven by node heartbeat works. - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); // We have two different logics to handle allocation on single node / multi // nodes. if (null != node) { - return allocateContainerOnSingleNode(ps, node, withNodeHeartbeat); - } else { - return allocateContainersOnMultiNodes(ps); + return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); + } else{ + return allocateContainersOnMultiNodes(candidates); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f2f1baf..e8342d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -69,8 +69,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.Lock; @@ -970,10 +970,10 @@ public class LeafQueue extends AbstractCSQueue { limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); } - private CSAssignment allocateFromReservedContainer( - Resource clusterResource, PlacementSet<FiCaSchedulerNode> ps, + private CSAssignment allocateFromReservedContainer(Resource clusterResource, + CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); if (null == node) { return null; } @@ -987,7 +987,8 @@ public class LeafQueue extends AbstractCSQueue { ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node.getNodeID(), SystemClock.getInstance().getTime(), 
application); CSAssignment assignment = application.assignContainers(clusterResource, - ps, currentResourceLimits, schedulingMode, reservedContainer); + candidates, currentResourceLimits, schedulingMode, + reservedContainer); return assignment; } } @@ -997,43 +998,44 @@ public class LeafQueue extends AbstractCSQueue { @Override public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits, - SchedulingMode schedulingMode) { + CandidateNodeSet<FiCaSchedulerNode> candidates, + ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { updateCurrentResourceLimits(currentResourceLimits, clusterResource); - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); if (LOG.isDebugEnabled()) { - LOG.debug("assignContainers: partition=" + ps.getPartition() + LOG.debug("assignContainers: partition=" + candidates.getPartition() + " #applications=" + orderingPolicy.getNumSchedulableEntities()); } - setPreemptionAllowed(currentResourceLimits, ps.getPartition()); + setPreemptionAllowed(currentResourceLimits, candidates.getPartition()); // Check for reserved resources, try to allocate reserved container first. 
CSAssignment assignment = allocateFromReservedContainer(clusterResource, - ps, currentResourceLimits, schedulingMode); + candidates, currentResourceLimits, schedulingMode); if (null != assignment) { return assignment; } // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(ps.getPartition())) { + && !accessibleToPartition(candidates.getPartition())) { ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, getParent().getQueueName(), getQueueName(), ActivityState.REJECTED, - ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + ps + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + candidates .getPartition()); return CSAssignment.NULL_ASSIGNMENT; } // Check if this queue need more resource, simply skip allocation if this // queue doesn't need more resources. - if (!hasPendingResourceRequest(ps.getPartition(), clusterResource, + if (!hasPendingResourceRequest(candidates.getPartition(), clusterResource, schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip this queue=" + getQueuePath() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + ps.getPartition()); + + schedulingMode.name() + " node-partition=" + candidates + .getPartition()); } ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, @@ -1078,7 +1080,8 @@ public class LeafQueue extends AbstractCSQueue { cachedUserLimit = cul.userLimit; } Resource userLimit = computeUserLimitAndSetHeadroom(application, - clusterResource, ps.getPartition(), schedulingMode, cachedUserLimit); + clusterResource, candidates.getPartition(), schedulingMode, + cachedUserLimit); if (cul == null) { cul = new CachedUserLimit(userLimit); userLimits.put(application.getUser(), cul); @@ -1106,7 +1109,7 @@ public class LeafQueue extends AbstractCSQueue { // Try to schedule assignment 
= application.assignContainers(clusterResource, - ps, currentResourceLimits, schedulingMode, null); + candidates, currentResourceLimits, schedulingMode, null); if (LOG.isDebugEnabled()) { LOG.debug("post-assignContainers for application " + application http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 2c288f2..d61951b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -65,8 +65,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; import org.apache.hadoop.yarn.util.resource.Resources; @Private @@ -479,16 +479,16 @@ public class ParentQueue extends AbstractCSQueue { @Override public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits, - SchedulingMode schedulingMode) { - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + CandidateNodeSet<FiCaSchedulerNode> candidates, + ResourceLimits resourceLimits, SchedulingMode schedulingMode) { + FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(ps.getPartition())) { + && !accessibleToPartition(candidates.getPartition())) { if (LOG.isDebugEnabled()) { LOG.debug("Skip this queue=" + getQueuePath() - + ", because it is not able to access partition=" + ps + + ", because it is not able to access partition=" + candidates .getPartition()); } @@ -506,12 +506,12 @@ public class ParentQueue extends AbstractCSQueue { // Check if this queue need more resource, simply skip allocation if this // queue doesn't need more resources. - if (!super.hasPendingResourceRequest(ps.getPartition(), clusterResource, - schedulingMode)) { + if (!super.hasPendingResourceRequest(candidates.getPartition(), + clusterResource, schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip this queue=" + getQueuePath() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + ps + + schedulingMode.name() + " node-partition=" + candidates .getPartition()); } @@ -538,7 +538,8 @@ public class ParentQueue extends AbstractCSQueue { // Are we over maximum-capacity for this queue? 
// This will also consider parent's limits and also continuous reservation // looking - if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(), + if (!super.canAssignToThisQueue(clusterResource, + candidates.getPartition(), resourceLimits, Resources .createResource(getMetrics().getReservedMB(), getMetrics().getReservedVirtualCores()), schedulingMode)) { @@ -556,7 +557,7 @@ public class ParentQueue extends AbstractCSQueue { // Schedule CSAssignment assignedToChild = assignContainersToChildQueues( - clusterResource, ps, resourceLimits, schedulingMode); + clusterResource, candidates, resourceLimits, schedulingMode); assignment.setType(assignedToChild.getType()); assignment.setRequestLocalityType( assignedToChild.getRequestLocalityType()); @@ -710,7 +711,7 @@ public class ParentQueue extends AbstractCSQueue { } private CSAssignment assignContainersToChildQueues(Resource cluster, - PlacementSet<FiCaSchedulerNode> ps, ResourceLimits limits, + CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits limits, SchedulingMode schedulingMode) { CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT; @@ -719,7 +720,7 @@ public class ParentQueue extends AbstractCSQueue { // Try to assign to most 'under-served' sub-queue for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator( - ps.getPartition()); iter.hasNext(); ) { + candidates.getPartition()); iter.hasNext(); ) { CSQueue childQueue = iter.next(); if(LOG.isDebugEnabled()) { LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() @@ -729,10 +730,10 @@ public class ParentQueue extends AbstractCSQueue { // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, cluster, parentLimits, - ps.getPartition()); - - CSAssignment childAssignment = childQueue.assignContainers(cluster, ps, - childLimits, schedulingMode); + candidates.getPartition()); + + CSAssignment childAssignment = childQueue.assignContainers(cluster, + 
candidates, childLimits, schedulingMode); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + " stats: " + childQueue + " --> " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index 5809d86..95e0533 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocat import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -34,7 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -176,13 +175,14 @@ public abstract class AbstractContainerAllocator { * </ul> * * @param clusterResource clusterResource - * @param ps PlacementSet + * @param candidates CandidateNodeSet * @param schedulingMode scheduling mode (exclusive or nonexclusive) * @param resourceLimits resourceLimits * @param reservedContainer reservedContainer * @return CSAssignemnt proposal */ public abstract CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, RMContainer reservedContainer); + CandidateNodeSet<FiCaSchedulerNode> candidates, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, + RMContainer reservedContainer); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index 4879fae..9df03b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -21,16 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocat import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; public class ContainerAllocator extends AbstractContainerAllocator { private AbstractContainerAllocator regularContainerAllocator; @@ -50,10 
+48,11 @@ public class ContainerAllocator extends AbstractContainerAllocator { @Override public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, RMContainer reservedContainer) { + CandidateNodeSet<FiCaSchedulerNode> candidates, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, + RMContainer reservedContainer) { return regularContainerAllocator.assignContainers(clusterResource, - ps, schedulingMode, resourceLimits, reservedContainer); + candidates, schedulingMode, resourceLimits, reservedContainer); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 72dfbdd..69e90c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; @@ -50,9 +51,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -91,15 +91,16 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { /* * Pre-check if we can allocate a pending resource request - * (given schedulerKey) to a given PlacementSet. + * (given schedulerKey) to a given CandidateNodeSet. * We will consider stuffs like exclusivity, pending resource, node partition, * headroom, etc. 
*/ - private ContainerAllocation preCheckForPlacementSet(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) { + private ContainerAllocation preCheckForNodeCandidateSet( + Resource clusterResource, CandidateNodeSet<FiCaSchedulerNode> candidates, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, + SchedulerRequestKey schedulerKey) { Priority priority = schedulerKey.getPriority(); - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey, ResourceRequest.ANY); @@ -164,7 +165,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } if (!checkHeadroom(clusterResource, resourceLimits, required, - ps.getPartition())) { + candidates.getPartition())) { if (LOG.isDebugEnabled()) { LOG.debug("cannot allocate required resource=" + required + " because of headroom"); @@ -182,7 +183,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // Only do this when request associated with given scheduler key accepts // NO_LABEL under RESPECT_EXCLUSIVITY mode if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL, - appInfo.getSchedulingPlacementSet(schedulerKey) + appInfo.getAppPlacementAllocator(schedulerKey) .getPrimaryRequestedNodePartition())) { missedNonPartitionedRequestSchedulingOpportunity = application.addMissedNonPartitionedRequestSchedulingOpportunity( @@ -265,7 +266,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { SchedulerRequestKey schedulerKey, int clusterNodes) { // Estimate: Required unique resources (i.e. 
hosts + racks) int requiredResources = Math.max( - application.getSchedulingPlacementSet(schedulerKey) + application.getAppPlacementAllocator(schedulerKey) .getUniqueLocationAsks() - 1, 0); // waitFactor can't be more than '1' @@ -780,15 +781,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } private ContainerAllocation allocate(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey, - RMContainer reservedContainer) { + CandidateNodeSet<FiCaSchedulerNode> candidates, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, + SchedulerRequestKey schedulerKey, RMContainer reservedContainer) { // Do checks before determining which node to allocate // Directly return if this check fails. ContainerAllocation result; if (reservedContainer == null) { - result = preCheckForPlacementSet(clusterResource, ps, schedulingMode, - resourceLimits, schedulerKey); + result = preCheckForNodeCandidateSet(clusterResource, candidates, + schedulingMode, resourceLimits, schedulerKey); if (null != result) { return result; } @@ -801,14 +802,14 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } } - SchedulingPlacementSet<FiCaSchedulerNode> schedulingPS = - application.getAppSchedulingInfo().getSchedulingPlacementSet( + AppPlacementAllocator<FiCaSchedulerNode> schedulingPS = + application.getAppSchedulingInfo().getAppPlacementAllocator( schedulerKey); result = ContainerAllocation.PRIORITY_SKIPPED; Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator( - ps); + candidates); while (iter.hasNext()) { FiCaSchedulerNode node = iter.next(); @@ -827,19 +828,20 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { @Override public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, - ResourceLimits 
resourceLimits, + CandidateNodeSet<FiCaSchedulerNode> candidates, + SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer) { - FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates); if (reservedContainer == null) { // Check if application needs more resource, skip if it doesn't need more. if (!application.hasPendingResourceRequest(rc, - ps.getPartition(), clusterResource, schedulingMode)) { + candidates.getPartition(), clusterResource, schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-label=" + ps.getPartition()); + + schedulingMode.name() + " node-label=" + candidates + .getPartition()); } ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, application.getPriority(), @@ -849,9 +851,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // Schedule in priority order for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) { - ContainerAllocation result = - allocate(clusterResource, ps, schedulingMode, resourceLimits, - schedulerKey, null); + ContainerAllocation result = allocate(clusterResource, candidates, + schedulingMode, resourceLimits, schedulerKey, null); AllocationState allocationState = result.getAllocationState(); if (allocationState == AllocationState.PRIORITY_SKIPPED) { @@ -869,7 +870,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { return CSAssignment.SKIP_ASSIGNMENT; } else { ContainerAllocation result = - allocate(clusterResource, ps, schedulingMode, resourceLimits, + allocate(clusterResource, candidates, schedulingMode, resourceLimits, reservedContainer.getReservedSchedulerKey(), reservedContainer); return 
getCSAssignmentFromAllocateResult(clusterResource, result, reservedContainer, node); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index a12c5ec..40405fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -76,8 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerA import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet; import 
org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -224,10 +224,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return null; } - SchedulingPlacementSet<FiCaSchedulerNode> ps = - appSchedulingInfo.getSchedulingPlacementSet(schedulerKey); + AppPlacementAllocator<FiCaSchedulerNode> ps = + appSchedulingInfo.getAppPlacementAllocator(schedulerKey); if (null == ps) { - LOG.warn("Failed to get " + SchedulingPlacementSet.class.getName() + LOG.warn("Failed to get " + AppPlacementAllocator.class.getName() + " for application=" + getApplicationId() + " schedulerRequestKey=" + schedulerKey); return null; @@ -636,8 +636,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { Map<String, Resource> ret = new HashMap<>(); for (SchedulerRequestKey schedulerKey : appSchedulingInfo .getSchedulerKeys()) { - SchedulingPlacementSet<FiCaSchedulerNode> ps = - appSchedulingInfo.getSchedulingPlacementSet(schedulerKey); + AppPlacementAllocator<FiCaSchedulerNode> ps = + appSchedulingInfo.getAppPlacementAllocator(schedulerKey); String nodePartition = ps.getPrimaryRequestedNodePartition(); Resource res = ret.get(nodePartition); @@ -844,8 +844,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } public CSAssignment assignContainers(Resource clusterResource, - PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits, - SchedulingMode schedulingMode, RMContainer reservedContainer) { + CandidateNodeSet<FiCaSchedulerNode> ps, + ResourceLimits currentResourceLimits, SchedulingMode schedulingMode, + RMContainer reservedContainer) { if (LOG.isDebugEnabled()) { LOG.debug("pre-assignContainers for application " + getApplicationId()); @@ -962,9 +963,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { @Override @SuppressWarnings("unchecked") - public 
SchedulingPlacementSet<FiCaSchedulerNode> getSchedulingPlacementSet( + public AppPlacementAllocator<FiCaSchedulerNode> getAppPlacementAllocator( SchedulerRequestKey schedulerRequestKey) { - return super.getSchedulingPlacementSet(schedulerRequestKey); + return super.getAppPlacementAllocator(schedulerRequestKey); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 157d264..bbd4418 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -1019,7 +1019,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } if (offswitchAsk.getCount() > 0) { - if (getSchedulingPlacementSet(schedulerKey).getUniqueLocationAsks() + if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks() <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) { if (LOG.isTraceEnabled()) { LOG.trace("Assign container on " + node.getNodeName() 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java new file mode 100644 index 0000000..63b22a3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * <p>
+ * This class has the following functionality:
+ * 1) Tracks pending resource requests as the following events happen:
+ * - New ResourceRequests are added to the scheduler.
+ * - New containers get allocated.
+ *
+ * 2) Determines the order in which the nodes of a {@link CandidateNodeSet}
+ * are used for allocating containers.
+ * </p>
+ *
+ * <p>
+ * Each set of resource requests (e.g., resource requests with the
+ * same schedulerKey) can have its own AppPlacementAllocator instance, and
+ * each AppPlacementAllocator can order nodes differently depending on the
+ * requests.
+ * </p>
+ */
+public interface AppPlacementAllocator<N extends SchedulerNode> {
+  /**
+   * Get an iterator over preferred nodes, based on requirement/availability.
+   * @param candidateNodeSet input CandidateNodeSet
+   * @return iterator of preferred nodes
+   */
+  Iterator<N> getPreferredNodeIterator(CandidateNodeSet<N> candidateNodeSet);
+
+  /**
+   * Replace existing ResourceRequests with the new requests.
+   *
+   * @param requests new ResourceRequests
+   * @param recoverPreemptedRequestForAContainer true if we're recovering
+   * resource requests for a preempted container
+   * @return the {@link ResourceRequestUpdateResult} describing this update
+   */
+  ResourceRequestUpdateResult updateResourceRequests(
+      Collection<ResourceRequest> requests,
+      boolean recoverPreemptedRequestForAContainer);
+
+  /**
+   * Get the pending ResourceRequests tracked by this allocator.
+   * @return Map of resourceName to ResourceRequest
+   */
+  Map<String, ResourceRequest> getResourceRequests();
+
+  /**
+   * Get pending ask for given resourceName. If there's no such pendingAsk,
+   * returns {@link PendingAsk#ZERO}.
+   *
+   * @param resourceName resourceName to look up
+   * @return the PendingAsk for resourceName
+   */
+  PendingAsk getPendingAsk(String resourceName);
+
+  /**
+   * Get #pending-allocations for given resourceName. If there's no such
+   * pendingAsk, returns 0.
+   *
+   * @param resourceName resourceName to look up
+   * @return #pending-allocations
+   */
+  int getOutstandingAsksCount(String resourceName);
+
+  /**
+   * Notify that a container has been allocated.
+   * @param schedulerKey SchedulerRequestKey for this ResourceRequest
+   * @param type Type of the allocation
+   * @param node node on which the container was allocated
+   * @return list of ResourceRequests deducted
+   */
+  List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
+      NodeType type, SchedulerNode node);
+
+  /**
+   * Returns the accepted resourceNames (method name misspells Resource).
+   * @return Iterator of accepted resourceNames
+   */
+  Iterator<String> getAcceptedResouceNames();
+
+  /**
+   * Whether there is still a pending requirement for given NodeType and node.
+   * @param type Locality Type
+   * @param node node we would allocate on
+   * @return true if there is a pending requirement
+   */
+  boolean canAllocate(NodeType type, SchedulerNode node);
+
+  /**
+   * Can we delay allocation to get locality on the given resourceName?
+   * TODO: This should be moved out of AppPlacementAllocator
+   * and should belong to specific delay scheduling policy impl.
+   * See YARN-7457 for more details.
+   *
+   * @param resourceName resourceName
+   * @return true if we can delay, false otherwise
+   */
+  boolean canDelayTo(String resourceName);
+
+  /**
+   * Does this {@link AppPlacementAllocator} accept resources on nodePartition?
+   *
+   * @param nodePartition nodePartition
+   * @param schedulingMode schedulingMode
+   * @return true if resources on nodePartition are accepted
+   */
+  boolean acceptNodePartition(String nodePartition,
+      SchedulingMode schedulingMode);
+
+  /**
+   * One request may accept multiple node partitions, so this method
+   * returns the primary node partition, used for pending resource and
+   * headroom calculation.
+   *
+   * @return primary requested node partition
+   */
+  String getPrimaryRequestedNodePartition();
+
+  /**
+   * @return number of unique location asks with #pending greater than 0
+   * (like /rack1, host1, etc.).
+   *
+   * TODO: This should be moved out of AppPlacementAllocator
+   * and should belong to specific delay scheduling policy impl.
+   * See YARN-7457 for more details.
+   */
+  int getUniqueLocationAsks();
+
+  /**
+   * Print human-readable requests to the log at DEBUG level.
+   */
+  void showRequests();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java
new file mode 100644
index 0000000..6f127c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSet.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import java.util.Map;
+
+/**
+ * A group of nodes the scheduler can allocate containers on.
+ *
+ * It has the following parts:
+ *
+ * 1) A map of schedulable nodes.
+ * 2) Version of the node set; it should be updated whenever a node is added
+ * to or removed from the node set. It is used by
+ * {@link AppPlacementAllocator} or other classes to check whether local
+ * caches etc. need to be invalidated.
+ * 3) Node partition of the candidate set.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface CandidateNodeSet<N extends SchedulerNode> {
+  /**
+   * Get all nodes of this CandidateNodeSet.
+   * @return all nodes of this CandidateNodeSet, keyed by NodeId
+   */
+  Map<NodeId, N> getAllNodes();
+
+  /**
+   * Version of the CandidateNodeSet; helps {@link AppPlacementAllocator}
+   * decide whether an update is required.
+   * @return version of this node set
+   */
+  long getVersion();
+
+  /**
+   * Node partition of the node set.
+   * @return node partition
+   */
+  String getPartition();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac4d2b10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java
new file mode 100644
index 0000000..157d2ee
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/CandidateNodeSetUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+/**
+ * Utility methods for {@link CandidateNodeSet}.
+ */
+public final class CandidateNodeSetUtils {
+  // Not instantiable; this class only exposes static helpers.
+  private CandidateNodeSetUtils() {
+  }
+
+  /**
+   * Returns the node of the {@link CandidateNodeSet} if it has exactly one.
+   * @return the single node, or {@code null} if there are zero or several
+   */
+  public static <N extends SchedulerNode> N getSingleNode(
+      CandidateNodeSet<N> candidates) {
+    N node = null;
+    if (1 == candidates.getAllNodes().size()) {
+      node = candidates.getAllNodes().values().iterator().next();
+    }
+
+    return node;
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org