YARN-5221. Expose UpdateResourceRequest API to allow AM to request for change in container properties. (asuresh)
(cherry picked from commit d6d9cff21b7b6141ed88359652cf22e8973c0661) (cherry picked from commit b279f42d79175bef6529dc1ac4216198a3aaee4d) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/979b29a0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/979b29a0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/979b29a0 Branch: refs/heads/branch-2.8 Commit: 979b29a03cb735259dcdc969263588ef1958e24f Parents: 7adbd56 Author: Arun Suresh <[email protected]> Authored: Sat Aug 27 15:22:43 2016 -0700 Committer: Arun Suresh <[email protected]> Committed: Wed Aug 31 20:06:49 2016 -0700 ---------------------------------------------------------------------- .../app/local/TestLocalContainerAllocator.java | 4 +- .../v2/app/rm/TestRMContainerAllocator.java | 10 +- .../sls/scheduler/ResourceSchedulerWrapper.java | 6 +- .../api/protocolrecords/AllocateRequest.java | 58 ++-- .../api/protocolrecords/AllocateResponse.java | 70 ++-- .../hadoop/yarn/api/records/Container.java | 22 ++ .../records/ContainerResourceChangeRequest.java | 117 ------- .../yarn/api/records/ContainerUpdateType.java | 45 +++ .../yarn/api/records/UpdateContainerError.java | 119 +++++++ .../api/records/UpdateContainerRequest.java | 171 ++++++++++ .../yarn/api/records/UpdatedContainer.java | 118 +++++++ .../src/main/proto/yarn_protos.proto | 5 +- .../src/main/proto/yarn_service_protos.proto | 29 +- .../distributedshell/ApplicationMaster.java | 4 +- .../yarn/client/api/async/AMRMClientAsync.java | 9 +- .../api/async/impl/AMRMClientAsyncImpl.java | 8 +- .../yarn/client/api/impl/AMRMClientImpl.java | 79 +++-- .../api/async/impl/TestAMRMClientAsync.java | 55 +-- .../yarn/client/api/impl/TestAMRMClient.java | 19 +- .../api/impl/TestAMRMClientOnRMRestart.java | 14 +- .../impl/pb/AllocateRequestPBImpl.java | 151 +++------ .../impl/pb/AllocateResponsePBImpl.java | 192 ++++++++--- .../api/records/impl/pb/ContainerPBImpl.java | 13 + 
.../ContainerResourceChangeRequestPBImpl.java | 141 -------- .../yarn/api/records/impl/pb/ProtoUtils.java | 90 +++++ .../impl/pb/UpdateContainerErrorPBImpl.java | 125 +++++++ .../impl/pb/UpdateContainerRequestPBImpl.java | 166 +++++++++ .../records/impl/pb/UpdatedContainerPBImpl.java | 117 +++++++ .../yarn/security/ContainerTokenIdentifier.java | 26 +- .../src/main/proto/yarn_security_token.proto | 1 + .../hadoop/yarn/api/TestPBImplRecords.java | 19 +- .../yarn/security/TestYARNTokenIdentifier.java | 4 +- .../api/protocolrecords/NMContainerStatus.java | 15 +- .../impl/pb/NMContainerStatusPBImpl.java | 13 + .../hadoop/yarn/server/utils/BuilderUtils.java | 14 +- .../yarn_server_common_service_protos.proto | 1 + .../protocolrecords/TestProtocolRecords.java | 4 +- .../TestRegisterNodeManagerRequest.java | 2 +- .../containermanager/ContainerManagerImpl.java | 17 +- .../container/ContainerImpl.java | 22 +- .../recovery/NMLeveldbStateStoreService.java | 49 ++- .../recovery/NMNullStateStoreService.java | 4 +- .../recovery/NMStateStoreService.java | 24 +- .../nodemanager/TestNodeManagerResync.java | 2 +- .../nodemanager/TestNodeStatusUpdater.java | 12 +- .../amrmproxy/MockResourceManagerFacade.java | 4 +- .../recovery/NMMemoryStateStoreService.java | 7 +- .../TestNMLeveldbStateStoreService.java | 7 +- .../nodemanager/webapp/MockContainer.java | 2 +- .../nodemanager/webapp/TestNMWebServer.java | 6 +- .../ApplicationMasterService.java | 54 ++- .../server/resourcemanager/RMServerUtils.java | 338 ++++++++++--------- .../scheduler/AbstractYarnScheduler.java | 13 +- .../scheduler/SchedContainerChangeRequest.java | 2 +- .../scheduler/SchedulerApplicationAttempt.java | 12 +- .../scheduler/YarnScheduler.java | 6 +- .../scheduler/capacity/CapacityScheduler.java | 8 +- .../scheduler/fair/FairScheduler.java | 6 +- .../scheduler/fifo/FifoScheduler.java | 6 +- .../security/RMContainerTokenSecretManager.java | 62 ++-- .../yarn/server/resourcemanager/MockAM.java | 7 +- 
.../resourcemanager/TestApplicationCleanup.java | 9 +- .../TestApplicationMasterService.java | 86 +++-- .../server/resourcemanager/TestRMRestart.java | 2 +- .../TestResourceTrackerService.java | 8 +- .../capacity/TestCapacityScheduler.java | 42 ++- .../capacity/TestContainerAllocation.java | 13 +- .../capacity/TestContainerResizing.java | 134 +++++--- .../capacity/TestIncreaseAllocationExpirer.java | 76 +++-- .../server/TestContainerManagerSecurity.java | 18 +- .../TestMiniYarnClusterNodeUtilization.java | 2 - .../src/test/proto/test_token.proto | 1 + 72 files changed, 2059 insertions(+), 1058 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java index 38df8f0..689da7a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import 
org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -253,8 +254,7 @@ public class TestLocalContainerAllocator { Resources.none(), null, 1, null, Collections.<NMToken>emptyList(), yarnToken, - Collections.<Container>emptyList(), - Collections.<Container>emptyList()); + Collections.<UpdatedContainer>emptyList()); response.setApplicationPriority(Priority.newInstance(0)); return response; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 755f8ee..e3999a4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -97,7 +97,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -106,6 +105,7 @@ import 
org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; @@ -1701,8 +1701,8 @@ public class TestRMContainerAllocator { ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests) { + List<UpdateContainerRequest> increaseRequests, + List<UpdateContainerRequest> decreaseRequests) { List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = ResourceRequest.newInstance(req @@ -1748,8 +1748,8 @@ public class TestRMContainerAllocator { ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<ContainerResourceChangeRequest> increaseRequest, - List<ContainerResourceChangeRequest> decreaseRequests) { + List<UpdateContainerRequest> increaseRequest, + List<UpdateContainerRequest> decreaseRequests) { List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = ResourceRequest.newInstance(req http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index f9e5fac..7a8445e 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -207,8 +207,8 @@ public class ResourceSchedulerWrapper public Allocation allocate(ApplicationAttemptId attemptId, List<ResourceRequest> resourceRequests, List<ContainerId> containerIds, List<String> strings, List<String> strings2, - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests) { + List<UpdateContainerRequest> increaseRequests, + List<UpdateContainerRequest> decreaseRequests) { if (metricsON) { final Timer.Context context = schedulerAllocateTimer.time(); Allocation allocation = null; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index 0b65e5c..f7ce127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.util.Records; /** @@ -48,13 +48,8 @@ import org.apache.hadoop.yarn.util.Records; * A list of unused {@link Container} which are being returned. * </li> * <li> - * A list of {@link ContainerResourceChangeRequest} to inform - * the <code>ResourceManager</code> about the resource increase - * requirements of running containers. - * </li> - * <li> - * A list of {@link ContainerResourceChangeRequest} to inform - * the <code>ResourceManager</code> about the resource decrease + * A list of {@link UpdateContainerRequest} to inform + * the <code>ResourceManager</code> about the change in * requirements of running containers. 
* </li> * </ul> @@ -72,25 +67,23 @@ public abstract class AllocateRequest { List<ContainerId> containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest) { return newInstance(responseID, appProgress, resourceAsk, - containersToBeReleased, resourceBlacklistRequest, null, null); + containersToBeReleased, resourceBlacklistRequest, null); } @Public - @Stable + @Unstable public static AllocateRequest newInstance(int responseID, float appProgress, List<ResourceRequest> resourceAsk, List<ContainerId> containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest, - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests) { + List<UpdateContainerRequest> updateRequests) { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest.setResponseId(responseID); allocateRequest.setProgress(appProgress); allocateRequest.setAskList(resourceAsk); allocateRequest.setReleaseList(containersToBeReleased); allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); - allocateRequest.setIncreaseRequests(increaseRequests); - allocateRequest.setDecreaseRequests(decreaseRequests); + allocateRequest.setUpdateRequests(updateRequests); return allocateRequest; } @@ -197,38 +190,25 @@ public abstract class AllocateRequest { ResourceBlacklistRequest resourceBlacklistRequest); /** - * Get the list of container resource increase requests being sent by the - * <code>ApplicationMaster</code>. - */ - @Public - @Unstable - public abstract List<ContainerResourceChangeRequest> getIncreaseRequests(); - - /** - * Set the list of container resource increase requests to inform the - * <code>ResourceManager</code> about the containers whose resources need - * to be increased. 
- */ - @Public - @Unstable - public abstract void setIncreaseRequests( - List<ContainerResourceChangeRequest> increaseRequests); - - /** - * Get the list of container resource decrease requests being sent by the + * Get the list of container update requests being sent by the * <code>ApplicationMaster</code>. + * @return list of {@link UpdateContainerRequest} + * being sent by the + * <code>ApplicationMaster</code>. */ @Public @Unstable - public abstract List<ContainerResourceChangeRequest> getDecreaseRequests(); + public abstract List<UpdateContainerRequest> getUpdateRequests(); /** - * Set the list of container resource decrease requests to inform the - * <code>ResourceManager</code> about the containers whose resources need - * to be decreased. + * Set the list of container update requests to inform the + * <code>ResourceManager</code> about the containers that need to be + * updated. + * @param updateRequests list of <code>UpdateContainerRequest</code> for + * containers to be updated */ @Public @Unstable - public abstract void setDecreaseRequests( - List<ContainerResourceChangeRequest> decreaseRequests); + public abstract void setUpdateRequests( + List<UpdateContainerRequest> updateRequests); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index d1b2a3a..beb1fa1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -35,6 +36,8 @@ import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.util.Records; /** @@ -95,19 +98,17 @@ public abstract class AllocateResponse { } @Public - @Stable + @Unstable public static AllocateResponse newInstance(int responseId, List<ContainerStatus> completedContainers, List<Container> allocatedContainers, List<NodeReport> updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List<NMToken> nmTokens, - List<Container> increasedContainers, - List<Container> decreasedContainers) { + List<UpdatedContainer> updatedContainers) { AllocateResponse response = newInstance(responseId, completedContainers, allocatedContainers, updatedNodes, availResources, command, numClusterNodes, preempt, nmTokens); - response.setIncreasedContainers(increasedContainers); - response.setDecreasedContainers(decreasedContainers); + response.setUpdatedContainers(updatedContainers); return response; } @@ -118,12 +119,11 @@ public abstract class AllocateResponse { List<Container> allocatedContainers, List<NodeReport> updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken, - List<Container> increasedContainers, - List<Container> decreasedContainers) { + List<UpdatedContainer> updatedContainers) { AllocateResponse response = 
newInstance(responseId, completedContainers, allocatedContainers, updatedNodes, availResources, command, numClusterNodes, preempt, - nmTokens, increasedContainers, decreasedContainers); + nmTokens, updatedContainers); response.setAMRMToken(amRMToken); return response; } @@ -270,38 +270,23 @@ public abstract class AllocateResponse { public abstract void setNMTokens(List<NMToken> nmTokens); /** - * Get the list of newly increased containers by - * <code>ResourceManager</code>. - */ - @Public - @Unstable - public abstract List<Container> getIncreasedContainers(); - - /** - * Set the list of newly increased containers by - * <code>ResourceManager</code>. - */ - @Private - @Unstable - public abstract void setIncreasedContainers( - List<Container> increasedContainers); - - /** - * Get the list of newly decreased containers by + * Get the list of newly updated containers by * <code>ResourceManager</code>. */ @Public @Unstable - public abstract List<Container> getDecreasedContainers(); + public abstract List<UpdatedContainer> getUpdatedContainers(); /** - * Set the list of newly decreased containers by + * Set the list of newly updated containers by * <code>ResourceManager</code>. + * + * @param updatedContainers List of Updated Containers. */ @Private @Unstable - public abstract void setDecreasedContainers( - List<Container> decreasedContainers); + public abstract void setUpdatedContainers( + List<UpdatedContainer> updatedContainers); /** * The AMRMToken that belong to this attempt @@ -328,4 +313,29 @@ public abstract class AllocateResponse { @Private @Unstable public abstract void setApplicationPriority(Priority priority); + + /** + * Get the list of container update errors to inform the + * Application Master about the container updates that could not be + * satisfied due to error. + * + * @return List of Update Container Errors. 
+ */ + @Public + @Unstable + public List<UpdateContainerError> getUpdateErrors() { + return new ArrayList<>(); + } + + /** + * Set the list of container update errors to inform the + * Application Master about the container updates that could not be + * satisfied due to error. + * @param updateErrors list of <code>UpdateContainerError</code> for + * containers updates requests that were in error + */ + @Public + @Unstable + public void setUpdateErrors(List<UpdateContainerError> updateErrors) { + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java index 38fa8b9..21ac6a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java @@ -163,4 +163,26 @@ public abstract class Container implements Comparable<Container> { @Private @Unstable public abstract void setContainerToken(Token containerToken); + + /** + * Get the version of this container. The version will be incremented when + * a container is updated. + * + * @return version of this container. + */ + @Private + @Unstable + public int getVersion() { + return 0; + } + + /** + * Set the version of this container. + * @param version of this container. 
+ */ + @Private + @Unstable + public void setVersion(int version) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java deleted file mode 100644 index 117015b..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerResourceChangeRequest.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.api.records; - -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.util.Records; - -/** - * {@code ContainerResourceChangeRequest} represents the request made by an - * application to the {@code ResourceManager} to change resource allocation of - * a running {@code Container}. - * <p> - * It includes: - * <ul> - * <li>{@link ContainerId} for the container.</li> - * <li> - * {@link Resource} capability of the container after the resource change - * is completed. - * </li> - * </ul> - * - * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest) - */ -@Public -@Unstable -public abstract class ContainerResourceChangeRequest { - - @Public - @Unstable - public static ContainerResourceChangeRequest newInstance( - ContainerId existingContainerId, Resource targetCapability) { - ContainerResourceChangeRequest context = Records - .newRecord(ContainerResourceChangeRequest.class); - context.setContainerId(existingContainerId); - context.setCapability(targetCapability); - return context; - } - - /** - * Get the <code>ContainerId</code> of the container. - * @return <code>ContainerId</code> of the container - */ - @Public - @Unstable - public abstract ContainerId getContainerId(); - - /** - * Set the <code>ContainerId</code> of the container. - * @param containerId <code>ContainerId</code> of the container - */ - @Public - @Unstable - public abstract void setContainerId(ContainerId containerId); - - /** - * Get the <code>Resource</code> capability of the container. - * @return <code>Resource</code> capability of the container - */ - @Public - @Unstable - public abstract Resource getCapability(); - - /** - * Set the <code>Resource</code> capability of the container. 
- * @param capability <code>Resource</code> capability of the container - */ - @Public - @Unstable - public abstract void setCapability(Resource capability); - - @Override - public int hashCode() { - return getCapability().hashCode() + getContainerId().hashCode(); - } - - @Override - public boolean equals(Object other) { - if (other instanceof ContainerResourceChangeRequest) { - ContainerResourceChangeRequest ctx = - (ContainerResourceChangeRequest) other; - - if (getContainerId() == null && ctx.getContainerId() != null) { - return false; - } else if (!getContainerId().equals(ctx.getContainerId())) { - return false; - } - - if (getCapability() == null && ctx.getCapability() != null) { - return false; - } else if (!getCapability().equals(ctx.getCapability())) { - return false; - } - - return true; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java new file mode 100644 index 0000000..978ea09 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerUpdateType.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Encodes the type of Container Update. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public enum ContainerUpdateType { + + /** + * Resource increase. + */ + INCREASE_RESOURCE, + + /** + * Resource decrease. + */ + DECREASE_RESOURCE, + + /** + * Execution Type change. + */ + UPDATE_EXECUTION_TYPE +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java new file mode 100644 index 0000000..7102f7b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.util.Records; + +/** + * {@code UpdateContainerError} is used by the Scheduler to notify the + * ApplicationMaster of an UpdateContainerRequest it cannot satisfy due to + * an error in the request. It includes the update request as well as + * a reason for why the request was not satisfiable. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public abstract class UpdateContainerError { + + @InterfaceAudience.Public + @InterfaceStability.Unstable + public static UpdateContainerError newInstance(String reason, + UpdateContainerRequest updateContainerRequest) { + UpdateContainerError error = Records.newRecord(UpdateContainerError.class); + error.setReason(reason); + error.setUpdateContainerRequest(updateContainerRequest); + return error; + } + + /** + * Get reason why the update request was not satisfiable. + * @return Reason + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract String getReason(); + + /** + * Set reason why the update request was not satisfiable.
+ * @param reason Reason + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setReason(String reason); + + /** + * Get the {@code UpdateContainerRequest} that was not satisfiable. + * @return UpdateContainerRequest + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract UpdateContainerRequest getUpdateContainerRequest(); + + /** + * Set the {@code UpdateContainerRequest} that was not satisfiable. + * @param updateContainerRequest Update Container Request + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setUpdateContainerRequest( + UpdateContainerRequest updateContainerRequest); + + @Override + public int hashCode() { + final int prime = 2153; + int result = 2459; + String reason = getReason(); + UpdateContainerRequest updateReq = getUpdateContainerRequest(); + result = prime * result + ((reason == null) ? 0 : reason.hashCode()); + result = prime * result + ((updateReq == null) ? 0 : updateReq.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + UpdateContainerError other = (UpdateContainerError) obj; + String reason = getReason(); + if (reason == null) { + if (other.getReason() != null) { + return false; + } + } else if (!reason.equals(other.getReason())) { + return false; + } + UpdateContainerRequest req = getUpdateContainerRequest(); + if (req == null) { + if (other.getUpdateContainerRequest() != null) { + return false; + } + } else if (!req.equals(other.getUpdateContainerRequest())) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java new file mode 100644 index 0000000..b459769 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + * {@code UpdateContainerRequest} represents the request made by an + * application to the {@code ResourceManager} to update an attribute of a + * {@code Container} such as its Resource allocation or {@code ExecutionType}. + * <p> + * It includes: + * <ul> + * <li>version for the container.</li> + * <li>{@link ContainerId} for the container.</li> + * <li> + * {@link Resource} capability of the container after the update request + * is completed. + * </li> + * </ul> + * + * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest) + */ [email protected] [email protected] +public abstract class UpdateContainerRequest { + + @InterfaceAudience.Public + @InterfaceStability.Unstable + public static UpdateContainerRequest newInstance(int version, + ContainerId containerId, ContainerUpdateType updateType, + Resource targetCapability) { + UpdateContainerRequest request = + Records.newRecord(UpdateContainerRequest.class); + request.setContainerVersion(version); + request.setContainerId(containerId); + request.setContainerUpdateType(updateType); + request.setCapability(targetCapability); + return request; + } + + /** + * Get the current version of the container. + * @return current version of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract int getContainerVersion(); + + /** + * Set the current version of the container. + * @param containerVersion of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setContainerVersion(int containerVersion); + + /** + * Get the <code>ContainerUpdateType</code> of the container.
+ * @return <code>ContainerUpdateType</code> of the container. + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract ContainerUpdateType getContainerUpdateType(); + + /** + * Set the <code>ContainerUpdateType</code> of the container. + * @param updateType of the Container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setContainerUpdateType(ContainerUpdateType updateType); + + /** + * Get the <code>ContainerId</code> of the container. + * @return <code>ContainerId</code> of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract ContainerId getContainerId(); + + /** + * Set the <code>ContainerId</code> of the container. + * @param containerId <code>ContainerId</code> of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setContainerId(ContainerId containerId); + + /** + * Get the <code>Resource</code> capability of the container. + * @return <code>Resource</code> capability of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract Resource getCapability(); + + /** + * Set the <code>Resource</code> capability of the container. + * @param capability <code>Resource</code> capability of the container + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public abstract void setCapability(Resource capability); + + @Override + public int hashCode() { + final int prime = 2153; + int result = 2459; + ContainerId cId = getContainerId(); + Resource capability = getCapability(); + result = + prime * result + ((capability == null) ? 0 : capability.hashCode()); + result = prime * result + ((cId == null) ? 
0 : cId.hashCode()); + result = prime * result + getContainerVersion(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + UpdateContainerRequest other = (UpdateContainerRequest) obj; + Resource capability = getCapability(); + if (capability == null) { + if (other.getCapability() != null) { + return false; + } + } else if (!capability.equals(other.getCapability())) { + return false; + } + ContainerId cId = getContainerId(); + if (cId == null) { + if (other.getContainerId() != null) { + return false; + } + } else if (!cId.equals(other.getContainerId())) { + return false; + } + if (getContainerVersion() != other.getContainerVersion()) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdatedContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdatedContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdatedContainer.java new file mode 100644 index 0000000..68f6ca1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdatedContainer.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.util.Records; + +/** + * An object that encapsulates an updated container and the + * type of Update. + */ [email protected] [email protected] +public abstract class UpdatedContainer { + + /** + * Static Factory method. + * + * @param updateType ContainerUpdateType + * @param container Container + * @return UpdatedContainer + */ + @InterfaceAudience.Public + @InterfaceStability.Unstable + public static UpdatedContainer newInstance(ContainerUpdateType updateType, + Container container) { + UpdatedContainer updatedContainer = + Records.newRecord(UpdatedContainer.class); + updatedContainer.setUpdateType(updateType); + updatedContainer.setContainer(container); + return updatedContainer; + } + + /** + * Get the <code>ContainerUpdateType</code>. + * @return ContainerUpdateType + */ + public abstract ContainerUpdateType getUpdateType(); + + /** + * Set the <code>ContainerUpdateType</code>. + * @param updateType ContainerUpdateType + */ + public abstract void setUpdateType(ContainerUpdateType updateType); + + /** + * Get the <code>Container</code>. + * @return Container + */ + public abstract Container getContainer(); + + /** + * Set the <code>Container</code>. 
+ * @param container Container + */ + public abstract void setContainer(Container container); + + @Override + public int hashCode() { + final int prime = 2153; + int result = 2459; + ContainerUpdateType updateType = getUpdateType(); + Container container = getContainer(); + result = prime * result + ((updateType == null) ? 0 : + updateType.hashCode()); + result = prime * result + ((container == null) ? 0 : container.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + UpdatedContainer other = (UpdatedContainer) obj; + ContainerUpdateType updateType = getUpdateType(); + if (updateType == null) { + if (other.getUpdateType() != null) { + return false; + } + } else if (updateType != other.getUpdateType()) { + return false; + } + Container container = getContainer(); + if (container == null) { + if (other.getContainer() != null) { + return false; + } + } else if (!container.equals(other.getContainer())) { + return false; + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 00bf991..b303102 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -91,6 +91,7 @@ message ContainerProto { optional ResourceProto resource = 4; optional PriorityProto priority = 5; optional hadoop.common.TokenProto container_token = 6; + optional int32 version = 9 [default = 0]; } message ContainerReportProto { @@ -520,10 +521,6 @@ enum 
ContainerExitStatusProto { DISKS_FAILED = -101; } -message ContainerResourceChangeRequestProto { - optional ContainerIdProto container_id = 1; - optional ResourceProto capability = 2; -} //////////////////////////////////////////////////////////////////////// ////// From common////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 4f27708..de05e1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -60,14 +60,30 @@ message FinishApplicationMasterResponseProto { optional bool isUnregistered = 1 [default = false]; } +enum ContainerUpdateTypeProto { + INCREASE_RESOURCE = 0; + DECREASE_RESOURCE = 1; +} + +message UpdateContainerRequestProto { + required int32 container_version = 1; + required ContainerIdProto container_id = 2; + required ContainerUpdateTypeProto update_type = 3; + optional ResourceProto capability = 4; +} + +message UpdateContainerErrorProto { + optional string reason = 1; + optional UpdateContainerRequestProto update_request = 2; +} + message AllocateRequestProto { repeated ResourceRequestProto ask = 1; repeated ContainerIdProto release = 2; optional ResourceBlacklistRequestProto blacklist_request = 3; optional int32 response_id = 4; optional float progress = 5; - repeated ContainerResourceChangeRequestProto increase_request = 6; - repeated ContainerResourceChangeRequestProto decrease_request = 7; + repeated UpdateContainerRequestProto update_requests = 6; } message NMTokenProto { @@ -75,6 
+91,11 @@ message NMTokenProto { optional hadoop.common.TokenProto token = 2; } +message UpdatedContainerProto { + required ContainerUpdateTypeProto update_type = 1; + required ContainerProto container = 2; +} + message AllocateResponseProto { optional AMCommandProto a_m_command = 1; optional int32 response_id = 2; @@ -85,10 +106,10 @@ message AllocateResponseProto { optional int32 num_cluster_nodes = 7; optional PreemptionMessageProto preempt = 8; repeated NMTokenProto nm_tokens = 9; - repeated ContainerProto increased_containers = 10; - repeated ContainerProto decreased_containers = 11; + repeated UpdatedContainerProto updated_containers = 10; optional hadoop.common.TokenProto am_rm_token = 12; optional PriorityProto application_priority = 13; + repeated UpdateContainerErrorProto update_errors = 15; } enum SchedulerResourceTypes { http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 09262bc..d717431 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -87,6 +87,7 @@ 
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; @@ -842,7 +843,8 @@ public class ApplicationMaster { } @Override - public void onContainersResourceChanged(List<Container> containers) {} + public void onContainersUpdated( + List<UpdatedContainer> containers) {} @Override public void onShutdownRequest() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index 3c8f923..a2eea52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.service.AbstractService; import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; @@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; @@ -61,7 +63,7 @@ import com.google.common.annotations.VisibleForTesting; * [run tasks on the containers] * } * - * public void onContainersResourceChanged(List<Container> containers) { + * public void onContainersUpdated(List<UpdatedContainer> containers) { * [determine if resource allocation of containers have been increased in * the ResourceManager, and if so, inform the NodeManagers to increase the * resource monitor/enforcement on the containers] @@ -374,8 +376,9 @@ extends AbstractService { * Called when the ResourceManager responds to a heartbeat with containers * whose resource allocation has been changed.
*/ - public abstract void onContainersResourceChanged( - List<Container> containers); + @Public + @Unstable + public abstract void onContainersUpdated(List<UpdatedContainer> containers); /** * Called when the ResourceManager wants the ApplicationMaster to shutdown http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index 286ca28..ae0ab9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -338,12 +339,11 @@ extends AMRMClientAsync<T> { if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) { // RM side of the implementation guarantees that there are // no duplications between increased and decreased containers - List<Container> changed = new ArrayList<>(); - changed.addAll(response.getIncreasedContainers()); - 
changed.addAll(response.getDecreasedContainers()); + List<UpdatedContainer> changed = new ArrayList<>(); + changed.addAll(response.getUpdatedContainers()); if (!changed.isEmpty()) { ((AMRMClientAsync.AbstractCallbackHandler) handler) - .onContainersResourceChanged(changed); + .onContainersUpdated(changed); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 4366c25..b10f8de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -52,8 +52,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Priority; @@ -61,6 +61,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import 
org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -257,33 +259,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { new HashMap<>(); try { synchronized (this) { - askList = new ArrayList<ResourceRequest>(ask.size()); - for(ResourceRequest r : ask) { - // create a copy of ResourceRequest as we might change it while the - // RPC layer is using it to send info across - askList.add(ResourceRequest.newInstance(r.getPriority(), - r.getResourceName(), r.getCapability(), r.getNumContainers(), - r.getRelaxLocality(), r.getNodeLabelExpression())); - } - List<ContainerResourceChangeRequest> increaseList = new ArrayList<>(); - List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>(); + askList = cloneAsks(); // Save the current change for recovery oldChange.putAll(change); - for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry : - change.entrySet()) { - Container container = entry.getValue().getKey(); - Resource original = container.getResource(); - Resource target = entry.getValue().getValue(); - if (Resources.fitsIn(target, original)) { - // This is a decrease request - decreaseList.add(ContainerResourceChangeRequest.newInstance( - container.getId(), target)); - } else { - // This is an increase request - increaseList.add(ContainerResourceChangeRequest.newInstance( - container.getId(), target)); - } - } + List<UpdateContainerRequest> updateList = createUpdateList(); releaseList = new ArrayList<ContainerId>(release); // optimistically clear this collection assuming no RPC failure ask.clear(); @@ -299,8 +278,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { allocateRequest = 
AllocateRequest.newInstance(lastResponseId, progressIndicator, - askList, releaseList, blacklistRequest, - increaseList, decreaseList); + askList, releaseList, blacklistRequest, updateList); // clear blacklistAdditions and blacklistRemovals before // unsynchronized part blacklistAdditions.clear(); @@ -350,9 +328,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { if (!pendingChange.isEmpty()) { List<ContainerStatus> completed = allocateResponse.getCompletedContainersStatuses(); - List<Container> changed = new ArrayList<>(); - changed.addAll(allocateResponse.getIncreasedContainers()); - changed.addAll(allocateResponse.getDecreasedContainers()); + List<UpdatedContainer> changed = new ArrayList<>(); + changed.addAll(allocateResponse.getUpdatedContainers()); // remove all pending change requests that belong to the completed // containers for (ContainerStatus status : completed) { @@ -409,6 +386,38 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { return allocateResponse; } + private List<UpdateContainerRequest> createUpdateList() { + List<UpdateContainerRequest> updateList = new ArrayList<>(); + for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry : + change.entrySet()) { + Resource targetCapability = entry.getValue().getValue(); + Resource currCapability = entry.getValue().getKey().getResource(); + int version = entry.getValue().getKey().getVersion(); + ContainerUpdateType updateType = + ContainerUpdateType.INCREASE_RESOURCE; + if (Resources.fitsIn(targetCapability, currCapability)) { + updateType = ContainerUpdateType.DECREASE_RESOURCE; + } + updateList.add( + UpdateContainerRequest.newInstance(version, entry.getKey(), + updateType, targetCapability)); + } + return updateList; + } + + private List<ResourceRequest> cloneAsks() { + List<ResourceRequest> askList = new ArrayList<ResourceRequest>(ask.size()); + for(ResourceRequest r : ask) { + // create a copy of ResourceRequest as we might 
change it while the + // RPC layer is using it to send info across + ResourceRequest rr = ResourceRequest.newInstance(r.getPriority(), + r.getResourceName(), r.getCapability(), r.getNumContainers(), + r.getRelaxLocality(), r.getNodeLabelExpression()); + askList.add(rr); + } + return askList; + } + protected void removePendingReleaseRequests( List<ContainerStatus> completedContainersStatuses) { for (ContainerStatus containerStatus : completedContainersStatuses) { @@ -417,16 +426,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { } protected void removePendingChangeRequests( - List<Container> changedContainers) { - for (Container changedContainer : changedContainers) { - ContainerId containerId = changedContainer.getId(); + List<UpdatedContainer> changedContainers) { + for (UpdatedContainer changedContainer : changedContainers) { + ContainerId containerId = changedContainer.getContainer().getId(); if (pendingChange.get(containerId) == null) { continue; } if (LOG.isDebugEnabled()) { LOG.debug("RM has confirmed changed resource allocation for " + "container " + containerId + ". Current resource allocation:" - + changedContainer.getResource() + + changedContainer.getContainer().getResource() + ". 
Remove pending change request:" + pendingChange.get(containerId).getValue()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index c7b3a94..dac82e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -45,9 +45,11 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -89,20 +91,21 @@ public class TestAMRMClientAsync { TestCallbackHandler callbackHandler = new TestCallbackHandler(); final AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class); final AtomicInteger secondHeartbeatSync = new AtomicInteger(0); - when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new 
Answer<AllocateResponse>() { - @Override - public AllocateResponse answer(InvocationOnMock invocation) - throws Throwable { - secondHeartbeatSync.incrementAndGet(); - while (heartbeatBlock.get()) { - synchronized (heartbeatBlock) { - heartbeatBlock.wait(); + when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer( + new Answer<AllocateResponse>() { + @Override + public AllocateResponse answer(InvocationOnMock invocation) + throws Throwable { + secondHeartbeatSync.incrementAndGet(); + while (heartbeatBlock.get()) { + synchronized (heartbeatBlock) { + heartbeatBlock.wait(); + } + } + secondHeartbeatSync.incrementAndGet(); + return response2; } - } - secondHeartbeatSync.incrementAndGet(); - return response2; - } - }).thenReturn(response3).thenReturn(emptyResponse); + }).thenReturn(response3).thenReturn(emptyResponse); when(client.registerApplicationMaster(anyString(), anyInt(), anyString())) .thenReturn(null); when(client.getAvailableResources()).thenAnswer(new Answer<Resource>() { @@ -410,10 +413,21 @@ public class TestAMRMClientAsync { List<ContainerStatus> completed, List<Container> allocated, List<Container> increased, List<Container> decreased, List<NMToken> nmTokens) { + List<UpdatedContainer> updatedContainers = new ArrayList<>(); + for (Container c : increased) { + updatedContainers.add( + UpdatedContainer.newInstance( + ContainerUpdateType.INCREASE_RESOURCE, c)); + } + for (Container c : decreased) { + updatedContainers.add( + UpdatedContainer.newInstance( + ContainerUpdateType.DECREASE_RESOURCE, c)); + } AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated, new ArrayList<NodeReport>(), null, null, 1, null, nmTokens, - increased, decreased); + updatedContainers); return response; } @@ -429,7 +443,7 @@ public class TestAMRMClientAsync { extends AMRMClientAsync.AbstractCallbackHandler { private volatile List<ContainerStatus> completedContainers; private volatile List<Container> allocatedContainers; - private final 
List<Container> changedContainers = new ArrayList<>(); + private final List<UpdatedContainer> changedContainers = new ArrayList<>(); Exception savedException = null; volatile boolean reboot = false; Object notifier = new Object(); @@ -448,8 +462,8 @@ public class TestAMRMClientAsync { return ret; } - public List<Container> takeChangedContainers() { - List<Container> ret = null; + public List<UpdatedContainer> takeChangedContainers() { + List<UpdatedContainer> ret = null; synchronized (changedContainers) { if (!changedContainers.isEmpty()) { ret = new ArrayList<>(changedContainers); @@ -488,8 +502,8 @@ public class TestAMRMClientAsync { } @Override - public void onContainersResourceChanged( - List<Container> changed) { + public void onContainersUpdated( + List<UpdatedContainer> changed) { synchronized (changedContainers) { changedContainers.clear(); changedContainers.addAll(changed); @@ -564,7 +578,8 @@ public class TestAMRMClientAsync { public void onContainersAllocated(List<Container> containers) {} @Override - public void onContainersResourceChanged(List<Container> containers) {} + public void onContainersUpdated( + List<UpdatedContainer> containers) {} @Override public void onShutdownRequest() {} http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 75b49d0..79d1c67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -881,22 +882,16 @@ public class TestAMRMClient { AllocateResponse allocResponse = amClient.allocate(0.1f); Assert.assertEquals(0, amClientImpl.change.size()); // we should get decrease confirmation right away - List<Container> decreasedContainers = - allocResponse.getDecreasedContainers(); - List<Container> increasedContainers = - allocResponse.getIncreasedContainers(); - Assert.assertEquals(1, decreasedContainers.size()); - Assert.assertEquals(0, increasedContainers.size()); + List<UpdatedContainer> updatedContainers = + allocResponse.getUpdatedContainers(); + Assert.assertEquals(1, updatedContainers.size()); // we should get increase allocation after the next NM's heartbeat to RM sleep(150); // get allocations allocResponse = amClient.allocate(0.1f); - decreasedContainers = - allocResponse.getDecreasedContainers(); - increasedContainers = - allocResponse.getIncreasedContainers(); - Assert.assertEquals(1, increasedContainers.size()); - Assert.assertEquals(0, decreasedContainers.size()); + updatedContainers = + allocResponse.getUpdatedContainers(); + Assert.assertEquals(1, updatedContainers.size()); } private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient) http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java index 0460f1e..6552ad0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -38,12 +38,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -249,7 +249,7 @@ public class TestAMRMClientOnRMRestart { // new NM to represent NM re-register nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); NMContainerStatus containerReport = - NMContainerStatus.newInstance(containerId, ContainerState.RUNNING, + NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING, Resource.newInstance(1024, 1), "recover container", 0, Priority.newInstance(0), 0); 
nm1.registerNode(Collections.singletonList(containerReport), @@ -386,7 +386,7 @@ public class TestAMRMClientOnRMRestart { ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); NMContainerStatus containerReport = - NMContainerStatus.newInstance(containerId, ContainerState.RUNNING, + NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING, Resource.newInstance(1024, 1), "recover container", 0, Priority.newInstance(0), 0); nm1.registerNode(Arrays.asList(containerReport), null); @@ -549,8 +549,8 @@ public class TestAMRMClientOnRMRestart { List<ResourceRequest> lastAsk = null; List<ContainerId> lastRelease = null; - List<ContainerResourceChangeRequest> lastIncrease = null; - List<ContainerResourceChangeRequest> lastDecrease = null; + List<UpdateContainerRequest> lastIncrease = null; + List<UpdateContainerRequest> lastDecrease = null; List<String> lastBlacklistAdditions; List<String> lastBlacklistRemovals; @@ -561,8 +561,8 @@ public class TestAMRMClientOnRMRestart { ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals, - List<ContainerResourceChangeRequest> increaseRequests, - List<ContainerResourceChangeRequest> decreaseRequests) { + List<UpdateContainerRequest> increaseRequests, + List<UpdateContainerRequest> decreaseRequests) { List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = http://git-wip-us.apache.org/repos/asf/hadoop/blob/979b29a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index d6db32c..0f0f571 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -27,17 +27,17 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.UpdateContainerRequestPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProtoOrBuilder; @@ -52,8 +52,7 @@ public class 
AllocateRequestPBImpl extends AllocateRequest { private List<ResourceRequest> ask = null; private List<ContainerId> release = null; - private List<ContainerResourceChangeRequest> increaseRequests = null; - private List<ContainerResourceChangeRequest> decreaseRequests = null; + private List<UpdateContainerRequest> updateRequests = null; private ResourceBlacklistRequest blacklistRequest = null; public AllocateRequestPBImpl() { @@ -99,11 +98,8 @@ public class AllocateRequestPBImpl extends AllocateRequest { if (this.release != null) { addReleasesToProto(); } - if (this.increaseRequests != null) { - addIncreaseRequestsToProto(); - } - if (this.decreaseRequests != null) { - addDecreaseRequestsToProto(); + if (this.updateRequests != null) { + addUpdateRequestsToProto(); } if (this.blacklistRequest != null) { builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest)); @@ -166,37 +162,19 @@ public class AllocateRequestPBImpl extends AllocateRequest { } @Override - public List<ContainerResourceChangeRequest> getIncreaseRequests() { - initIncreaseRequests(); - return this.increaseRequests; + public List<UpdateContainerRequest> getUpdateRequests() { + initUpdateRequests(); + return this.updateRequests; } @Override - public void setIncreaseRequests( - List<ContainerResourceChangeRequest> increaseRequests) { - if (increaseRequests == null) { + public void setUpdateRequests(List<UpdateContainerRequest> updateRequests) { + if (updateRequests == null) { return; } - initIncreaseRequests(); - this.increaseRequests.clear(); - this.increaseRequests.addAll(increaseRequests); - } - - @Override - public List<ContainerResourceChangeRequest> getDecreaseRequests() { - initDecreaseRequests(); - return this.decreaseRequests; - } - - @Override - public void setDecreaseRequests( - List<ContainerResourceChangeRequest> decreaseRequests) { - if (decreaseRequests == null) { - return; - } - initDecreaseRequests(); - this.decreaseRequests.clear(); - 
this.decreaseRequests.addAll(decreaseRequests); + initUpdateRequests(); + this.updateRequests.clear(); + this.updateRequests.addAll(updateRequests); } @Override @@ -239,7 +217,8 @@ public class AllocateRequestPBImpl extends AllocateRequest { builder.clearAsk(); if (ask == null) return; - Iterable<ResourceRequestProto> iterable = new Iterable<ResourceRequestProto>() { + Iterable<ResourceRequestProto> iterable = + new Iterable<ResourceRequestProto>() { @Override public Iterator<ResourceRequestProto> iterator() { return new Iterator<ResourceRequestProto>() { @@ -268,84 +247,34 @@ public class AllocateRequestPBImpl extends AllocateRequest { builder.addAllAsk(iterable); } - private void initIncreaseRequests() { - if (this.increaseRequests != null) { + private void initUpdateRequests() { + if (this.updateRequests != null) { return; } AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; - List<ContainerResourceChangeRequestProto> list = - p.getIncreaseRequestList(); - this.increaseRequests = new ArrayList<ContainerResourceChangeRequest>(); + List<UpdateContainerRequestProto> list = + p.getUpdateRequestsList(); + this.updateRequests = new ArrayList<>(); - for (ContainerResourceChangeRequestProto c : list) { - this.increaseRequests.add(convertFromProtoFormat(c)); - } - } - - private void initDecreaseRequests() { - if (this.decreaseRequests != null) { - return; - } - AllocateRequestProtoOrBuilder p = viaProto ? 
proto : builder; - List<ContainerResourceChangeRequestProto> list = - p.getDecreaseRequestList(); - this.decreaseRequests = new ArrayList<>(); - - for (ContainerResourceChangeRequestProto c : list) { - this.decreaseRequests.add(convertFromProtoFormat(c)); - } - } - - private void addIncreaseRequestsToProto() { - maybeInitBuilder(); - builder.clearIncreaseRequest(); - if (increaseRequests == null) { - return; + for (UpdateContainerRequestProto c : list) { + this.updateRequests.add(convertFromProtoFormat(c)); } - Iterable<ContainerResourceChangeRequestProto> iterable = - new Iterable<ContainerResourceChangeRequestProto>() { - @Override - public Iterator<ContainerResourceChangeRequestProto> iterator() { - return new Iterator<ContainerResourceChangeRequestProto>() { - - Iterator<ContainerResourceChangeRequest> iter = - increaseRequests.iterator(); - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public ContainerResourceChangeRequestProto next() { - return convertToProtoFormat(iter.next()); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - - } - }; - builder.addAllIncreaseRequest(iterable); } - private void addDecreaseRequestsToProto() { + private void addUpdateRequestsToProto() { maybeInitBuilder(); - builder.clearDecreaseRequest(); - if (decreaseRequests == null) { + builder.clearUpdateRequests(); + if (updateRequests == null) { return; } - Iterable<ContainerResourceChangeRequestProto> iterable = - new Iterable<ContainerResourceChangeRequestProto>() { + Iterable<UpdateContainerRequestProto> iterable = + new Iterable<UpdateContainerRequestProto>() { @Override - public Iterator<ContainerResourceChangeRequestProto> iterator() { - return new Iterator<ContainerResourceChangeRequestProto>() { + public Iterator<UpdateContainerRequestProto> iterator() { + return new Iterator<UpdateContainerRequestProto>() { - Iterator<ContainerResourceChangeRequest> iter = - decreaseRequests.iterator(); + 
private Iterator<UpdateContainerRequest> iter = + updateRequests.iterator(); @Override public boolean hasNext() { @@ -353,7 +282,7 @@ public class AllocateRequestPBImpl extends AllocateRequest { } @Override - public ContainerResourceChangeRequestProto next() { + public UpdateContainerRequestProto next() { return convertToProtoFormat(iter.next()); } @@ -365,7 +294,7 @@ public class AllocateRequestPBImpl extends AllocateRequest { } }; - builder.addAllDecreaseRequest(iterable); + builder.addAllUpdateRequests(iterable); } @Override @@ -438,14 +367,14 @@ public class AllocateRequestPBImpl extends AllocateRequest { return ((ResourceRequestPBImpl)t).getProto(); } - private ContainerResourceChangeRequestPBImpl convertFromProtoFormat( - ContainerResourceChangeRequestProto p) { - return new ContainerResourceChangeRequestPBImpl(p); + private UpdateContainerRequestPBImpl convertFromProtoFormat( + UpdateContainerRequestProto p) { + return new UpdateContainerRequestPBImpl(p); } - private ContainerResourceChangeRequestProto convertToProtoFormat( - ContainerResourceChangeRequest t) { - return ((ContainerResourceChangeRequestPBImpl) t).getProto(); + private UpdateContainerRequestProto convertToProtoFormat( + UpdateContainerRequest t) { + return ((UpdateContainerRequestPBImpl) t).getProto(); } private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
