YARN-1651. CapacityScheduler side changes to support container resize. Contributed by Wangda Tan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/733b0f68 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/733b0f68 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/733b0f68 Branch: refs/heads/YARN-1197 Commit: 733b0f68061558dc32eddb1f112447f5f4be02d0 Parents: b7c4cd5 Author: Jian He <[email protected]> Authored: Tue Sep 15 10:21:39 2015 +0800 Committer: Wangda Tan <[email protected]> Committed: Wed Sep 16 10:55:49 2015 -0700 ---------------------------------------------------------------------- .../v2/app/rm/TestRMContainerAllocator.java | 11 +- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 14 + .../yarn/sls/scheduler/RMNodeWrapper.java | 13 + .../sls/scheduler/ResourceSchedulerWrapper.java | 21 +- .../sls/scheduler/SLSCapacityScheduler.java | 19 +- hadoop-yarn-project/CHANGES.txt | 3 + .../api/impl/TestAMRMClientOnRMRestart.java | 8 +- .../resource/DefaultResourceCalculator.java | 5 + .../resource/DominantResourceCalculator.java | 6 + .../yarn/util/resource/ResourceCalculator.java | 5 + .../hadoop/yarn/util/resource/Resources.java | 5 + .../util/resource/TestResourceCalculator.java | 30 +- .../protocolrecords/NodeHeartbeatResponse.java | 5 +- .../impl/pb/NodeHeartbeatResponsePBImpl.java | 5 +- .../ApplicationMasterService.java | 22 +- .../server/resourcemanager/RMAuditLogger.java | 2 + .../server/resourcemanager/RMServerUtils.java | 164 ++++ .../resourcemanager/ResourceTrackerService.java | 7 +- .../rmapp/attempt/RMAppAttemptImpl.java | 4 +- .../rmcontainer/RMContainer.java | 4 + .../RMContainerChangeResourceEvent.java | 44 + .../rmcontainer/RMContainerEventType.java | 13 +- .../rmcontainer/RMContainerImpl.java | 121 ++- .../RMContainerUpdatesAcquiredEvent.java | 35 + .../server/resourcemanager/rmnode/RMNode.java | 9 + .../rmnode/RMNodeDecreaseContainerEvent.java | 39 + .../resourcemanager/rmnode/RMNodeEventType.java | 1 + .../resourcemanager/rmnode/RMNodeImpl.java | 93 ++ 
.../rmnode/RMNodeStatusEvent.java | 32 +- .../scheduler/AbstractYarnScheduler.java | 150 ++- .../resourcemanager/scheduler/Allocation.java | 22 +- .../scheduler/AppSchedulingInfo.java | 249 ++++- .../resourcemanager/scheduler/QueueMetrics.java | 16 +- .../scheduler/SchedContainerChangeRequest.java | 118 +++ .../scheduler/SchedulerApplication.java | 2 +- .../scheduler/SchedulerApplicationAttempt.java | 253 +++-- .../scheduler/SchedulerNode.java | 31 + .../scheduler/SchedulerUtils.java | 11 +- .../scheduler/YarnScheduler.java | 14 +- .../scheduler/capacity/AbstractCSQueue.java | 23 +- .../scheduler/capacity/CSAssignment.java | 9 + .../scheduler/capacity/CSQueue.java | 16 + .../scheduler/capacity/CapacityScheduler.java | 83 +- .../scheduler/capacity/LeafQueue.java | 127 ++- .../scheduler/capacity/ParentQueue.java | 115 ++- .../allocator/AbstractContainerAllocator.java | 131 +++ .../capacity/allocator/ContainerAllocator.java | 149 +-- .../allocator/IncreaseContainerAllocator.java | 365 +++++++ .../allocator/RegularContainerAllocator.java | 30 +- .../scheduler/common/fica/FiCaSchedulerApp.java | 68 +- .../scheduler/fair/FairScheduler.java | 35 +- .../scheduler/fifo/FifoScheduler.java | 25 +- .../server/resourcemanager/Application.java | 2 +- .../yarn/server/resourcemanager/MockAM.java | 9 + .../yarn/server/resourcemanager/MockNodes.java | 13 + .../yarn/server/resourcemanager/MockRM.java | 13 + .../TestApplicationMasterService.java | 144 ++- .../applicationsmanager/TestAMRestart.java | 15 +- .../TestRMAppLogAggregationStatus.java | 10 +- .../attempt/TestRMAppAttemptTransitions.java | 32 +- .../rmcontainer/TestRMContainerImpl.java | 117 ++- .../capacity/TestCapacityScheduler.java | 128 ++- .../scheduler/capacity/TestChildQueueOrder.java | 4 +- .../capacity/TestContainerAllocation.java | 50 +- .../capacity/TestContainerResizing.java | 963 +++++++++++++++++++ .../scheduler/capacity/TestLeafQueue.java | 4 +- .../scheduler/capacity/TestParentQueue.java | 4 +- 
.../scheduler/capacity/TestReservations.java | 9 +- .../scheduler/fair/FairSchedulerTestBase.java | 6 +- .../fair/TestContinuousScheduling.java | 2 +- .../scheduler/fair/TestFairScheduler.java | 30 +- .../scheduler/fifo/TestFifoScheduler.java | 28 +- 72 files changed, 3856 insertions(+), 509 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index e148c32..2bb7e27 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -98,6 +98,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -1575,8 +1576,10 @@ public class TestRMContainerAllocator { @Override public synchronized Allocation allocate( ApplicationAttemptId 
applicationAttemptId, List<ResourceRequest> ask, - List<ContainerId> release, - List<String> blacklistAdditions, List<String> blacklistRemovals) { + List<ContainerId> release, List<String> blacklistAdditions, + List<String> blacklistRemovals, + List<ContainerResourceChangeRequest> increaseRequests, + List<ContainerResourceChangeRequest> decreaseRequests) { List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = ResourceRequest.newInstance(req @@ -1590,8 +1593,8 @@ public class TestRMContainerAllocator { lastBlacklistAdditions = blacklistAdditions; lastBlacklistRemovals = blacklistRemovals; return super.allocate( - applicationAttemptId, askCopy, release, - blacklistAdditions, blacklistRemovals); + applicationAttemptId, askCopy, release, blacklistAdditions, + blacklistRemovals, increaseRequests, decreaseRequests); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 2d2c3e0..dae2ce7 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.net.Node; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import 
org.apache.hadoop.yarn.api.records.ContainerState; @@ -174,6 +175,19 @@ public class NodeInfo { public Set<String> getNodeLabels() { return RMNodeLabelsManager.EMPTY_STRING_SET; } + + @Override + public void updateNodeHeartbeatResponseForContainersDecreasing( + NodeHeartbeatResponse response) { + // TODO Auto-generated method stub + + } + + @Override + public List<Container> pullNewlyIncreasedContainers() { + // TODO Auto-generated method stub + return null; + } } public static RMNode newNodeInfo(String rackName, String hostName, http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index ecc4734..8c65ccc 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.net.Node; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -163,4 +164,16 @@ public class RMNodeWrapper implements RMNode { public Set<String> getNodeLabels() { return RMNodeLabelsManager.EMPTY_STRING_SET; } + + @Override + public void updateNodeHeartbeatResponseForContainersDecreasing( + NodeHeartbeatResponse response) { + // TODO Auto-generated method stub + } + + @Override + public List<Container> 
pullNewlyIncreasedContainers() { + // TODO Auto-generated method stub + return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 14e2645..310b3b5 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -202,15 +204,16 @@ final public class ResourceSchedulerWrapper @Override public Allocation allocate(ApplicationAttemptId attemptId, - List<ResourceRequest> resourceRequests, - List<ContainerId> containerIds, - List<String> strings, List<String> strings2) { + List<ResourceRequest> resourceRequests, List<ContainerId> containerIds, + List<String> strings, List<String> strings2, + List<ContainerResourceChangeRequest> increaseRequests, + List<ContainerResourceChangeRequest> decreaseRequests) { if (metricsON) { final Timer.Context context = schedulerAllocateTimer.time(); Allocation allocation = null; try { allocation = scheduler.allocate(attemptId, resourceRequests, - containerIds, strings, strings2); + containerIds, strings, strings2, null, null); return allocation; } finally { context.stop(); @@ -224,7 +227,7 @@ final public class ResourceSchedulerWrapper } } else { return scheduler.allocate(attemptId, - resourceRequests, containerIds, strings, strings2); + resourceRequests, containerIds, strings, strings2, null, null); } } @@ -959,4 +962,12 @@ final public class ResourceSchedulerWrapper return Priority.newInstance(0); } + @Override + protected void decreaseContainer( + SchedContainerChangeRequest decreaseRequest, + SchedulerApplicationAttempt attempt) { + // TODO Auto-generated method stub + + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index a4416db..3626027 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -176,15 +177,17 @@ public class SLSCapacityScheduler extends CapacityScheduler implements @Override public Allocation allocate(ApplicationAttemptId attemptId, - List<ResourceRequest> resourceRequests, - List<ContainerId> containerIds, - List<String> strings, List<String> strings2) { + List<ResourceRequest> resourceRequests, List<ContainerId> containerIds, + List<String> strings, List<String> strings2, + List<ContainerResourceChangeRequest> increaseRequests, + List<ContainerResourceChangeRequest> decreaseRequests) { if (metricsON) { final Timer.Context context = schedulerAllocateTimer.time(); Allocation allocation = null; try { - allocation = super.allocate(attemptId, resourceRequests, - containerIds, strings, strings2); + allocation = super + .allocate(attemptId, resourceRequests, containerIds, strings, + strings2, increaseRequests, decreaseRequests); return allocation; } finally { context.stop(); @@ -197,8 +200,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } } } else { - return super.allocate(attemptId, - resourceRequests, containerIds, strings, strings2); + return super.allocate(attemptId, resourceRequests, containerIds, strings, + strings2, increaseRequests, decreaseRequests); } } @@ -426,7 +429,7 @@ public class SLSCapacityScheduler extends CapacityScheduler implements if (pool != null) pool.shutdown(); } - @SuppressWarnings("unchecked") + 
@SuppressWarnings({ "unchecked", "rawtypes" }) private void initMetrics() throws Exception { metrics = new MetricRegistry(); // configuration http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d7ff457..5c0f849 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -214,6 +214,9 @@ Release 2.8.0 - UNRELEASED YARN-3868. Recovery support for container resizing. (Meng Ding via jianhe) + YARN-1651. CapacityScheduler side changes to support container resize. + (Wangda Tan via jianhe) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java index 108ad37..2394747 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; 
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; @@ -525,7 +526,9 @@ public class TestAMRMClientOnRMRestart { public synchronized Allocation allocate( ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, - List<String> blacklistRemovals) { + List<String> blacklistRemovals, + List<ContainerResourceChangeRequest> increaseRequests, + List<ContainerResourceChangeRequest> decreaseRequests) { List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>(); for (ResourceRequest req : ask) { ResourceRequest reqCopy = @@ -539,7 +542,8 @@ public class TestAMRMClientOnRMRestart { lastBlacklistAdditions = blacklistAdditions; lastBlacklistRemovals = blacklistRemovals; return super.allocate(applicationAttemptId, askCopy, release, - blacklistAdditions, blacklistRemovals); + blacklistAdditions, blacklistRemovals, increaseRequests, + decreaseRequests); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java index c2fc1f0..2fdf214 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java @@ -110,4 +110,9 @@ public class DefaultResourceCalculator extends ResourceCalculator { 
); } + @Override + public boolean fitsIn(Resource cluster, + Resource smaller, Resource bigger) { + return smaller.getMemory() <= bigger.getMemory(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 2ee95ce..b5c9967 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -209,4 +209,10 @@ public class DominantResourceCalculator extends ResourceCalculator { ); } + @Override + public boolean fitsIn(Resource cluster, + Resource smaller, Resource bigger) { + return smaller.getMemory() <= bigger.getMemory() + && smaller.getVirtualCores() <= bigger.getVirtualCores(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java index 442196c..3a31225 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java @@ -171,4 +171,9 @@ public abstract class ResourceCalculator { */ public abstract Resource divideAndCeil(Resource numerator, int denominator); + /** + * Check if a smaller resource can be contained by bigger resource. + */ + public abstract boolean fitsIn(Resource cluster, + Resource smaller, Resource bigger); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 503d456..b05d021 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -267,6 +267,11 @@ public class Resources { return smaller.getMemory() <= bigger.getMemory() && smaller.getVirtualCores() <= bigger.getVirtualCores(); } + + public static boolean fitsIn(ResourceCalculator rc, Resource cluster, + Resource smaller, Resource bigger) { + return rc.fitsIn(cluster, smaller, bigger); + } public static Resource componentwiseMin(Resource lhs, Resource rhs) { return createResource(Math.min(lhs.getMemory(), rhs.getMemory()), http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java index 6a0b62e..0654891 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java @@ -41,6 +41,35 @@ public class TestResourceCalculator { public TestResourceCalculator(ResourceCalculator rs) { this.resourceCalculator = rs; } + + @Test(timeout = 10000) + public void testFitsIn() { + Resource cluster = Resource.newInstance(1024, 1); + + if (resourceCalculator instanceof DefaultResourceCalculator) { + Assert.assertTrue(resourceCalculator.fitsIn(cluster, + Resource.newInstance(1, 2), Resource.newInstance(2, 1))); + Assert.assertTrue(resourceCalculator.fitsIn(cluster, + Resource.newInstance(1, 2), Resource.newInstance(2, 2))); + Assert.assertTrue(resourceCalculator.fitsIn(cluster, + Resource.newInstance(1, 2), Resource.newInstance(1, 2))); + Assert.assertTrue(resourceCalculator.fitsIn(cluster, + Resource.newInstance(1, 2), Resource.newInstance(1, 1))); + Assert.assertFalse(resourceCalculator.fitsIn(cluster, + Resource.newInstance(2, 1), Resource.newInstance(1, 2))); + } else if (resourceCalculator instanceof DominantResourceCalculator) { + Assert.assertFalse(resourceCalculator.fitsIn(cluster, + Resource.newInstance(1, 2), Resource.newInstance(2, 1))); + Assert.assertTrue(resourceCalculator.fitsIn(cluster, + Resource.newInstance(1, 2), Resource.newInstance(2, 2))); + Assert.assertTrue(resourceCalculator.fitsIn(cluster, + Resource.newInstance(1, 2), Resource.newInstance(1, 2))); + 
Assert.assertFalse(resourceCalculator.fitsIn(cluster, + Resource.newInstance(1, 2), Resource.newInstance(1, 1))); + Assert.assertFalse(resourceCalculator.fitsIn(cluster, + Resource.newInstance(2, 1), Resource.newInstance(1, 2))); + } + } @Test(timeout = 10000) public void testResourceCalculatorCompareMethod() { @@ -92,7 +121,6 @@ public class TestResourceCalculator { } - private void assertResourcesOperations(Resource clusterResource, Resource lhs, Resource rhs, boolean lessThan, boolean lessThanOrEqual, boolean greaterThan, boolean greaterThanOrEqual, Resource max, http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 38fbc82..c0ccf57 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -19,12 +19,13 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Container; +import 
org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -73,5 +74,5 @@ public interface NodeHeartbeatResponse { void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM); List<Container> getContainersToDecrease(); - void addAllContainersToDecrease(List<Container> containersToDecrease); + void addAllContainersToDecrease(Collection<Container> containersToDecrease); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 12c5230..dc65141 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -20,14 +20,15 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; import 
org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; @@ -437,7 +438,7 @@ public class NodeHeartbeatResponsePBImpl extends @Override public void addAllContainersToDecrease( - final List<Container> containersToDecrease) { + final Collection<Container> containersToDecrease) { if (containersToDecrease == null) { return; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 14142de..87c7bfa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -451,11 +451,13 @@ public class ApplicationMasterService extends AbstractService implements req.setNodeLabelExpression(asc.getNodeLabelExpression()); } } + + Resource maximumCapacity = rScheduler.getMaximumResourceCapability(); // sanity check try { RMServerUtils.normalizeAndValidateRequests(ask, - rScheduler.getMaximumResourceCapability(), 
app.getQueue(), + maximumCapacity, app.getQueue(), rScheduler, rmContext); } catch (InvalidResourceRequestException e) { LOG.warn("Invalid resource ask by application " + appAttemptId, e); @@ -469,6 +471,15 @@ public class ApplicationMasterService extends AbstractService implements throw e; } + try { + RMServerUtils.increaseDecreaseRequestSanityCheck(rmContext, + request.getIncreaseRequests(), request.getDecreaseRequests(), + maximumCapacity); + } catch (InvalidResourceRequestException e) { + LOG.warn(e); + throw e; + } + // In the case of work-preserving AM restart, it's possible for the // AM to release containers from the earlier attempt. if (!app.getApplicationSubmissionContext() @@ -493,8 +504,9 @@ public class ApplicationMasterService extends AbstractService implements allocation = EMPTY_ALLOCATION; } else { allocation = - this.rScheduler.allocate(appAttemptId, ask, release, - blacklistAdditions, blacklistRemovals); + this.rScheduler.allocate(appAttemptId, ask, release, + blacklistAdditions, blacklistRemovals, + request.getIncreaseRequests(), request.getDecreaseRequests()); } if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { @@ -540,6 +552,10 @@ public class ApplicationMasterService extends AbstractService implements .pullJustFinishedContainers()); allocateResponse.setResponseId(lastResponse.getResponseId() + 1); allocateResponse.setAvailableResources(allocation.getResourceLimit()); + + // Handling increased/decreased containers + allocateResponse.setIncreasedContainers(allocation.getIncreasedContainers()); + allocateResponse.setDecreasedContainers(allocation.getDecreasedContainers()); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index f049d97..cd9a61d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -56,6 +56,8 @@ public class RMAuditLogger { public static final String RELEASE_CONTAINER = "AM Released Container"; public static final String UPDATE_APP_PRIORITY = "Update Application Priority Request"; + public static final String CHANGE_CONTAINER_RESOURCE = + "AM Changed Container Resource"; // Some commonly used descriptions public static final String UNAUTHORIZED_USER = "Unauthorized user"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 4d2e41c..cc30593 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -22,8 +22,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; @@ -34,6 +36,7 @@ import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -49,10 +52,14 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import 
org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -107,6 +114,98 @@ public class RMServerUtils { queueName, scheduler, rmContext, queueInfo); } } + + /** + * Validate a container increase/decrease request and normalize its target + * resource, updating the request's capability in place. + * + * <pre> + * - Throw exception if the container or its node cannot be found, the + * container is not RUNNING, or the target is invalid for this change + * </pre> + */ + public static void checkAndNormalizeContainerChangeRequest( + RMContext rmContext, ContainerResourceChangeRequest request, + boolean increase) throws InvalidResourceRequestException { + ContainerId containerId = request.getContainerId(); + ResourceScheduler scheduler = rmContext.getScheduler(); + RMContainer rmContainer = scheduler.getRMContainer(containerId); + ResourceCalculator rc = scheduler.getResourceCalculator(); + + if (null == rmContainer) { + String msg = + "Failed to get rmContainer for " + + (increase ? "increase" : "decrease") + + " request, with container-id=" + containerId; + throw new InvalidResourceRequestException(msg); + } + + if (rmContainer.getState() != RMContainerState.RUNNING) { + String msg = + "rmContainer's state is not RUNNING, for " + + (increase ? 
"increase" : "decrease") + + " request, with container-id=" + containerId; + throw new InvalidResourceRequestException(msg); + } + + Resource targetResource = Resources.normalize(rc, request.getCapability(), + scheduler.getMinimumResourceCapability(), + scheduler.getMaximumResourceCapability(), + scheduler.getMinimumResourceCapability()); + + // Compare targetResource and original resource + Resource originalResource = rmContainer.getAllocatedResource(); + + // Resource comparison should be >= (or <=) for all resource vectors, for + // example, you cannot request target resource of a <10G, 10> container to + // <20G, 8> + if (increase) { + if (originalResource.getMemory() > targetResource.getMemory() + || originalResource.getVirtualCores() > targetResource + .getVirtualCores()) { + String msg = + "Trying to increase a container, but target resource has some" + + " resource < original resource, target=" + targetResource + + " original=" + originalResource + " containerId=" + + containerId; + throw new InvalidResourceRequestException(msg); + } + } else { + if (originalResource.getMemory() < targetResource.getMemory() + || originalResource.getVirtualCores() < targetResource + .getVirtualCores()) { + String msg = + "Trying to decrease a container, but target resource has " + + "some resource > original resource, target=" + targetResource + + " original=" + originalResource + " containerId=" + + containerId; + throw new InvalidResourceRequestException(msg); + } + } + + RMNode rmNode = rmContext.getRMNodes().get(rmContainer.getAllocatedNode()); + if (null == rmNode) { + // The container's node is no longer in rmContext.getRMNodes() + String msg = + "Failed to get rmNode=" + rmContainer.getAllocatedNode() + " for " + + (increase ? 
"increase" : "decrease") + + " request, with container-id=" + containerId; + throw new InvalidResourceRequestException(msg); + } + + // Target resource of the increase request is more than NM can offer + if (!Resources.fitsIn(scheduler.getResourceCalculator(), + scheduler.getClusterResource(), targetResource, + rmNode.getTotalCapability())) { + String msg = "Target resource=" + targetResource + " of containerId=" + + containerId + " is more than node's total resource=" + + rmNode.getTotalCapability(); + throw new InvalidResourceRequestException(msg); + } + 
+ // Update normalized target resource + request.setCapability(targetResource); + } /* * @throw <code>InvalidResourceBlacklistRequestException </code> if the @@ -123,6 +222,80 @@ public class RMServerUtils { } } } + + /** + * Reject the batch if a containerId appears more than once across increase + * and decrease requests, or if any target resource is negative, exceeds + * maximumAllocation, or is invalid for its container. + */ + public static void increaseDecreaseRequestSanityCheck(RMContext rmContext, + List<ContainerResourceChangeRequest> incRequests, + List<ContainerResourceChangeRequest> decRequests, + Resource maximumAllocation) throws InvalidResourceRequestException { + checkDuplicatedIncreaseDecreaseRequest(incRequests, decRequests); + validateIncreaseDecreaseRequest(rmContext, incRequests, maximumAllocation, + true); + validateIncreaseDecreaseRequest(rmContext, decRequests, maximumAllocation, + false); + } + + private static void checkDuplicatedIncreaseDecreaseRequest( + List<ContainerResourceChangeRequest> incRequests, + List<ContainerResourceChangeRequest> decRequests) + throws InvalidResourceRequestException { + String msg = "There're multiple increase or decrease container requests " + + "for same containerId="; + Set<ContainerId> existedContainerIds = new HashSet<ContainerId>(); + if (incRequests != null) { + for (ContainerResourceChangeRequest r : incRequests) { + if (!existedContainerIds.add(r.getContainerId())) { + throw new InvalidResourceRequestException(msg + r.getContainerId()); + } + } + } + + if (decRequests != null) { + for (ContainerResourceChangeRequest r : decRequests) { + if (!existedContainerIds.add(r.getContainerId())) { + throw new InvalidResourceRequestException(msg + r.getContainerId()); + } + } + } + } + + private static void validateIncreaseDecreaseRequest(RMContext rmContext, + List<ContainerResourceChangeRequest> requests, Resource maximumAllocation, + boolean increase) + throws InvalidResourceRequestException { + if (requests == null) { + return; + } + for 
(ContainerResourceChangeRequest request : requests) { + if (request.getCapability().getMemory() < 0 + || request.getCapability().getMemory() > maximumAllocation + .getMemory()) { + throw new InvalidResourceRequestException("Invalid " + + (increase ? "increase" : "decrease") + " request" + + ", requested memory < 0" + + ", or requested memory > max configured" + ", requestedMemory=" + + request.getCapability().getMemory() + ", maxMemory=" + + maximumAllocation.getMemory()); + } + if (request.getCapability().getVirtualCores() < 0 + || request.getCapability().getVirtualCores() > maximumAllocation + .getVirtualCores()) { + throw new InvalidResourceRequestException("Invalid " + + (increase ? "increase" : "decrease") + " request" + + ", requested virtual cores < 0" + + ", or requested virtual cores > max configured" + + ", requestedVirtualCores=" + + request.getCapability().getVirtualCores() + ", maxVirtualCores=" + + maximumAllocation.getVirtualCores()); + } + + checkAndNormalizeContainerChangeRequest(rmContext, request, increase); + } + } /** * It will validate to make sure all the containers belong to correct http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 100e991..557f6d4 100644 ---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -449,6 +449,8 @@ public class ResourceTrackerService extends AbstractService implements getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); + rmNode.updateNodeHeartbeatResponseForContainersDecreasing( + nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); @@ -461,8 +463,9 @@ public class ResourceTrackerService extends AbstractService implements // 4. Send status to RMNode, saving the latest response. RMNodeStatusEvent nodeStatusEvent = new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), - remoteNodeStatus.getContainersStatuses(), - remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse); + remoteNodeStatus.getContainersStatuses(), + remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse, + remoteNodeStatus.getIncreasedContainers()); if (request.getLogAggregationReportsForApps() != null && !request.getLogAggregationReportsForApps().isEmpty()) { nodeStatusEvent.setLogAggregationReportsForApps(request http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 629b2a3..43de3ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -971,7 +971,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { Collections.singletonList(appAttempt.amReq), EMPTY_CONTAINER_RELEASE_LIST, amBlacklist.getAdditions(), - amBlacklist.getRemovals()); + amBlacklist.getRemovals(), null, null); if (amContainerAllocation != null && amContainerAllocation.getContainers() != null) { assert (amContainerAllocation.getContainers().size() == 0); @@ -995,7 +995,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { Allocation amContainerAllocation = appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null, - null); + null, null, null); // There must be at least one container allocated, because a // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed, // and is put in SchedulerApplication#newlyAllocatedContainers. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index 21d79ee..dc0d9ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -82,4 +82,8 @@ public interface RMContainer extends EventHandler<RMContainerEvent> { String getNodeHttpAddress(); String getNodeLabelExpression(); + + boolean hasIncreaseReservation(); + + void cancelIncreaseReservation(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java new file mode 100644 index 0000000..920cfdb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; + +public class RMContainerChangeResourceEvent extends RMContainerEvent { + + final Resource targetResource; + final boolean increase; + + public RMContainerChangeResourceEvent(ContainerId containerId, + Resource targetResource, boolean increase) { + super(containerId, RMContainerEventType.CHANGE_RESOURCE); + + this.targetResource = targetResource; + this.increase = increase; + } + + public Resource getTargetResource() { + return targetResource; + } + + public boolean isIncrease() { + return increase; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java index 259d68b3..a3b4b76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java @@ -25,6 +25,10 @@ public enum RMContainerEventType { ACQUIRED, KILL, // Also from Node on NodeRemoval RESERVED, + + // when a container acquired by AM after + // it increased/decreased + 
ACQUIRE_UPDATED_CONTAINER, LAUNCHED, FINISHED, @@ -35,5 +39,12 @@ public enum RMContainerEventType { // Source: ContainerAllocationExpirer EXPIRE, - RECOVER + RECOVER, + + // Source: Scheduler + // Resource change approved by scheduler + CHANGE_RESOURCE, + + // NM reported resource change is done + NM_DONE_CHANGE_RESOURCE } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index a3d8bee..8133657 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -118,7 +118,18 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { .addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED, RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, - RMContainerEventType.EXPIRE) + RMContainerEventType.RESERVED, new ContainerReservedTransition()) + .addTransition(RMContainerState.RUNNING, RMContainerState.EXPIRED, + RMContainerEventType.EXPIRE, + new ContainerExpiredWhileRunningTransition()) + .addTransition(RMContainerState.RUNNING, 
RMContainerState.RUNNING, + RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition()) + .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, + RMContainerEventType.ACQUIRE_UPDATED_CONTAINER, + new ContainerAcquiredWhileRunningTransition()) + .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, + RMContainerEventType.NM_DONE_CHANGE_RESOURCE, + new NMReportedContainerChangeIsDoneTransition()) // Transitions from COMPLETED state .addTransition(RMContainerState.COMPLETED, RMContainerState.COMPLETED, @@ -140,9 +151,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { RMContainerEventType.KILL, RMContainerEventType.FINISHED)) // create the topology tables - .installTopology(); - - + .installTopology(); private final StateMachine<RMContainerState, RMContainerEventType, RMContainerEvent> stateMachine; @@ -166,6 +175,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { private ContainerStatus finishedStatus; private boolean isAMContainer; private List<ResourceRequest> resourceRequests; + + private volatile boolean hasIncreaseReservation = false; public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, @@ -264,7 +275,12 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { @Override public Resource getAllocatedResource() { - return container.getResource(); + try { + readLock.lock(); + return container.getResource(); + } finally { + readLock.unlock(); + } } @Override @@ -471,8 +487,8 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { } } - private static final class ContainerReservedTransition extends - BaseTransition { + private static final class ContainerReservedTransition + extends BaseTransition { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { @@ -480,6 +496,12 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> { container.reservedResource = e.getReservedResource(); container.reservedNode = e.getReservedNode(); container.reservedPriority = e.getReservedPriority(); + + if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED) + .contains(container.getState())) { + // When container's state != NEW/RESERVED, it is an increase reservation + container.hasIncreaseReservation = true; + } } } @@ -509,6 +531,70 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { .getApplicationAttemptId().getApplicationId(), container.nodeId)); } } + + private static final class ContainerAcquiredWhileRunningTransition extends + BaseTransition { + + @Override + public void transition(RMContainerImpl container, RMContainerEvent event) { + RMContainerUpdatesAcquiredEvent acquiredEvent = + (RMContainerUpdatesAcquiredEvent) event; + if (acquiredEvent.isIncreasedContainer()) { + // The AM has acquired the increased container: start the + // containerAllocationExpirer until the NM reports the change is done. + container.containerAllocationExpirer.register(event.getContainerId()); + } + } + } + + private static final class NMReportedContainerChangeIsDoneTransition + extends BaseTransition { + + @Override + public void transition(RMContainerImpl container, RMContainerEvent event) { + // The NM reported the resource change as done; stop the expirer. + container.containerAllocationExpirer.unregister(event.getContainerId()); + } + } + + private static final class ContainerExpiredWhileRunningTransition extends + BaseTransition { + + @Override + public void transition(RMContainerImpl container, RMContainerEvent event) { + // When a RUNNING container expires (e.g. an acquired increase was never + // confirmed by the NM in time), the container is killed. + // TODO, we can do better for this: roll back container resource to the + // resource before increase, and notify scheduler about this decrease as + // well. Will do that in a separate JIRA. 
+ new KillTransition().transition(container, event); + } + } + + private static final class ChangeResourceTransition extends BaseTransition { + + @Override + public void transition(RMContainerImpl container, RMContainerEvent event) { + RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event; + + // Registration with containerAllocationExpirer happens when the AM + // acquires the increased container; for now an increase uses the same + // timeout as container allocation. + if (!changeEvent.isIncrease()) { + // A decrease cancels any earlier increase that the NM has not yet + // confirmed, so stop tracking this container in the + // containerAllocationExpirer. + container.containerAllocationExpirer.unregister(container + .getContainerId()); + } + + container.container.setResource(changeEvent.getTargetResource()); + + // Reaching here means an increase reservation was allocated or the + // container was decreased; either way the reservation is cleared. + container.hasIncreaseReservation = false; + } + } private static final class ContainerRescheduledTransition extends FinishedTransition { @@ -561,13 +647,14 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { RMAppAttempt rmAttempt = container.rmContext.getRMApps() .get(container.getApplicationAttemptId().getApplicationId()) .getCurrentAppAttempt(); - if (ContainerExitStatus.PREEMPTED == container.finishedStatus - .getExitStatus()) { - rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource, - container); - } if (rmAttempt != null) { + if (ContainerExitStatus.PREEMPTED == container.finishedStatus + .getExitStatus()) { + rmAttempt.getRMAppAttemptMetrics().updatePreemptionInfo(resource, + container); + } + long usedMillis = container.finishTime - container.creationTime; long memorySeconds = resource.getMemory() * usedMillis / DateUtils.MILLIS_PER_SECOND; @@ -665,4 +752,14 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> { } return -1; } + + 
@Override + public boolean hasIncreaseReservation() { + return hasIncreaseReservation; + } + + @Override + public void cancelIncreaseReservation() { + hasIncreaseReservation = false; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerUpdatesAcquiredEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerUpdatesAcquiredEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerUpdatesAcquiredEvent.java new file mode 100644 index 0000000..0dccc5f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerUpdatesAcquiredEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; + +import org.apache.hadoop.yarn.api.records.ContainerId; + +public class RMContainerUpdatesAcquiredEvent extends RMContainerEvent { + private final boolean increasedContainer; + + public RMContainerUpdatesAcquiredEvent(ContainerId containerId, + boolean increasedContainer) { + super(containerId, RMContainerEventType.ACQUIRE_UPDATED_CONTAINER); + this.increasedContainer = increasedContainer; + } + + public boolean isIncreasedContainer() { + return increasedContainer; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 6bb0971..f28422a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.hadoop.net.Node; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import 
org.apache.hadoop.yarn.api.records.NodeState; @@ -146,4 +147,12 @@ public interface RMNode { * @return labels in this node */ public Set<String> getNodeLabels(); + + /** + * Update containers to be decreased + */ + public void updateNodeHeartbeatResponseForContainersDecreasing( + NodeHeartbeatResponse response); + + public List<Container> pullNewlyIncreasedContainers(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java new file mode 100644 index 0000000..62925ad --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java @@ -0,0 +1,39 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeId; + +public class RMNodeDecreaseContainerEvent extends RMNodeEvent { + final List<Container> toBeDecreasedContainers; + + public RMNodeDecreaseContainerEvent(NodeId nodeId, + List<Container> toBeDecreasedContainers) { + super(nodeId, RMNodeEventType.DECREASE_CONTAINER); + + this.toBeDecreasedContainers = toBeDecreasedContainers; + } + + public List<Container> getToBeDecreasedContainers() { + return toBeDecreasedContainers; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index 27ba1c0..a68c894 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -42,6 +42,7 @@ public enum RMNodeEventType { // Source: Container CONTAINER_ALLOCATED, CLEANUP_CONTAINER, + DECREASE_CONTAINER, // Source: RMAppAttempt FINISHED_CONTAINERS_PULLED_BY_AM, http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 7a1ba74..7a43598 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -19,9 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -36,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; import 
org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -131,6 +136,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { /* the list of applications that are running on this node */ private final List<ApplicationId> runningApplications = new ArrayList<ApplicationId>(); + + private final Map<ContainerId, Container> toBeDecreasedContainers = + new HashMap<>(); + + private final Map<ContainerId, Container> nmReportedIncreasedContainers = + new HashMap<>(); private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); @@ -178,6 +189,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN, RMNodeEventType.SHUTDOWN, new DeactivateNodeTransition(NodeState.SHUTDOWN)) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.DECREASE_CONTAINER, + new DecreaseContainersTransition()) //Transitions from REBOOTED state .addTransition(NodeState.REBOOTED, NodeState.REBOOTED, @@ -430,6 +444,24 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { this.writeLock.unlock(); } }; + + @VisibleForTesting + public Collection<Container> getToBeDecreasedContainers() { + return toBeDecreasedContainers.values(); + } + + @Override + public void updateNodeHeartbeatResponseForContainersDecreasing( + NodeHeartbeatResponse response) { + this.writeLock.lock(); + + try { + response.addAllContainersToDecrease(toBeDecreasedContainers.values()); + toBeDecreasedContainers.clear(); + } finally { + this.writeLock.unlock(); + } + } @Override public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { @@ -759,6 +791,19 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { 
RMNodeFinishedContainersPulledByAMEvent) event).getContainers()); } } + + public static class DecreaseContainersTransition + implements SingleArcTransition<RMNodeImpl, RMNodeEvent> { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + RMNodeDecreaseContainerEvent de = (RMNodeDecreaseContainerEvent) event; + + for (Container c : de.getToBeDecreasedContainers()) { + rmNode.toBeDecreasedContainers.put(c.getId(), c); + } + } + } public static class DeactivateNodeTransition implements SingleArcTransition<RMNodeImpl, RMNodeEvent> { @@ -827,6 +872,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } rmNode.handleContainerStatus(statusEvent.getContainers()); + rmNode.handleReportedIncreasedContainers( + statusEvent.getNMReportedIncreasedContainers()); List<LogAggregationReport> logAggregationReportsForApps = statusEvent.getLogAggregationReportsForApps(); @@ -919,6 +966,34 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } return nlm.getLabelsOnNode(nodeId); } + + private void handleReportedIncreasedContainers( + List<Container> reportedIncreasedContainers) { + for (Container container : reportedIncreasedContainers) { + ContainerId containerId = container.getId(); + + // Don't bother with containers already scheduled for cleanup, or for + // applications already killed. 
The scheduler doesn't need to know any + // more about this container + if (containersToClean.contains(containerId)) { + LOG.info("Container " + containerId + " already scheduled for " + + "cleanup, no further processing"); + continue; + } + + ApplicationId containerAppId = + containerId.getApplicationAttemptId().getApplicationId(); + + if (finishedApplications.contains(containerAppId)) { + LOG.info("Container " + containerId + + " belongs to an application that is already killed," + + " no further processing"); + continue; + } + + this.nmReportedIncreasedContainers.put(containerId, container); + } + } private void handleContainerStatus(List<ContainerStatus> containerStatuses) { // Filter the map to only obtain just launched containers and finished @@ -989,4 +1064,23 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } } + /** Drains the NM-reported increased containers; never returns null. */ + @Override + public List<Container> pullNewlyIncreasedContainers() { + writeLock.lock(); + // Lock before try: if lock() failed, finally must not unlock it. + try { + if (nmReportedIncreasedContainers.isEmpty()) { + return Collections.emptyList(); + } else { + List<Container> containers = + new ArrayList<Container>(nmReportedIncreasedContainers.values()); + nmReportedIncreasedContainers.clear(); + return containers; + } + + } finally { + writeLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index b95d7d3..8323f3c 100644 ---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -18,8 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; +import java.util.Collections; import java.util.List; + import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; @@ -33,28 +36,36 @@ public class RMNodeStatusEvent extends RMNodeEvent { private final NodeHeartbeatResponse latestResponse; private final List<ApplicationId> keepAliveAppIds; private List<LogAggregationReport> logAggregationReportsForApps; - + private final List<Container> nmReportedIncreasedContainers; + + // Used by tests public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds, NodeHeartbeatResponse latestResponse) { - super(nodeId, RMNodeEventType.STATUS_UPDATE); - this.nodeHealthStatus = nodeHealthStatus; - this.containersCollection = collection; - this.keepAliveAppIds = keepAliveAppIds; - this.latestResponse = latestResponse; - this.logAggregationReportsForApps = null; + this(nodeId, nodeHealthStatus, collection, keepAliveAppIds, + latestResponse, null); } public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds, NodeHeartbeatResponse latestResponse, - List<LogAggregationReport> logAggregationReportsForApps) { + List<Container> nmReportedIncreasedContainers) { + this(nodeId, nodeHealthStatus, collection, 
keepAliveAppIds, latestResponse, + null, nmReportedIncreasedContainers); + } + + public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, + List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds, + NodeHeartbeatResponse latestResponse, + List<LogAggregationReport> logAggregationReportsForApps, + List<Container> nmReportedIncreasedContainers) { super(nodeId, RMNodeEventType.STATUS_UPDATE); this.nodeHealthStatus = nodeHealthStatus; this.containersCollection = collection; this.keepAliveAppIds = keepAliveAppIds; this.latestResponse = latestResponse; this.logAggregationReportsForApps = logAggregationReportsForApps; + this.nmReportedIncreasedContainers = nmReportedIncreasedContainers; } public NodeHealthStatus getNodeHealthStatus() { @@ -81,4 +92,10 @@ public class RMNodeStatusEvent extends RMNodeEvent { List<LogAggregationReport> logAggregationReportsForApps) { this.logAggregationReportsForApps = logAggregationReportsForApps; } + + /** @return the NM-reported increased containers, never null. */ + public List<Container> getNMReportedIncreasedContainers() { + return nmReportedIncreasedContainers == null + ? Collections.<Container>emptyList() : nmReportedIncreasedContainers; + } } \ No newline at end of file
