http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index 0652e96..2ccf827 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -106,6 +106,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -297,8 +298,7 @@ public class MockResourceManagerFacade implements new ArrayList<ContainerStatus>(), containerList, new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null, new ArrayList<NMToken>(), - new ArrayList<Container>(), - new ArrayList<Container>()); + new ArrayList<UpdatedContainer>()); } @Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 1279896..55f936f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -122,9 +122,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override public synchronized void storeContainer(ContainerId containerId, - StartContainerRequest startRequest) throws IOException { + int version, StartContainerRequest startRequest) throws IOException { RecoveredContainerState rcs = new RecoveredContainerState(); rcs.startRequest = startRequest; + rcs.version = version; containerStates.put(containerId, rcs); } @@ -147,9 +148,11 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override public synchronized void storeContainerResourceChanged( - ContainerId containerId, Resource capability) throws IOException { + ContainerId containerId, int version, Resource capability) + throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId); rcs.capability = capability; + rcs.version = version; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 5ac237c..6755ec0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -262,11 +262,12 @@ public class TestNMLeveldbStateStoreService { StartContainerRequest.newInstance(clc, containerToken); // store a container and verify recovered - stateStore.storeContainer(containerId, containerReq); + stateStore.storeContainer(containerId, 1, containerReq); restartStateStore(); recoveredContainers = stateStore.loadContainersState(); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(1, rcs.getVersion()); assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); @@ -296,11 +297,13 @@ public class TestNMLeveldbStateStoreService { assertEquals(diags.toString(), rcs.getDiagnostics()); // increase the container size, and verify recovered - stateStore.storeContainerResourceChanged(containerId, Resource.newInstance(2468, 4)); + stateStore.storeContainerResourceChanged(containerId, 2, + Resource.newInstance(2468, 4)); restartStateStore(); recoveredContainers = stateStore.loadContainersState(); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); + assertEquals(2, rcs.getVersion()); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 394a92c..700d2e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -66,7 +66,7 @@ public class MockContainer implements Container { long currentTime = System.currentTimeMillis(); this.containerTokenIdentifier = BuilderUtils.newContainerTokenIdentifier(BuilderUtils - .newContainerToken(id, "127.0.0.1", 1234, user, + .newContainerToken(id, 0, "127.0.0.1", 1234, user, BuilderUtils.newResource(1024, 1), currentTime + 10000, 123, "password".getBytes(), currentTime)); this.state = ContainerState.NEW; http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index 40e984f..d17b908 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -212,9 +212,9 @@ public class TestNMWebServer { recordFactory.newRecordInstance(ContainerLaunchContext.class); long currentTime = System.currentTimeMillis(); Token containerToken = - BuilderUtils.newContainerToken(containerId, "127.0.0.1", 1234, user, - BuilderUtils.newResource(1024, 1), currentTime + 10000L, 123, - "password".getBytes(), currentTime); + BuilderUtils.newContainerToken(containerId, 0, "127.0.0.1", 1234, + user, BuilderUtils.newResource(1024, 1), currentTime + 10000L, + 123, "password".getBytes(), currentTime); Context context = mock(Context.class); Container container = new ContainerImpl(conf, dispatcher, launchContext, http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index ab94175..1ff69ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -64,6 +65,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; @@ -85,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; + import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; @@ -471,15 +476,6 @@ public class ApplicationMasterService extends AbstractService implements throw e; } - try { - RMServerUtils.increaseDecreaseRequestSanityCheck(rmContext, - request.getIncreaseRequests(), request.getDecreaseRequests(), - maximumCapacity); - } catch (InvalidResourceRequestException e) { - LOG.warn(e); - throw e; - } - // In the case of work-preserving AM restart, it's possible for the // AM to release containers from the earlier attempt. if (!app.getApplicationSubmissionContext() @@ -487,11 +483,22 @@ public class ApplicationMasterService extends AbstractService implements try { RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); } catch (InvalidContainerReleaseException e) { - LOG.warn("Invalid container release by application " + appAttemptId, e); + LOG.warn("Invalid container release by application " + appAttemptId, + e); throw e; } } + // Split Update Resource Requests into increase and decrease. + // No Exceptions are thrown here. All update errors are aggregated + // and returned to the AM. + List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>(); + List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>(); + List<UpdateContainerError> updateContainerErrors = + RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext, + request, maximumCapacity, increaseResourceReqs, + decreaseResourceReqs); + // Send new requests to appAttempt. Allocation allocation; RMAppAttemptState state = @@ -506,7 +513,7 @@ public class ApplicationMasterService extends AbstractService implements allocation = this.rScheduler.allocate(appAttemptId, ask, release, blacklistAdditions, blacklistRemovals, - request.getIncreaseRequests(), request.getDecreaseRequests()); + increaseResourceReqs, decreaseResourceReqs); } if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { @@ -521,6 +528,10 @@ public class ApplicationMasterService extends AbstractService implements allocateResponse.setNMTokens(allocation.getNMTokens()); } + // Notify the AM of container update errors + if (!updateContainerErrors.isEmpty()) { + allocateResponse.setUpdateErrors(updateContainerErrors); + } // update the response with the deltas of node status changes List<RMNode> updatedNodes = new ArrayList<RMNode>(); if(app.pullRMNodeUpdates(updatedNodes) > 0) { @@ -554,8 +565,23 @@ public class ApplicationMasterService extends AbstractService implements allocateResponse.setAvailableResources(allocation.getResourceLimit()); // Handling increased/decreased containers - allocateResponse.setIncreasedContainers(allocation.getIncreasedContainers()); - allocateResponse.setDecreasedContainers(allocation.getDecreasedContainers()); + List<UpdatedContainer> updatedContainers = new ArrayList<>(); + if (allocation.getIncreasedContainers() != null) { + for (Container c : allocation.getIncreasedContainers()) { + updatedContainers.add( + UpdatedContainer.newInstance( + ContainerUpdateType.INCREASE_RESOURCE, c)); + } + } + if (allocation.getDecreasedContainers() != null) { + for (Container c : allocation.getDecreasedContainers()) { + updatedContainers.add( + UpdatedContainer.newInstance( + ContainerUpdateType.DECREASE_RESOURCE, c)); + } + } + + allocateResponse.setUpdatedContainers(updatedContainers); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); @@ -599,7 +625,7 @@ public class ApplicationMasterService extends AbstractService implements return allocateResponse; } } - + private PreemptionMessage generatePreemptionMessage(Allocation allocation){ PreemptionMessage pMsg = null; // assemble strict preemption request http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 9b9b02e..bdf99c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,28 +33,35 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; -import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; +import org.apache.hadoop.yarn.exceptions + .InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt + .RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler .SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -68,6 +75,18 @@ import org.apache.hadoop.yarn.util.resource.Resources; */ public class RMServerUtils { + private static final String UPDATE_OUTSTANDING_ERROR = + "UPDATE_OUTSTANDING_ERROR"; + private static final String INCORRECT_CONTAINER_VERSION_ERROR = + "INCORRECT_CONTAINER_VERSION_ERROR"; + private static final String INVALID_CONTAINER_ID = + "INVALID_CONTAINER_ID"; + private static final String RESOURCE_OUTSIDE_ALLOWED_RANGE = + "RESOURCE_OUTSIDE_ALLOWED_RANGE"; + + protected static final RecordFactory RECORD_FACTORY = + RecordFactoryProvider.getRecordFactory(null); + public static List<RMNode> queryRMNodes(RMContext context, EnumSet<NodeState> acceptedStates) { // nodes contains nodes that are NEW, RUNNING OR UNHEALTHY @@ -97,6 +116,78 @@ public class RMServerUtils { } /** + * Check if we have: + * - Request for same containerId and different target resource + * - If targetResources violates maximum/minimumAllocation + * @param rmContext RM context + * @param request Allocate Request + * @param maximumAllocation Maximum Allocation + * @param increaseResourceReqs Increase Resource Request + * @param decreaseResourceReqs Decrease Resource Request + * @return List of container Errors + */ + public static List<UpdateContainerError> + validateAndSplitUpdateResourceRequests(RMContext rmContext, + AllocateRequest request, Resource maximumAllocation, + List<UpdateContainerRequest> increaseResourceReqs, + List<UpdateContainerRequest> decreaseResourceReqs) { + List<UpdateContainerError> errors = new ArrayList<>(); + Set<ContainerId> outstandingUpdate = new HashSet<>(); + for (UpdateContainerRequest updateReq : request.getUpdateRequests()) { + RMContainer rmContainer = rmContext.getScheduler().getRMContainer( + updateReq.getContainerId()); + String msg = null; + if (rmContainer == null) { + msg = INVALID_CONTAINER_ID; + } + // Only allow updates if the requested version matches the current + // version + if (msg == null && updateReq.getContainerVersion() != + rmContainer.getContainer().getVersion()) { + msg = INCORRECT_CONTAINER_VERSION_ERROR + "|" + + updateReq.getContainerVersion() + "|" + + rmContainer.getContainer().getVersion(); + } + // No more than 1 container update per request. + if (msg == null && + outstandingUpdate.contains(updateReq.getContainerId())) { + msg = UPDATE_OUTSTANDING_ERROR; + } + if (msg == null) { + Resource original = rmContainer.getContainer().getResource(); + Resource target = updateReq.getCapability(); + if (Resources.fitsIn(target, original)) { + // This is a decrease request + if (validateIncreaseDecreaseRequest(rmContext, updateReq, + maximumAllocation, false)) { + decreaseResourceReqs.add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else { + msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + } + } else { + // This is an increase request + if (validateIncreaseDecreaseRequest(rmContext, updateReq, + maximumAllocation, true)) { + increaseResourceReqs.add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else { + msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + } + } + } + if (msg != null) { + UpdateContainerError updateError = RECORD_FACTORY + .newRecordInstance(UpdateContainerError.class); + updateError.setReason(msg); + updateError.setUpdateContainerRequest(updateReq); + errors.add(updateError); + } + } + return errors; + } + + /** * Utility method to validate a list resource requests, by insuring that the * requested memory/vcore is non-negative and not greater than max */ @@ -122,8 +213,6 @@ public class RMServerUtils { * the queue lock to make sure that the access to container resource is * atomic. Refer to LeafQueue.decreaseContainer() and * CapacityScheduelr.updateIncreaseRequests() - * - * * <pre> * - Throw exception when any other error happens * </pre> @@ -145,7 +234,7 @@ public class RMServerUtils { if (increase) { if (originalResource.getMemorySize() > targetResource.getMemorySize() || originalResource.getVirtualCores() > targetResource - .getVirtualCores()) { + .getVirtualCores()) { String msg = "Trying to increase a container, but target resource has some" + " resource < original resource, target=" + targetResource @@ -156,7 +245,7 @@ public class RMServerUtils { } else { if (originalResource.getMemorySize() < targetResource.getMemorySize() || originalResource.getVirtualCores() < targetResource - .getVirtualCores()) { + .getVirtualCores()) { String msg = "Trying to decrease a container, but target resource has " + "some resource > original resource, target=" + targetResource @@ -194,112 +283,46 @@ public class RMServerUtils { } } } - - /** - * Check if we have: - * - Request for same containerId and different target resource - * - If targetResources violates maximum/minimumAllocation - */ - public static void increaseDecreaseRequestSanityCheck(RMContext rmContext, - List<ContainerResourceChangeRequest> incRequests, - List<ContainerResourceChangeRequest> decRequests, - Resource maximumAllocation) throws InvalidResourceRequestException { - checkDuplicatedIncreaseDecreaseRequest(incRequests, decRequests); - validateIncreaseDecreaseRequest(rmContext, incRequests, maximumAllocation, - true); - validateIncreaseDecreaseRequest(rmContext, decRequests, maximumAllocation, - false); - } - - private static void checkDuplicatedIncreaseDecreaseRequest( - List<ContainerResourceChangeRequest> incRequests, - List<ContainerResourceChangeRequest> decRequests) - throws InvalidResourceRequestException { - String msg = "There're multiple increase or decrease container requests " - + "for same containerId="; - Set<ContainerId> existedContainerIds = new HashSet<ContainerId>(); - if (incRequests != null) { - for (ContainerResourceChangeRequest r : incRequests) { - if (!existedContainerIds.add(r.getContainerId())) { - throw new InvalidResourceRequestException(msg + r.getContainerId()); - } - } - } - - if (decRequests != null) { - for (ContainerResourceChangeRequest r : decRequests) { - if (!existedContainerIds.add(r.getContainerId())) { - throw new InvalidResourceRequestException(msg + r.getContainerId()); - } - } - } - } // Sanity check and normalize target resource - private static void validateIncreaseDecreaseRequest(RMContext rmContext, - List<ContainerResourceChangeRequest> requests, Resource maximumAllocation, - boolean increase) - throws InvalidResourceRequestException { - if (requests == null) { - return; + private static boolean validateIncreaseDecreaseRequest(RMContext rmContext, + UpdateContainerRequest request, Resource maximumAllocation, + boolean increase) { + if (request.getCapability().getMemorySize() < 0 + || request.getCapability().getMemorySize() > maximumAllocation + .getMemorySize()) { + return false; } - for (ContainerResourceChangeRequest request : requests) { - if (request.getCapability().getMemorySize() < 0 - || request.getCapability().getMemorySize() > maximumAllocation - .getMemorySize()) { - throw new InvalidResourceRequestException("Invalid " - + (increase ? "increase" : "decrease") + " request" - + ", requested memory < 0" - + ", or requested memory > max configured" + ", requestedMemory=" - + request.getCapability().getMemorySize() + ", maxMemory=" - + maximumAllocation.getMemorySize()); - } - if (request.getCapability().getVirtualCores() < 0 - || request.getCapability().getVirtualCores() > maximumAllocation - .getVirtualCores()) { - throw new InvalidResourceRequestException("Invalid " - + (increase ? "increase" : "decrease") + " request" - + ", requested virtual cores < 0" - + ", or requested virtual cores > max configured" - + ", requestedVirtualCores=" - + request.getCapability().getVirtualCores() + ", maxVirtualCores=" - + maximumAllocation.getVirtualCores()); - } - ContainerId containerId = request.getContainerId(); - ResourceScheduler scheduler = rmContext.getScheduler(); - RMContainer rmContainer = scheduler.getRMContainer(containerId); - if (null == rmContainer) { - String msg = - "Failed to get rmContainer for " - + (increase ? "increase" : "decrease") - + " request, with container-id=" + containerId; - throw new InvalidResourceRequestException(msg); - } - ResourceCalculator rc = scheduler.getResourceCalculator(); - Resource targetResource = Resources.normalize(rc, request.getCapability(), - scheduler.getMinimumResourceCapability(), - scheduler.getMaximumResourceCapability(), - scheduler.getMinimumResourceCapability()); - // Update normalized target resource - request.setCapability(targetResource); + if (request.getCapability().getVirtualCores() < 0 + || request.getCapability().getVirtualCores() > maximumAllocation + .getVirtualCores()) { + return false; } + ResourceScheduler scheduler = rmContext.getScheduler(); + ResourceCalculator rc = scheduler.getResourceCalculator(); + Resource targetResource = Resources.normalize(rc, request.getCapability(), + scheduler.getMinimumResourceCapability(), + scheduler.getMaximumResourceCapability(), + scheduler.getMinimumResourceCapability()); + // Update normalized target resource + request.setCapability(targetResource); + return true; } /** * It will validate to make sure all the containers belong to correct * application attempt id. If not then it will throw * {@link InvalidContainerReleaseException} - * - * @param containerReleaseList - * containers to be released as requested by application master. - * @param appAttemptId - * Application attempt Id + * + * @param containerReleaseList containers to be released as requested by + * application master. + * @param appAttemptId Application attempt Id * @throws InvalidContainerReleaseException */ public static void validateContainerReleaseRequest(List<ContainerId> containerReleaseList, - ApplicationAttemptId appAttemptId) - throws InvalidContainerReleaseException { + ApplicationAttemptId appAttemptId) + throws InvalidContainerReleaseException { for (ContainerId cId : containerReleaseList) { if (!appAttemptId.equals(cId.getApplicationAttemptId())) { throw new InvalidContainerReleaseException( @@ -321,10 +344,11 @@ public class RMServerUtils { /** * Utility method to verify if the current user has access based on the * passed {@link AccessControlList} + * * @param authorizer the {@link AccessControlList} to check against - * @param method the method name to be logged - * @param module like AdminService or NodeLabelManager - * @param LOG the logger to use + * @param method the method name to be logged + * @param module like AdminService or NodeLabelManager + * @param LOG the logger to use * @return {@link UserGroupInformation} of the current user * @throws IOException */ @@ -347,11 +371,11 @@ public class RMServerUtils { " to call '" + method + "'"); RMAuditLogger.logFailure(user.getShortUserName(), method, "", module, - RMAuditLogger.AuditConstants.UNAUTHORIZED_USER); + RMAuditLogger.AuditConstants.UNAUTHORIZED_USER); throw new AccessControlException("User " + user.getShortUserName() + - " doesn't have permission" + - " to call '" + method + "'"); + " doesn't have permission" + + " to call '" + method + "'"); } if (LOG.isTraceEnabled()) { LOG.trace(method + " invoked by user " + user.getShortUserName()); @@ -362,56 +386,56 @@ public class RMServerUtils { public static YarnApplicationState createApplicationState( RMAppState rmAppState) { switch (rmAppState) { - case NEW: - return YarnApplicationState.NEW; - case NEW_SAVING: - return YarnApplicationState.NEW_SAVING; - case SUBMITTED: - return YarnApplicationState.SUBMITTED; - case ACCEPTED: - return YarnApplicationState.ACCEPTED; - case RUNNING: - return YarnApplicationState.RUNNING; - case FINISHING: - case FINISHED: - return YarnApplicationState.FINISHED; - case KILLED: - return YarnApplicationState.KILLED; - case FAILED: - return YarnApplicationState.FAILED; - default: - throw new YarnRuntimeException("Unknown state passed!"); - } + case NEW: + return YarnApplicationState.NEW; + case NEW_SAVING: + return YarnApplicationState.NEW_SAVING; + case SUBMITTED: + return YarnApplicationState.SUBMITTED; + case ACCEPTED: + return YarnApplicationState.ACCEPTED; + case RUNNING: + return YarnApplicationState.RUNNING; + case FINISHING: + case FINISHED: + return YarnApplicationState.FINISHED; + case KILLED: + return YarnApplicationState.KILLED; + case FAILED: + return YarnApplicationState.FAILED; + default: + throw new YarnRuntimeException("Unknown state passed!"); + } } public static YarnApplicationAttemptState createApplicationAttemptState( RMAppAttemptState rmAppAttemptState) { switch (rmAppAttemptState) { - case NEW: - return YarnApplicationAttemptState.NEW; - case SUBMITTED: - return YarnApplicationAttemptState.SUBMITTED; - case SCHEDULED: - return YarnApplicationAttemptState.SCHEDULED; - case ALLOCATED: - return YarnApplicationAttemptState.ALLOCATED; - case LAUNCHED: - return YarnApplicationAttemptState.LAUNCHED; - case ALLOCATED_SAVING: - case LAUNCHED_UNMANAGED_SAVING: - return YarnApplicationAttemptState.ALLOCATED_SAVING; - case RUNNING: - return YarnApplicationAttemptState.RUNNING; - case FINISHING: - return YarnApplicationAttemptState.FINISHING; - case FINISHED: - return YarnApplicationAttemptState.FINISHED; - case KILLED: - return YarnApplicationAttemptState.KILLED; - case FAILED: - return YarnApplicationAttemptState.FAILED; - default: - throw new YarnRuntimeException("Unknown state passed!"); + case NEW: + return YarnApplicationAttemptState.NEW; + case SUBMITTED: + return YarnApplicationAttemptState.SUBMITTED; + case SCHEDULED: + return YarnApplicationAttemptState.SCHEDULED; + case ALLOCATED: + return YarnApplicationAttemptState.ALLOCATED; + case LAUNCHED: + return YarnApplicationAttemptState.LAUNCHED; + case ALLOCATED_SAVING: + case LAUNCHED_UNMANAGED_SAVING: + return YarnApplicationAttemptState.ALLOCATED_SAVING; + case RUNNING: + return YarnApplicationAttemptState.RUNNING; + case FINISHING: + return YarnApplicationAttemptState.FINISHING; + case FINISHED: + return YarnApplicationAttemptState.FINISHED; + case KILLED: + return YarnApplicationAttemptState.KILLED; + case FAILED: + return YarnApplicationAttemptState.FAILED; + default: + throw new YarnRuntimeException("Unknown state passed!"); } } @@ -420,13 +444,12 @@ public class RMServerUtils { * a return value when a valid report cannot be found. */ public static final ApplicationResourceUsageReport - DUMMY_APPLICATION_RESOURCE_USAGE_REPORT = + DUMMY_APPLICATION_RESOURCE_USAGE_REPORT = BuilderUtils.newApplicationResourceUsageReport(-1, -1, Resources.createResource(-1, -1), Resources.createResource(-1, -1), Resources.createResource(-1, -1), 0, 0); - /** * Find all configs whose name starts with * YarnConfiguration.RM_PROXY_USER_PREFIX, and add a record for each one by @@ -438,7 +461,8 @@ public class RMServerUtils { String propName = entry.getKey(); if (propName.startsWith(YarnConfiguration.RM_PROXY_USER_PREFIX)) { rmProxyUsers.put(ProxyUsers.CONF_HADOOP_PROXYUSER + "." + - propName.substring(YarnConfiguration.RM_PROXY_USER_PREFIX.length()), + propName.substring(YarnConfiguration.RM_PROXY_USER_PREFIX + .length()), entry.getValue()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 43c2e66..864c3a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -468,6 +468,7 @@ public abstract class AbstractYarnScheduler Container.newInstance(status.getContainerId(), node.getNodeID(), node.getHttpAddress(), status.getAllocatedResource(), status.getPriority(), null); + container.setVersion(status.getVersion()); ApplicationAttemptId attemptId = container.getId().getApplicationAttemptId(); RMContainer rmContainer = @@ -590,7 +591,7 @@ public abstract class AbstractYarnScheduler } protected void decreaseContainers( - List<ContainerResourceChangeRequest> decreaseRequests, + List<UpdateContainerRequest> decreaseRequests, SchedulerApplicationAttempt attempt) { if (null == decreaseRequests || decreaseRequests.isEmpty()) { return; @@ -841,7 +842,7 @@ public abstract class AbstractYarnScheduler /** * Sanity check increase/decrease request, and return * SchedulerContainerResourceChangeRequest according to given - * ContainerResourceChangeRequest. + * UpdateContainerRequest. * * <pre> * - Returns non-null value means validation succeeded @@ -849,7 +850,7 @@ public abstract class AbstractYarnScheduler * </pre> */ private SchedContainerChangeRequest createSchedContainerChangeRequest( - ContainerResourceChangeRequest request, boolean increase) + UpdateContainerRequest request, boolean increase) throws YarnException { ContainerId containerId = request.getContainerId(); RMContainer rmContainer = getRMContainer(containerId); @@ -868,11 +869,11 @@ public abstract class AbstractYarnScheduler protected List<SchedContainerChangeRequest> createSchedContainerChangeRequests( - List<ContainerResourceChangeRequest> changeRequests, + List<UpdateContainerRequest> changeRequests, boolean increase) { List<SchedContainerChangeRequest> schedulerChangeRequests = new ArrayList<SchedContainerChangeRequest>(); - for (ContainerResourceChangeRequest r : changeRequests) { + for (UpdateContainerRequest r : changeRequests) { SchedContainerChangeRequest sr = null; try { sr = createSchedContainerChangeRequest(r, increase); http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java index e4ab3a2..94b006c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java @@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; /** - * This is ContainerResourceChangeRequest in scheduler side, it contains some + * This is UpdateContainerRequest in scheduler side, it contains some * pointers to runtime objects like RMContainer, SchedulerNode, etc. This will * be easier for scheduler making decision. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index fc952c8..ac3b7a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -496,6 +496,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { boolean newContainer, boolean increasedContainer) { Container container = rmContainer.getContainer(); ContainerType containerType = ContainerType.TASK; + if (!newContainer) { + container.setVersion(container.getVersion() + 1); + } // The working knowledge is that masterContainer for AM is null as it // itself is the master container. if (isWaitingForAMContainer()) { @@ -504,10 +507,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { try { // create container token and NMToken altogether. container.setContainerToken(rmContext.getContainerTokenSecretManager() - .createContainerToken(container.getId(), container.getNodeId(), - getUser(), container.getResource(), container.getPriority(), - rmContainer.getCreationTime(), this.logAggregationContext, - rmContainer.getNodeLabelExpression(), containerType)); + .createContainerToken(container.getId(), container.getVersion(), + container.getNodeId(), getUser(), container.getResource(), + container.getPriority(), rmContainer.getCreationTime(), + this.logAggregationContext, rmContainer.getNodeLabelExpression(), + containerType)); NMToken nmToken = rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), getApplicationAttemptId(), container); http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 0aff669..c4f575f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -43,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -143,8 +143,8 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> { Allocation allocate(ApplicationAttemptId appAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests); + List<UpdateContainerRequest> increaseRequests, + List<UpdateContainerRequest> decreaseRequests); /** * Get node resource usage report. http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 3803ef1..7601e2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -902,7 +902,7 @@ public class CapacityScheduler extends // SchedContainerChangeRequest // 2. Deadlock with the scheduling thread. private LeafQueue updateIncreaseRequests( - List<ContainerResourceChangeRequest> increaseRequests, + List<UpdateContainerRequest> increaseRequests, FiCaSchedulerApp app) { if (null == increaseRequests || increaseRequests.isEmpty()) { return null; @@ -930,8 +930,8 @@ public class CapacityScheduler extends public Allocation allocate(ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests) { + List<UpdateContainerRequest> increaseRequests, + List<UpdateContainerRequest> decreaseRequests) { FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); if (application == null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 7b9f102..d6bc1fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -955,8 +955,8 @@ public class FairScheduler extends public Allocation allocate(ApplicationAttemptId appAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests) { + List<UpdateContainerRequest> increaseRequests, + List<UpdateContainerRequest> decreaseRequests) { // Make sure this application exists FSAppAttempt application = getSchedulerApp(appAttemptId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 4b004ed..12b060e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -316,8 +316,8 @@ public class FifoScheduler extends public Allocation allocate(ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests) { + List<UpdateContainerRequest> increaseRequests, + List<UpdateContainerRequest> decreaseRequests) { FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); if (application == null) { LOG.error("Calling allocate on removed " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java index 6f00615..bc18a36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java @@ -168,39 +168,43 @@ public class RMContainerTokenSecretManager extends /** * Helper function for creating ContainerTokens * - * @param containerId - * @param nodeId - * @param appSubmitter - * @param capability - * @param priority - * @param createTime + * @param containerId Container Id + * @param containerVersion Container Version + * @param nodeId Node Id + * @param appSubmitter App Submitter + * @param capability Capability + * @param priority Priority + * @param createTime Create Time * @return the container-token */ - public Token createContainerToken(ContainerId containerId, NodeId nodeId, - String appSubmitter, Resource capability, Priority priority, - long createTime) { - return createContainerToken(containerId, nodeId, appSubmitter, capability, - priority, createTime, null, null, ContainerType.TASK); + public Token createContainerToken(ContainerId containerId, + int containerVersion, NodeId nodeId, String appSubmitter, + Resource capability, Priority priority, long createTime) { + return createContainerToken(containerId, containerVersion, nodeId, + appSubmitter, capability, priority, createTime, + null, null, ContainerType.TASK); } /** * Helper function for creating ContainerTokens * - * @param containerId - * @param nodeId - * @param appSubmitter - * @param capability - * @param priority - * @param createTime - * @param logAggregationContext - * @param nodeLabelExpression - * @param containerType + * @param containerId Container Id + * @param containerVersion Container version + * @param nodeId Node Id + * @param appSubmitter App Submitter + * @param capability Capability + * @param priority Priority + * @param createTime Create Time + * @param logAggregationContext Log Aggregation Context + * @param nodeLabelExpression Node Label Expression + * @param containerType Container Type * @return the container-token */ - public Token createContainerToken(ContainerId containerId, NodeId nodeId, - String appSubmitter, Resource capability, Priority priority, - long createTime, LogAggregationContext logAggregationContext, - String nodeLabelExpression, ContainerType containerType) { + public Token createContainerToken(ContainerId containerId, + int containerVersion, NodeId nodeId, String appSubmitter, + Resource capability, Priority priority, long createTime, + LogAggregationContext logAggregationContext, String nodeLabelExpression, + ContainerType containerType) { byte[] password; ContainerTokenIdentifier tokenIdentifier; long expiryTimeStamp = @@ -210,11 +214,11 @@ public class RMContainerTokenSecretManager extends this.readLock.lock(); try { tokenIdentifier = - new ContainerTokenIdentifier(containerId, nodeId.toString(), - appSubmitter, capability, expiryTimeStamp, this.currentMasterKey - .getMasterKey().getKeyId(), - ResourceManager.getClusterTimeStamp(), priority, createTime, - logAggregationContext, nodeLabelExpression, containerType); + new ContainerTokenIdentifier(containerId, containerVersion, + nodeId.toString(), appSubmitter, capability, expiryTimeStamp, + this.currentMasterKey.getMasterKey().getKeyId(), + ResourceManager.getClusterTimeStamp(), priority, createTime, + logAggregationContext, nodeLabelExpression, containerType); password = this.createPassword(tokenIdentifier); } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 0bd8df6..b87bf2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -34,11 +34,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -238,10 +238,9 @@ public class MockAM { } public AllocateResponse sendContainerResizingRequest( - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests) throws Exception { + List<UpdateContainerRequest> updateRequests) throws Exception { final AllocateRequest req = AllocateRequest.newInstance(0, 0F, null, null, - null, increaseRequests, decreaseRequests); + null, updateRequests); return allocate(req); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 6703202..6411130 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -383,9 +383,10 @@ public class TestApplicationCleanup { // nm1/nm2 register to rm2, and do a heartbeat nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1.registerNode(Arrays.asList(NMContainerStatus.newInstance( - ContainerId.newContainerId(am0.getApplicationAttemptId(), 1), - ContainerState.COMPLETE, Resource.newInstance(1024, 1), "", 0, - Priority.newInstance(0), 1234)), Arrays.asList(app0.getApplicationId())); + ContainerId.newContainerId(am0.getApplicationAttemptId(), 1), 0, + ContainerState.COMPLETE, Resource.newInstance(1024, 1), "", 0, + Priority.newInstance(0), 1234)), + Arrays.asList(app0.getApplicationId())); nm2.setResourceTrackerService(rm2.getResourceTrackerService()); nm2.registerNode(Arrays.asList(app0.getApplicationId())); @@ -595,7 +596,7 @@ public class TestApplicationCleanup { int memory) { ContainerId containerId = ContainerId.newContainerId(appAttemptId, id); NMContainerStatus containerReport = - NMContainerStatus.newInstance(containerId, containerState, + NMContainerStatus.newInstance(containerId, 0, containerState, Resource.newInstance(memory, 1), "recover container", 0, Priority.newInstance(0), 0); return containerReport; http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index cef1b5f..d042817 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -35,16 +35,16 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; -import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -380,57 +380,47 @@ public class TestApplicationMasterService { // Ask for a normal increase should be successfull am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources.createResource(2048))), null); + UpdateContainerRequest.newInstance( + 0, ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(2048)))); // Target resource is negative, should fail - boolean exceptionCaught = false; - try { - am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources.createResource(-1))), null); - } catch (InvalidResourceRequestException e) { - // This is expected - exceptionCaught = true; - } - Assert.assertTrue(exceptionCaught); - + AllocateResponse response = + am1.sendContainerResizingRequest(Arrays.asList( + UpdateContainerRequest.newInstance(0, + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(-1)))); + Assert.assertEquals(1, response.getUpdateErrors().size()); + Assert.assertEquals("RESOURCE_OUTSIDE_ALLOWED_RANGE", + response.getUpdateErrors().get(0).getReason()); + // Target resource is more than maxAllocation, should fail - try { - am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources - .add(registerResponse.getMaximumResourceCapability(), - Resources.createResource(1)))), null); - } catch (InvalidResourceRequestException e) { - // This is expected - exceptionCaught = true; - } + response = am1.sendContainerResizingRequest(Arrays.asList( + UpdateContainerRequest.newInstance(0, + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.add( + registerResponse.getMaximumResourceCapability(), + Resources.createResource(1))))); + Assert.assertEquals(1, response.getUpdateErrors().size()); + Assert.assertEquals("RESOURCE_OUTSIDE_ALLOWED_RANGE", + response.getUpdateErrors().get(0).getReason()); - Assert.assertTrue(exceptionCaught); - // Contains multiple increase/decrease requests for same contaienrId - try { - am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources - .add(registerResponse.getMaximumResourceCapability(), - Resources.createResource(1)))), Arrays.asList( - ContainerResourceChangeRequest.newInstance( - ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), - Resources - .add(registerResponse.getMaximumResourceCapability(), - Resources.createResource(1))))); - } catch (InvalidResourceRequestException e) { - // This is expected - exceptionCaught = true; - } - - Assert.assertTrue(exceptionCaught); + response = am1.sendContainerResizingRequest(Arrays.asList( + UpdateContainerRequest.newInstance(0, + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(2048, 4)), + UpdateContainerRequest.newInstance(0, + ContainerId.newContainerId(attempt1.getAppAttemptId(), 1), + ContainerUpdateType.DECREASE_RESOURCE, + Resources.createResource(1024, 1)))); + Assert.assertEquals(1, response.getUpdateErrors().size()); + Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", + response.getUpdateErrors().get(0).getReason()); } finally { if (rm != null) { rm.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 029520d..a38ffb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -2025,7 +2025,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { String nodeLabelExpression) { ContainerId containerId = ContainerId.newContainerId(appAttemptId, id); NMContainerStatus containerReport = - NMContainerStatus.newInstance(containerId, containerState, + NMContainerStatus.newInstance(containerId, 0, containerState, Resource.newInstance(1024, 1), "recover container", 0, Priority.newInstance(0), 0, nodeLabelExpression); return containerReport; http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 87e268b..96efcfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -933,7 +933,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { NMContainerStatus report = NMContainerStatus.newInstance( ContainerId.newContainerId( - ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), + ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), 0, ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); rm.getResourceTrackerService().handleNMContainerStatus(report, null); @@ -944,7 +944,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { (RMAppAttemptImpl) app.getCurrentAppAttempt(); currentAttempt.setMasterContainer(null); report = NMContainerStatus.newInstance( - ContainerId.newContainerId(currentAttempt.getAppAttemptId(), 0), + ContainerId.newContainerId(currentAttempt.getAppAttemptId(), 0), 0, ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); rm.getResourceTrackerService().handleNMContainerStatus(report, null); @@ -956,7 +956,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { // Case 2.1: AppAttemptId is null report = NMContainerStatus.newInstance( ContainerId.newContainerId( - ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), + ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), 0, ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); try { @@ -971,7 +971,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase { (RMAppAttemptImpl) app.getCurrentAppAttempt(); currentAttempt.setMasterContainer(null); report = NMContainerStatus.newInstance( - ContainerId.newContainerId(currentAttempt.getAppAttemptId(), 0), + ContainerId.newContainerId(currentAttempt.getAppAttemptId(), 0), 0, ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index f1d36d5..ceffebd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -58,9 +58,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -3229,9 +3230,10 @@ public class TestCapacityScheduler { // am1 asks to change its AM container from 1GB to 3GB am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId1, Resources.createResource(3 * GB))), - null); + UpdateContainerRequest + .newInstance(0, containerId1, + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(3 * GB)))); FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId()); @@ -3243,11 +3245,14 @@ public class TestCapacityScheduler { // am1 asks to change containerId2 (2G -> 3G) and containerId3 (2G -> 5G) am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId2, Resources.createResource(3 * GB)), - ContainerResourceChangeRequest - .newInstance(containerId3, Resources.createResource(5 * GB))), - null); + UpdateContainerRequest + .newInstance(0, containerId2, + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(3 * GB)), + UpdateContainerRequest + .newInstance(0, containerId3, + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(5 * GB)))); Assert.assertEquals(6 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); @@ -3258,13 +3263,18 @@ public class TestCapacityScheduler { // am1 asks to change containerId1 (1G->3G), containerId2 (2G -> 4G) and // containerId3 (2G -> 2G) am1.sendContainerResizingRequest(Arrays.asList( - ContainerResourceChangeRequest - .newInstance(containerId1, Resources.createResource(3 * GB)), - ContainerResourceChangeRequest - .newInstance(containerId2, Resources.createResource(4 * GB)), - ContainerResourceChangeRequest - .newInstance(containerId3, Resources.createResource(2 * GB))), - null); + UpdateContainerRequest + .newInstance(0, containerId1, + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(3 * GB)), + UpdateContainerRequest + .newInstance(0, containerId2, + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(4 * GB)), + UpdateContainerRequest + .newInstance(0, containerId3, + ContainerUpdateType.INCREASE_RESOURCE, + Resources.createResource(2 * GB)))); Assert.assertEquals(4 * GB, app.getAppAttemptResourceUsage().getPending().getMemorySize()); checkPendingResource(rm, "a1", 4 * GB, null); http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index b4f434e..aefdaea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -288,13 +288,14 @@ public class TestContainerAllocation { @Override public Token createContainerToken(ContainerId containerId, - NodeId nodeId, String appSubmitter, Resource capability, - Priority priority, long createTime, - LogAggregationContext logAggregationContext, String nodeLabelExp, ContainerType containerType) { + int containerVersion, NodeId nodeId, String appSubmitter, + Resource capability, Priority priority, long createTime, + LogAggregationContext logAggregationContext, String nodeLabelExp, + ContainerType containerType) { numRetries++; - return super.createContainerToken(containerId, nodeId, appSubmitter, - capability, priority, createTime, logAggregationContext, - nodeLabelExp, containerType); + return super.createContainerToken(containerId, containerVersion, + nodeId, appSubmitter, capability, priority, createTime, + logAggregationContext, nodeLabelExp, containerType); } }; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
