YARN-5938. Refactoring OpportunisticContainerAllocator to use SchedulerRequestKey instead of Priority. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8796e0d5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8796e0d5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8796e0d5 Branch: refs/heads/YARN-5085 Commit: 8796e0d5a42c1f5a8d186b95542a029b2ef13a16 Parents: fcbe152 Author: Arun Suresh <[email protected]> Authored: Tue Nov 29 06:18:48 2016 -0800 Committer: Arun Suresh <[email protected]> Committed: Tue Dec 20 11:48:35 2016 -0800 ---------------------------------------------------------------------- .../api/records/UpdateContainerRequest.java | 11 + .../impl/pb/AllocateResponsePBImpl.java | 4 +- .../OpportunisticContainerAllocator.java | 46 +- .../OpportunisticContainerContext.java | 38 +- .../server/scheduler/SchedulerRequestKey.java | 130 ++++++ .../scheduler/DistributedScheduler.java | 11 +- .../ApplicationMasterService.java | 421 ++++++++++--------- ...pportunisticContainerAllocatorAMService.java | 35 +- .../rmcontainer/RMContainer.java | 3 +- .../rmcontainer/RMContainerImpl.java | 2 +- .../rmcontainer/RMContainerReservedEvent.java | 2 +- .../scheduler/AppSchedulingInfo.java | 3 + .../scheduler/SchedulerApplicationAttempt.java | 4 +- .../scheduler/SchedulerNode.java | 1 + .../scheduler/SchedulerRequestKey.java | 122 ------ .../allocator/IncreaseContainerAllocator.java | 2 +- .../allocator/RegularContainerAllocator.java | 2 +- .../scheduler/common/SchedulerContainer.java | 2 +- .../scheduler/common/fica/FiCaSchedulerApp.java | 3 +- .../common/fica/FiCaSchedulerNode.java | 2 +- .../scheduler/fair/FSAppAttempt.java | 2 +- .../scheduler/fair/FSSchedulerNode.java | 2 +- .../scheduler/fifo/FifoAppAttempt.java | 4 +- .../scheduler/fifo/FifoScheduler.java | 4 +- .../placement/SchedulingPlacementSet.java | 2 +- .../server/resourcemanager/Application.java | 2 +- .../yarn/server/resourcemanager/Task.java | 2 +- ...alCapacityPreemptionPolicyMockFramework.java | 3 +- 
...estProportionalCapacityPreemptionPolicy.java | 3 +- .../scheduler/TestAppSchedulingInfo.java | 1 + .../TestSchedulerApplicationAttempt.java | 2 + .../capacity/TestCapacityScheduler.java | 2 +- .../scheduler/capacity/TestLeafQueue.java | 2 +- .../scheduler/capacity/TestReservations.java | 2 +- .../scheduler/capacity/TestUtils.java | 2 +- .../fair/TestContinuousScheduling.java | 2 +- .../scheduler/fair/TestFSAppAttempt.java | 2 +- 37 files changed, 456 insertions(+), 427 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java index 200dea3..e4f7a82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java @@ -159,6 +159,17 @@ public abstract class UpdateContainerRequest extends AbstractResourceRequest { } @Override + public String toString() { + return "UpdateReq{" + + "containerId=" + getContainerId() + ", " + + "containerVersion=" + getContainerVersion() + ", " + + "targetExecType=" + getExecutionType() + ", " + + "targetCapability=" + getCapability() + ", " + + "updateType=" + getContainerUpdateType() + ", " + + "}"; + } + + @Override public boolean equals(Object obj) { if (this == obj) { return true; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index c0d52a6..de3fc47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -282,8 +282,8 @@ public class AllocateResponsePBImpl extends AllocateResponse { final List<Container> containers) { if (containers == null) return; - // this looks like a bug because it results in append and not set initLocalNewContainerList(); + allocatedContainers.clear(); allocatedContainers.addAll(containers); } @@ -299,6 +299,7 @@ public class AllocateResponsePBImpl extends AllocateResponse { if (containers == null) return; initLocalUpdatedContainerList(); + updatedContainers.clear(); updatedContainers.addAll(containers); } @@ -315,6 +316,7 @@ public class AllocateResponsePBImpl extends AllocateResponse { if (containers == null) return; initLocalFinishedContainerList(); + completedContainersStatuses.clear(); completedContainersStatuses.addAll(containers); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 16436bd..2d77671 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -192,7 +192,8 @@ public class OpportunisticContainerAllocator { /** * Allocate OPPORTUNISTIC containers. - * @param request AllocateRequest + * @param blackList Resource BlackList Request + * @param oppResourceReqs Opportunistic Resource Requests * @param applicationAttemptId ApplicationAttemptId * @param opportContext App specific OpportunisticContainerContext * @param rmIdentifier RM Identifier @@ -200,32 +201,24 @@ public class OpportunisticContainerAllocator { * @return List of Containers. * @throws YarnException YarnException */ - public List<Container> allocateContainers( - AllocateRequest request, ApplicationAttemptId applicationAttemptId, + public List<Container> allocateContainers(ResourceBlacklistRequest blackList, + List<ResourceRequest> oppResourceReqs, + ApplicationAttemptId applicationAttemptId, OpportunisticContainerContext opportContext, long rmIdentifier, String appSubmitter) throws YarnException { - // Update released containers. 
- List<ContainerId> releasedContainers = request.getReleaseList(); - int numReleasedContainers = releasedContainers.size(); - if (numReleasedContainers > 0) { - LOG.info("AttemptID: " + applicationAttemptId + " released: " - + numReleasedContainers); - opportContext.getContainersAllocated().removeAll(releasedContainers); - } // Update black list. - ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); - if (rbr != null) { - opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); - opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); + if (blackList != null) { + opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals()); + opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions()); } // Add OPPORTUNISTIC requests to the outstanding ones. - opportContext.addToOutstandingReqs(request.getAskList()); + opportContext.addToOutstandingReqs(oppResourceReqs); // Satisfy the outstanding OPPORTUNISTIC requests. List<Container> allocatedContainers = new ArrayList<>(); - for (Priority priority : + for (SchedulerRequestKey schedulerKey : opportContext.getOutstandingOpReqs().descendingKeySet()) { // Allocated containers : // Key = Requested Capability, @@ -234,7 +227,7 @@ public class OpportunisticContainerAllocator { // we need the requested capability (key) to match against // the outstanding reqs) Map<Resource, List<Container>> allocated = allocate(rmIdentifier, - opportContext, priority, applicationAttemptId, appSubmitter); + opportContext, schedulerKey, applicationAttemptId, appSubmitter); for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) { opportContext.matchAllocationToOutstandingRequest( e.getKey(), e.getValue()); @@ -246,19 +239,22 @@ public class OpportunisticContainerAllocator { } private Map<Resource, List<Container>> allocate(long rmIdentifier, - OpportunisticContainerContext appContext, Priority priority, + OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, 
ApplicationAttemptId appAttId, String userName) throws YarnException { Map<Resource, List<Container>> containers = new HashMap<>(); for (ResourceRequest anyAsk : - appContext.getOutstandingOpReqs().get(priority).values()) { + appContext.getOutstandingOpReqs().get(schedKey).values()) { allocateContainersInternal(rmIdentifier, appContext.getAppParams(), appContext.getContainerIdGenerator(), appContext.getBlacklist(), appAttId, appContext.getNodeMap(), userName, containers, anyAsk); - LOG.info("Opportunistic allocation requested for [" - + "priority=" + anyAsk.getPriority() - + ", num_containers=" + anyAsk.getNumContainers() - + ", capability=" + anyAsk.getCapability() + "]" - + " allocated = " + containers.get(anyAsk.getCapability()).size()); + if (!containers.isEmpty()) { + LOG.info("Opportunistic allocation requested for [" + + "priority=" + anyAsk.getPriority() + + ", allocationRequestId=" + anyAsk.getAllocationRequestId() + + ", num_containers=" + anyAsk.getNumContainers() + + ", capability=" + anyAsk.getCapability() + "]" + + " allocated = " + containers.get(anyAsk.getCapability()).size()); + } } return containers; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index 725e2d9..a2f9f4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -18,12 +18,7 @@ package org.apache.hadoop.yarn.server.scheduler; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; @@ -52,9 +47,6 @@ public class OpportunisticContainerContext { private static final Logger LOG = LoggerFactory .getLogger(OpportunisticContainerContext.class); - // Currently just used to keep track of allocated containers. - // Can be used for reporting stats later. - private Set<ContainerId> containersAllocated = new HashSet<>(); private AllocationParams appParams = new AllocationParams(); private ContainerIdGenerator containerIdGenerator = @@ -69,13 +61,9 @@ public class OpportunisticContainerContext { // Resource Name (host/rack/any) and capability. This mapping is required // to match a received Container to an outstanding OPPORTUNISTIC // ResourceRequest (ask). 
- private final TreeMap<Priority, Map<Resource, ResourceRequest>> + private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>> outstandingOpReqs = new TreeMap<>(); - public Set<ContainerId> getContainersAllocated() { - return containersAllocated; - } - public AllocationParams getAppParams() { return appParams; } @@ -119,20 +107,11 @@ public class OpportunisticContainerContext { return blacklist; } - public TreeMap<Priority, Map<Resource, ResourceRequest>> + public TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>> getOutstandingOpReqs() { return outstandingOpReqs; } - public void updateCompletedContainers(AllocateResponse allocateResponse) { - for (ContainerStatus cs : - allocateResponse.getCompletedContainersStatuses()) { - if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) { - containersAllocated.remove(cs.getContainerId()); - } - } - } - /** * Takes a list of ResourceRequests (asks), extracts the key information viz. * (Priority, ResourceName, Capability) and adds to the outstanding @@ -144,7 +123,7 @@ public class OpportunisticContainerContext { */ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) { for (ResourceRequest request : resourceAsks) { - Priority priority = request.getPriority(); + SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); // TODO: Extend for Node/Rack locality. 
We only handle ANY requests now if (!ResourceRequest.isAnyLocation(request.getResourceName())) { @@ -156,10 +135,10 @@ public class OpportunisticContainerContext { } Map<Resource, ResourceRequest> reqMap = - outstandingOpReqs.get(priority); + outstandingOpReqs.get(schedulerKey); if (reqMap == null) { reqMap = new HashMap<>(); - outstandingOpReqs.put(priority, reqMap); + outstandingOpReqs.put(schedulerKey, reqMap); } ResourceRequest resourceRequest = reqMap.get(request.getCapability()); @@ -171,7 +150,8 @@ public class OpportunisticContainerContext { resourceRequest.getNumContainers() + request.getNumContainers()); } if (ResourceRequest.isAnyLocation(request.getResourceName())) { - LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority + LOG.info("# of outstandingOpReqs in ANY (at" + + "priority = "+ schedulerKey.getPriority() + ", with capability = " + request.getCapability() + " ) : " + resourceRequest.getNumContainers()); } @@ -187,9 +167,9 @@ public class OpportunisticContainerContext { public void matchAllocationToOutstandingRequest(Resource capability, List<Container> allocatedContainers) { for (Container c : allocatedContainers) { - containersAllocated.add(c.getId()); + SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(c); Map<Resource, ResourceRequest> asks = - outstandingOpReqs.get(c.getPriority()); + outstandingOpReqs.get(schedulerKey); if (asks == null) { continue; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java new file mode 100644 index 0000000..9b7edbe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; + +/** + * Composite key for outstanding scheduler requests for any schedulable entity. + * Currently it includes {@link Priority}. + */ +public final class SchedulerRequestKey implements + Comparable<SchedulerRequestKey> { + + private final Priority priority; + private final long allocationRequestId; + + /** + * Factory method to generate a SchedulerRequestKey from a ResourceRequest. 
+ * @param req ResourceRequest + * @return SchedulerRequestKey + */ + public static SchedulerRequestKey create(ResourceRequest req) { + return new SchedulerRequestKey(req.getPriority(), + req.getAllocationRequestId()); + } + + /** + * Convenience method to extract the SchedulerRequestKey used to schedule the + * Container. + * @param container Container + * @return SchedulerRequestKey + */ + public static SchedulerRequestKey extractFrom(Container container) { + return new SchedulerRequestKey(container.getPriority(), + container.getAllocationRequestId()); + } + + SchedulerRequestKey(Priority priority, long allocationRequestId) { + this.priority = priority; + this.allocationRequestId = allocationRequestId; + } + + /** + * Get the {@link Priority} of the request. + * + * @return the {@link Priority} of the request + */ + public Priority getPriority() { + return priority; + } + + /** + * Get the Id of the associated {@link ResourceRequest}. + * + * @return the Id of the associated {@link ResourceRequest} + */ + public long getAllocationRequestId() { + return allocationRequestId; + } + + @Override + public int compareTo(SchedulerRequestKey o) { + if (o == null) { + return (priority != null) ? -1 : 0; + } else { + if (priority == null) { + return 1; + } + } + int priorityCompare = o.getPriority().compareTo(priority); + // we first sort by priority and then by allocationRequestId + if (priorityCompare != 0) { + return priorityCompare; + } + return Long.compare(allocationRequestId, o.getAllocationRequestId()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SchedulerRequestKey)) { + return false; + } + + SchedulerRequestKey that = (SchedulerRequestKey) o; + + if (getAllocationRequestId() != that.getAllocationRequestId()) { + return false; + } + return getPriority() != null ? 
+ getPriority().equals(that.getPriority()) : + that.getPriority() == null; + } + + @Override + public int hashCode() { + int result = getPriority() != null ? getPriority().hashCode() : 0; + result = 31 * result + (int) (getAllocationRequestId() ^ ( + getAllocationRequestId() >>> 32)); + return result; + } + + @Override + public String toString() { + return "SchedulerRequestKey{" + + "priority=" + priority + + ", allocationRequestId=" + allocationRequestId + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java index 0f47c93..a9b5ed4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -227,10 +227,10 @@ public final class DistributedScheduler extends AbstractRequestInterceptor { .partitionAskList(request.getAllocateRequest().getAskList()); // Allocate OPPORTUNISTIC containers. 
- request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic()); List<Container> allocatedContainers = containerAllocator.allocateContainers( - request.getAllocateRequest(), applicationAttemptId, + request.getAllocateRequest().getResourceBlacklistRequest(), + partitionedAsks.getOpportunistic(), applicationAttemptId, oppContainerContext, rmIdentifier, appSubmitter); // Prepare request for sending to RM for scheduling GUARANTEED containers. @@ -252,18 +252,11 @@ public final class DistributedScheduler extends AbstractRequestInterceptor { nodeTokens.put(nmToken.getNodeId(), nmToken); } - oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse()); - // Check if we have NM tokens for all the allocated containers. If not // generate one and update the response. updateAllocateResponse( dsResp.getAllocateResponse(), nmTokens, allocatedContainers); - if (LOG.isDebugEnabled()) { - LOG.debug("Number of opportunistic containers currently" + - "allocated by application: " + oppContainerContext - .getContainersAllocated().size()); - } return dsResp; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 3d7b2b1..99d1a4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -405,7 +405,6 @@ public class ApplicationMasterService extends AbstractService implements ApplicationAttemptId appAttemptId = amrmTokenIdentifier.getApplicationAttemptId(); - ApplicationId applicationId = appAttemptId.getApplicationId(); this.amLivelinessMonitor.receivedPing(appAttemptId); @@ -422,8 +421,10 @@ public class ApplicationMasterService extends AbstractService implements AllocateResponse lastResponse = lock.getAllocateResponse(); if (!hasApplicationMasterRegistered(appAttemptId)) { String message = - "AM is not registered for known application attempt: " + appAttemptId - + " or RM had restarted after AM registered . AM should re-register."; + "AM is not registered for known application attempt: " + + appAttemptId + + " or RM had restarted after AM registered. " + + " AM should re-register."; throw new ApplicationMasterNotRegisteredException(message); } @@ -438,185 +439,10 @@ public class ApplicationMasterService extends AbstractService implements throw new InvalidApplicationMasterRequestException(message); } - //filter illegal progress values - float filteredProgress = request.getProgress(); - if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY - || filteredProgress < 0) { - request.setProgress(0); - } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) { - request.setProgress(1); - } - - // Send the status update to the appAttempt. 
- this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptStatusupdateEvent(appAttemptId, request - .getProgress())); - - List<ResourceRequest> ask = request.getAskList(); - List<ContainerId> release = request.getReleaseList(); - - ResourceBlacklistRequest blacklistRequest = - request.getResourceBlacklistRequest(); - List<String> blacklistAdditions = - (blacklistRequest != null) ? - blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST; - List<String> blacklistRemovals = - (blacklistRequest != null) ? - blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST; - RMApp app = - this.rmContext.getRMApps().get(applicationId); - - // set label expression for Resource Requests if resourceName=ANY - ApplicationSubmissionContext asc = app.getApplicationSubmissionContext(); - for (ResourceRequest req : ask) { - if (null == req.getNodeLabelExpression() - && ResourceRequest.ANY.equals(req.getResourceName())) { - req.setNodeLabelExpression(asc.getNodeLabelExpression()); - } - } - - Resource maximumCapacity = rScheduler.getMaximumResourceCapability(); - - // sanity check - try { - RMServerUtils.normalizeAndValidateRequests(ask, - maximumCapacity, app.getQueue(), - rScheduler, rmContext); - } catch (InvalidResourceRequestException e) { - LOG.warn("Invalid resource ask by application " + appAttemptId, e); - throw e; - } - - try { - RMServerUtils.validateBlacklistRequest(blacklistRequest); - } catch (InvalidResourceBlacklistRequestException e) { - LOG.warn("Invalid blacklist request by application " + appAttemptId, e); - throw e; - } - - // In the case of work-preserving AM restart, it's possible for the - // AM to release containers from the earlier attempt. 
- if (!app.getApplicationSubmissionContext() - .getKeepContainersAcrossApplicationAttempts()) { - try { - RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); - } catch (InvalidContainerReleaseException e) { - LOG.warn("Invalid container release by application " + appAttemptId, - e); - throw e; - } - } - - // Split Update Resource Requests into increase and decrease. - // No Exceptions are thrown here. All update errors are aggregated - // and returned to the AM. - List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>(); - List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>(); - List<UpdateContainerError> updateContainerErrors = - RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext, - request, maximumCapacity, increaseResourceReqs, - decreaseResourceReqs); - - // Send new requests to appAttempt. - Allocation allocation; - RMAppAttemptState state = - app.getRMAppAttempt(appAttemptId).getAppAttemptState(); - if (state.equals(RMAppAttemptState.FINAL_SAVING) || - state.equals(RMAppAttemptState.FINISHING) || - app.isAppFinalStateStored()) { - LOG.warn(appAttemptId + " is in " + state + - " state, ignore container allocate request."); - allocation = EMPTY_ALLOCATION; - } else { - allocation = - this.rScheduler.allocate(appAttemptId, ask, release, - blacklistAdditions, blacklistRemovals, - increaseResourceReqs, decreaseResourceReqs); - } - - if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { - LOG.info("blacklist are updated in Scheduler." 
+ - "blacklistAdditions: " + blacklistAdditions + ", " + - "blacklistRemovals: " + blacklistRemovals); - } - RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); - AllocateResponse allocateResponse = + AllocateResponse response = recordFactory.newRecordInstance(AllocateResponse.class); - if (allocation.getNMTokens() != null && - !allocation.getNMTokens().isEmpty()) { - allocateResponse.setNMTokens(allocation.getNMTokens()); - } - - // Notify the AM of container update errors - if (!updateContainerErrors.isEmpty()) { - allocateResponse.setUpdateErrors(updateContainerErrors); - } - // update the response with the deltas of node status changes - List<RMNode> updatedNodes = new ArrayList<RMNode>(); - if(app.pullRMNodeUpdates(updatedNodes) > 0) { - List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>(); - for(RMNode rmNode: updatedNodes) { - SchedulerNodeReport schedulerNodeReport = - rScheduler.getNodeReport(rmNode.getNodeID()); - Resource used = BuilderUtils.newResource(0, 0); - int numContainers = 0; - if (schedulerNodeReport != null) { - used = schedulerNodeReport.getUsedResource(); - numContainers = schedulerNodeReport.getNumContainers(); - } - NodeId nodeId = rmNode.getNodeID(); - NodeReport report = - BuilderUtils.newNodeReport(nodeId, rmNode.getState(), - rmNode.getHttpAddress(), rmNode.getRackName(), used, - rmNode.getTotalCapability(), numContainers, - rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), - rmNode.getNodeLabels()); - - updatedNodeReports.add(report); - } - allocateResponse.setUpdatedNodes(updatedNodeReports); - } - - allocateResponse.setAllocatedContainers(allocation.getContainers()); - allocateResponse.setCompletedContainersStatuses(appAttempt - .pullJustFinishedContainers()); - allocateResponse.setResponseId(lastResponse.getResponseId() + 1); - allocateResponse.setAvailableResources(allocation.getResourceLimit()); - - // Handling increased/decreased containers - List<UpdatedContainer> updatedContainers = new 
ArrayList<>(); - if (allocation.getIncreasedContainers() != null) { - for (Container c : allocation.getIncreasedContainers()) { - updatedContainers.add( - UpdatedContainer.newInstance( - ContainerUpdateType.INCREASE_RESOURCE, c)); - } - } - if (allocation.getDecreasedContainers() != null) { - for (Container c : allocation.getDecreasedContainers()) { - updatedContainers.add( - UpdatedContainer.newInstance( - ContainerUpdateType.DECREASE_RESOURCE, c)); - } - } - - allocateResponse.setUpdatedContainers(updatedContainers); - - allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); - - // add collector address for this application - if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - allocateResponse.setCollectorAddr( - this.rmContext.getRMApps().get(applicationId).getCollectorAddr()); - } - - // add preemption to the allocateResponse message (if any) - allocateResponse - .setPreemptionMessage(generatePreemptionMessage(allocation)); - - // Set application priority - allocateResponse.setApplicationPriority(app - .getApplicationPriority()); + allocateInternal(amrmTokenIdentifier.getApplicationAttemptId(), + request, response); // update AMRMToken if the token is rolled-up MasterKeyData nextMasterKey = @@ -624,21 +450,24 @@ public class ApplicationMasterService extends AbstractService implements if (nextMasterKey != null && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier - .getKeyId()) { + .getKeyId()) { + RMApp app = + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); + RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt; Token<AMRMTokenIdentifier> amrmToken = appAttempt.getAMRMToken(); if (nextMasterKey.getMasterKey().getKeyId() != appAttemptImpl.getAMRMTokenKeyId()) { LOG.info("The AMRMToken has been rolled-over. 
Send new AMRMToken back" - + " to application: " + applicationId); + + " to application: " + appAttemptId.getApplicationId()); amrmToken = rmContext.getAMRMTokenSecretManager() .createAndGetAMRMToken(appAttemptId); appAttemptImpl.setAMRMToken(amrmToken); } - allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token - .newInstance(amrmToken.getIdentifier(), amrmToken.getKind() - .toString(), amrmToken.getPassword(), amrmToken.getService() - .toString())); + response.setAMRMToken(org.apache.hadoop.yarn.api.records.Token + .newInstance(amrmToken.getIdentifier(), amrmToken.getKind() + .toString(), amrmToken.getPassword(), amrmToken.getService() + .toString())); } /* @@ -646,11 +475,225 @@ public class ApplicationMasterService extends AbstractService implements * need to worry about unregister call occurring in between (which * removes the lock object). */ - lock.setAllocateResponse(allocateResponse); - return allocateResponse; + response.setResponseId(lastResponse.getResponseId() + 1); + lock.setAllocateResponse(response); + return response; } } + protected void allocateInternal(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse allocateResponse) + throws YarnException { + + //filter illegal progress values + float filteredProgress = request.getProgress(); + if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY + || filteredProgress < 0) { + request.setProgress(0); + } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) { + request.setProgress(1); + } + + // Send the status update to the appAttempt. 
+ this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptStatusupdateEvent(appAttemptId, request + .getProgress())); + + List<ResourceRequest> ask = request.getAskList(); + List<ContainerId> release = request.getReleaseList(); + + ResourceBlacklistRequest blacklistRequest = + request.getResourceBlacklistRequest(); + List<String> blacklistAdditions = + (blacklistRequest != null) ? + blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST; + List<String> blacklistRemovals = + (blacklistRequest != null) ? + blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST; + RMApp app = + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); + + // set label expression for Resource Requests if resourceName=ANY + ApplicationSubmissionContext asc = app.getApplicationSubmissionContext(); + for (ResourceRequest req : ask) { + if (null == req.getNodeLabelExpression() + && ResourceRequest.ANY.equals(req.getResourceName())) { + req.setNodeLabelExpression(asc.getNodeLabelExpression()); + } + } + + Resource maximumCapacity = rScheduler.getMaximumResourceCapability(); + + // sanity check + try { + RMServerUtils.normalizeAndValidateRequests(ask, + maximumCapacity, app.getQueue(), + rScheduler, rmContext); + } catch (InvalidResourceRequestException e) { + LOG.warn("Invalid resource ask by application " + appAttemptId, e); + throw e; + } + + try { + RMServerUtils.validateBlacklistRequest(blacklistRequest); + } catch (InvalidResourceBlacklistRequestException e) { + LOG.warn("Invalid blacklist request by application " + appAttemptId, e); + throw e; + } + + // In the case of work-preserving AM restart, it's possible for the + // AM to release containers from the earlier attempt. 
+ if (!app.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + try { + RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); + } catch (InvalidContainerReleaseException e) { + LOG.warn("Invalid container release by application " + appAttemptId, + e); + throw e; + } + } + + // Split Update Resource Requests into increase and decrease. + // No Exceptions are thrown here. All update errors are aggregated + // and returned to the AM. + List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>(); + List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>(); + List<UpdateContainerError> updateContainerErrors = + RMServerUtils.validateAndSplitUpdateResourceRequests( + rmContext, request, maximumCapacity, + increaseResourceReqs, decreaseResourceReqs); + + // Send new requests to appAttempt. + Allocation allocation; + RMAppAttemptState state = + app.getRMAppAttempt(appAttemptId).getAppAttemptState(); + if (state.equals(RMAppAttemptState.FINAL_SAVING) || + state.equals(RMAppAttemptState.FINISHING) || + app.isAppFinalStateStored()) { + LOG.warn(appAttemptId + " is in " + state + + " state, ignore container allocate request."); + allocation = EMPTY_ALLOCATION; + } else { + allocation = + this.rScheduler.allocate(appAttemptId, ask, release, + blacklistAdditions, blacklistRemovals, + increaseResourceReqs, decreaseResourceReqs); + } + + if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { + LOG.info("blacklist are updated in Scheduler." 
+ + "blacklistAdditions: " + blacklistAdditions + ", " + + "blacklistRemovals: " + blacklistRemovals); + } + RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); + + if (allocation.getNMTokens() != null && + !allocation.getNMTokens().isEmpty()) { + allocateResponse.setNMTokens(allocation.getNMTokens()); + } + + // Notify the AM of container update errors + addToUpdateContainerErrors(allocateResponse, updateContainerErrors); + + // update the response with the deltas of node status changes + List<RMNode> updatedNodes = new ArrayList<RMNode>(); + if(app.pullRMNodeUpdates(updatedNodes) > 0) { + List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>(); + for(RMNode rmNode: updatedNodes) { + SchedulerNodeReport schedulerNodeReport = + rScheduler.getNodeReport(rmNode.getNodeID()); + Resource used = BuilderUtils.newResource(0, 0); + int numContainers = 0; + if (schedulerNodeReport != null) { + used = schedulerNodeReport.getUsedResource(); + numContainers = schedulerNodeReport.getNumContainers(); + } + NodeId nodeId = rmNode.getNodeID(); + NodeReport report = + BuilderUtils.newNodeReport(nodeId, rmNode.getState(), + rmNode.getHttpAddress(), rmNode.getRackName(), used, + rmNode.getTotalCapability(), numContainers, + rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), + rmNode.getNodeLabels()); + + updatedNodeReports.add(report); + } + allocateResponse.setUpdatedNodes(updatedNodeReports); + } + + addToAllocatedContainers(allocateResponse, allocation.getContainers()); + + allocateResponse.setCompletedContainersStatuses(appAttempt + .pullJustFinishedContainers()); + allocateResponse.setAvailableResources(allocation.getResourceLimit()); + + // Handling increased containers + addToUpdatedContainers( + allocateResponse, ContainerUpdateType.INCREASE_RESOURCE, + allocation.getIncreasedContainers()); + + // Handling decreased containers + addToUpdatedContainers( + allocateResponse, ContainerUpdateType.DECREASE_RESOURCE, + 
allocation.getDecreasedContainers()); + + allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); + + // add collector address for this application + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + allocateResponse.setCollectorAddr( + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) + .getCollectorAddr()); + } + + // add preemption to the allocateResponse message (if any) + allocateResponse + .setPreemptionMessage(generatePreemptionMessage(allocation)); + + // Set application priority + allocateResponse.setApplicationPriority(app + .getApplicationPriority()); + } + + protected void addToUpdateContainerErrors(AllocateResponse allocateResponse, + List<UpdateContainerError> updateContainerErrors) { + if (!updateContainerErrors.isEmpty()) { + if (allocateResponse.getUpdateErrors() != null + && !allocateResponse.getUpdateErrors().isEmpty()) { + updateContainerErrors = new ArrayList<>(updateContainerErrors); + updateContainerErrors.addAll(allocateResponse.getUpdateErrors()); + } + allocateResponse.setUpdateErrors(updateContainerErrors); + } + } + + protected void addToUpdatedContainers(AllocateResponse allocateResponse, + ContainerUpdateType updateType, List<Container> updatedContainers) { + if (updatedContainers != null && updatedContainers.size() > 0) { + ArrayList<UpdatedContainer> containersToSet = new ArrayList<>(); + if (allocateResponse.getUpdatedContainers() != null && + !allocateResponse.getUpdatedContainers().isEmpty()) { + containersToSet.addAll(allocateResponse.getUpdatedContainers()); + } + for (Container updatedContainer : updatedContainers) { + containersToSet.add( + UpdatedContainer.newInstance(updateType, updatedContainer)); + } + allocateResponse.setUpdatedContainers(containersToSet); + } + } + + protected void addToAllocatedContainers(AllocateResponse allocateResponse, + List<Container> allocatedContainers) { + if (allocateResponse.getAllocatedContainers() != null + && 
!allocateResponse.getAllocatedContainers().isEmpty()) { + allocatedContainers = new ArrayList<>(allocatedContainers); + allocatedContainers.addAll(allocateResponse.getAllocatedContainers()); + } + allocateResponse.setAllocatedContainers(allocatedContainers); + } + private PreemptionMessage generatePreemptionMessage(Allocation allocation){ PreemptionMessage pMsg = null; // assemble strict preemption request http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index a527d04..4c36a2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -218,8 +218,9 @@ public class OpportunisticContainerAllocatorAMService } @Override - public AllocateResponse allocate(AllocateRequest request) throws - YarnException, IOException { + protected void allocateInternal(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse allocateResponse) + throws YarnException { // Partition requests to GUARANTEED and OPPORTUNISTIC. 
OpportunisticContainerAllocator.PartitionedResourceRequests @@ -227,40 +228,30 @@ public class OpportunisticContainerAllocatorAMService oppContainerAllocator.partitionAskList(request.getAskList()); // Allocate OPPORTUNISTIC containers. - request.setAskList(partitionedAsks.getOpportunistic()); - final ApplicationAttemptId appAttemptId = getAppAttemptId(); - SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) - rmContext.getScheduler()).getApplicationAttempt(appAttemptId); + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler)rmContext.getScheduler()) + .getApplicationAttempt(appAttemptId); OpportunisticContainerContext oppCtx = appAttempt.getOpportunisticContainerContext(); oppCtx.updateNodeList(getLeastLoadedNodes()); List<Container> oppContainers = - oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx, - ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); + oppContainerAllocator.allocateContainers( + request.getResourceBlacklistRequest(), + partitionedAsks.getOpportunistic(), appAttemptId, oppCtx, + ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); // Create RMContainers and update the NMTokens. if (!oppContainers.isEmpty()) { handleNewContainers(oppContainers, false); appAttempt.updateNMTokens(oppContainers); + addToAllocatedContainers(allocateResponse, oppContainers); } // Allocate GUARANTEED containers. request.setAskList(partitionedAsks.getGuaranteed()); - AllocateResponse allocateResp = super.allocate(request); - - // Add allocated OPPORTUNISTIC containers to the AllocateResponse. - if (!oppContainers.isEmpty()) { - allocateResp.getAllocatedContainers().addAll(oppContainers); - } - - // Update opportunistic container context with the allocated GUARANTEED - // containers. 
- oppCtx.updateCompletedContainers(allocateResp); - - // Add all opportunistic containers - return allocateResp; + super.allocateInternal(appAttemptId, request, allocateResponse); } @Override @@ -304,7 +295,7 @@ public class OpportunisticContainerAllocatorAMService } private void handleNewContainers(List<Container> allocContainers, - boolean isRemotelyAllocated) { + boolean isRemotelyAllocated) { for (Container container : allocContainers) { // Create RMContainer SchedulerApplicationAttempt appAttempt = http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index a244ad8..020764b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -25,14 +25,13 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import 
org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 0cfa1d3..dbc6169 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode .RMNodeDecreaseContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import 
org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java index d7d1e94..80e7c0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java @@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; /** * The event signifying that a container has been reserved. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index feb20ee..30f7ef9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -965,6 +967,7 @@ public class AppSchedulingInfo { public ResourceRequest cloneResourceRequest(ResourceRequest request) { ResourceRequest newRequest = ResourceRequest.newBuilder() .priority(request.getPriority()) + .allocationRequestId(request.getAllocationRequestId()) .resourceName(request.getResourceName()) .capability(request.getCapability()) .numContainers(1) 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index b3ef471..e94d800 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -73,12 +72,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; -import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 6744c2e..759db05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.ImmutableSet; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java deleted file mode 100644 index 4b640ae..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler; - -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.ResourceRequest; - -/** - * Composite key for outstanding scheduler requests for any schedulable entity. - * Currently it includes {@link Priority}. - */ -public final class SchedulerRequestKey implements - Comparable<SchedulerRequestKey> { - - private final Priority priority; - private final long allocationRequestId; - - /** - * Factory method to generate a SchedulerRequestKey from a ResourceRequest. - * @param req ResourceRequest - * @return SchedulerRequestKey - */ - public static SchedulerRequestKey create(ResourceRequest req) { - return new SchedulerRequestKey(req.getPriority(), - req.getAllocationRequestId()); - } - - /** - * Convenience method to extract the SchedulerRequestKey used to schedule the - * Container. - * @param container Container - * @return SchedulerRequestKey - */ - public static SchedulerRequestKey extractFrom(Container container) { - return new SchedulerRequestKey(container.getPriority(), - container.getAllocationRequestId()); - } - - private SchedulerRequestKey(Priority priority, long allocationRequestId) { - this.priority = priority; - this.allocationRequestId = allocationRequestId; - } - - /** - * Get the {@link Priority} of the request. - * - * @return the {@link Priority} of the request - */ - public Priority getPriority() { - return priority; - } - - /** - * Get the Id of the associated {@link ResourceRequest}. - * - * @return the Id of the associated {@link ResourceRequest} - */ - public long getAllocationRequestId() { - return allocationRequestId; - } - - @Override - public int compareTo(SchedulerRequestKey o) { - if (o == null) { - return (priority != null) ? 
-1 : 0; - } else { - if (priority == null) { - return 1; - } - } - int priorityCompare = o.getPriority().compareTo(priority); - // we first sort by priority and then by allocationRequestId - if (priorityCompare != 0) { - return priorityCompare; - } - return Long.compare(allocationRequestId, o.getAllocationRequestId()); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof SchedulerRequestKey)) { - return false; - } - - SchedulerRequestKey that = (SchedulerRequestKey) o; - - if (getAllocationRequestId() != that.getAllocationRequestId()) { - return false; - } - return getPriority() != null ? - getPriority().equals(that.getPriority()) : - that.getPriority() == null; - } - - @Override - public int hashCode() { - int result = getPriority() != null ? getPriority().hashCode() : 0; - result = 31 * result + (int) (getAllocationRequestId() ^ ( - getAllocationRequestId() >>> 32)); - return result; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java index 74a64c1..0dc527f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java @@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 3e8282f..c12bc6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java index 8b4907b..159fb09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java 
@@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; /** * Contexts for a container inside scheduler http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 498f34f..b14bc20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; @@ -69,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCo import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index d79fcaf..344daf2 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 39f4a3d..a9591a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index a27a222..d983ea0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -26,7 +26,7 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import java.util.Collection; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java index d275bfd..e60f70e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java @@ -33,10 +33,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; -import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; + import java.util.List; public class FifoAppAttempt extends FiCaSchedulerApp { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index b5122c0..81c1f30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -77,7 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerCha import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import 
org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -709,7 +709,7 @@ public class FifoScheduler extends // Inform the application RMContainer rmContainer = application.allocate(type, node, schedulerKey, request, container); - + // Inform the node node.allocateContainer(rmContainer); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java index f87f764..86843b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java @@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import java.util.Iterator; import java.util.List; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index e70c3e0..3288d39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -61,7 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity http://git-wip-us.apache.org/repos/asf/hadoop/blob/8796e0d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java index 35218bd..31b372e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java @@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .TestUtils; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
