YARN-5788. Apps not activated and AM limit resource in UI and REST not updated after -replaceLabelsOnNode (Bibin A Chundatt via Varun Saxena)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7d2d8d25 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7d2d8d25 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7d2d8d25 Branch: refs/heads/HDFS-10285 Commit: 7d2d8d25ba0cb10a3c6192d4123f27ede5ef2ba6 Parents: 310aa46 Author: Varun Saxena <[email protected]> Authored: Tue Nov 1 15:32:04 2016 +0530 Committer: Varun Saxena <[email protected]> Committed: Tue Nov 1 15:32:04 2016 +0530 ---------------------------------------------------------------------- .../scheduler/capacity/CapacityScheduler.java | 105 ++++++++++--------- .../TestCapacitySchedulerNodeLabelUpdate.java | 74 ++++++++++++- 2 files changed, 128 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d2d8d25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index cfdcb10..d759d47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1126,57 
+1126,52 @@ public class CapacityScheduler extends writeLock.unlock(); } } - + /** * Process node labels update on a node. */ private void updateLabelsOnNode(NodeId nodeId, Set<String> newLabels) { - try { - writeLock.lock(); - FiCaSchedulerNode node = nodeTracker.getNode(nodeId); - if (null == node) { - return; - } - - // Get new partition, we have only one partition per node - String newPartition; - if (newLabels.isEmpty()) { - newPartition = RMNodeLabelsManager.NO_LABEL; - } else{ - newPartition = newLabels.iterator().next(); - } + FiCaSchedulerNode node = nodeTracker.getNode(nodeId); + if (null == node) { + return; + } - // old partition as well - String oldPartition = node.getPartition(); + // Get new partition, we have only one partition per node + String newPartition; + if (newLabels.isEmpty()) { + newPartition = RMNodeLabelsManager.NO_LABEL; + } else{ + newPartition = newLabels.iterator().next(); + } - // Update resources of these containers - for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) { - FiCaSchedulerApp application = getApplicationAttempt( - rmContainer.getApplicationAttemptId()); - if (null != application) { - application.nodePartitionUpdated(rmContainer, oldPartition, - newPartition); - } else{ - LOG.warn("There's something wrong, some RMContainers running on" - + " a node, but we cannot find SchedulerApplicationAttempt " - + "for it. 
Node=" + node.getNodeID() + " applicationAttemptId=" - + rmContainer.getApplicationAttemptId()); - continue; - } - } + // old partition as well + String oldPartition = node.getPartition(); - // Unreserve container on this node - RMContainer reservedContainer = node.getReservedContainer(); - if (null != reservedContainer) { - killReservedContainer(reservedContainer); + // Update resources of these containers + for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) { + FiCaSchedulerApp application = getApplicationAttempt( + rmContainer.getApplicationAttemptId()); + if (null != application) { + application.nodePartitionUpdated(rmContainer, oldPartition, + newPartition); + } else{ + LOG.warn("There's something wrong, some RMContainers running on" + + " a node, but we cannot find SchedulerApplicationAttempt " + + "for it. Node=" + node.getNodeID() + " applicationAttemptId=" + + rmContainer.getApplicationAttemptId()); + continue; } + } - // Update node labels after we've done this - node.updateLabels(newLabels); - } finally { - writeLock.unlock(); + // Unreserve container on this node + RMContainer reservedContainer = node.getReservedContainer(); + if (null != reservedContainer) { + killReservedContainer(reservedContainer); } + + // Update node labels after we've done this + node.updateLabels(newLabels); } private void updateSchedulerHealth(long now, FiCaSchedulerNode node, @@ -1371,13 +1366,8 @@ public class CapacityScheduler extends { NodeLabelsUpdateSchedulerEvent labelUpdateEvent = (NodeLabelsUpdateSchedulerEvent) event; - - for (Entry<NodeId, Set<String>> entry : labelUpdateEvent - .getUpdatedNodeToLabels().entrySet()) { - NodeId id = entry.getKey(); - Set<String> labels = entry.getValue(); - updateLabelsOnNode(id, labels); - } + + updateNodeLabelsAndQueueResource(labelUpdateEvent); } break; case NODE_UPDATE: @@ -1482,6 +1472,27 @@ public class CapacityScheduler extends } } + /** + * Process node labels update. 
+ */ + private void updateNodeLabelsAndQueueResource( + NodeLabelsUpdateSchedulerEvent labelUpdateEvent) { + try { + writeLock.lock(); + for (Entry<NodeId, Set<String>> entry : labelUpdateEvent + .getUpdatedNodeToLabels().entrySet()) { + NodeId id = entry.getKey(); + Set<String> labels = entry.getValue(); + updateLabelsOnNode(id, labels); + } + Resource clusterResource = getClusterResource(); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + } finally { + writeLock.unlock(); + } + } + private void addNode(RMNode nodeManager) { try { writeLock.lock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d2d8d25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index 0ae77f2..439e9df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -413,7 +413,7 @@ public class TestCapacitySchedulerNodeLabelUpdate { rm.close(); } - @Test(timeout = 3000000) + @Test(timeout = 300000) public void testMoveApplicationWithLabel() 
throws Exception { // set node -> label mgr.addToCluserNodeLabelsWithDefaultExclusivity( @@ -589,7 +589,49 @@ public class TestCapacitySchedulerNodeLabelUpdate { rm.close(); } - @Test (timeout = 60000) + @Test + public void testAMResourceLimitNodeUpdatePartition() throws Exception { + conf.setInt("yarn.scheduler.minimum-allocation-mb", 64); + // inject node label manager + MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + rm.registerNode("h1:1234", 6400); + mgr.addToCluserNodeLabelsWithDefaultExclusivity( + ImmutableSet.of("x", "y", "z")); + + // .1 percentage of 6400 will be for am + checkAMResourceLimit(rm, "a", 640, ""); + checkAMResourceLimit(rm, "a", 0, "x"); + checkAMResourceLimit(rm, "a", 0, "y"); + checkAMResourceLimit(rm, "a", 0, "z"); + + mgr.replaceLabelsOnNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + rm.drainEvents(); + + checkAMResourceLimit(rm, "a", 640, "x"); + checkAMResourceLimit(rm, "a", 0, "y"); + checkAMResourceLimit(rm, "a", 0, "z"); + checkAMResourceLimit(rm, "a", 0, ""); + + // Switch + mgr.replaceLabelsOnNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y"))); + rm.drainEvents(); + + checkAMResourceLimit(rm, "a", 0, "x"); + checkAMResourceLimit(rm, "a", 640, "y"); + checkAMResourceLimit(rm, "a", 0, "z"); + checkAMResourceLimit(rm, "a", 0, ""); + } + + @Test(timeout = 60000) public void testAMResourceUsageWhenNodeUpdatesPartition() throws Exception { // set node -> label @@ -638,8 +680,8 @@ public class TestCapacitySchedulerNodeLabelUpdate { FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId()); // change h1's label to z - cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(), - toSet("z")))); + cs.handle(new NodeLabelsUpdateSchedulerEvent( + ImmutableMap.of(nm1.getNodeId(), toSet("z")))); // Now 
the resources also should change from x to z. Verify AM and normal // used resource are successfully changed. @@ -677,4 +719,28 @@ public class TestCapacitySchedulerNodeLabelUpdate { rm.close(); } + + private void checkAMResourceLimit(MockRM rm, String queuename, int memory, + String label) throws InterruptedException { + Assert.assertEquals(memory, + waitForResourceUpdate(rm, queuename, memory, label, 3000L)); + } + + private long waitForResourceUpdate(MockRM rm, String queuename, long memory, + String label, long timeout) throws InterruptedException { + long start = System.currentTimeMillis(); + long memorySize = 0; + while (System.currentTimeMillis() - start < timeout) { + CapacityScheduler scheduler = + (CapacityScheduler) rm.getResourceScheduler(); + CSQueue queue = scheduler.getQueue(queuename); + memorySize = + queue.getQueueResourceUsage().getAMLimit(label).getMemorySize(); + if (memory == memorySize) { + return memorySize; + } + Thread.sleep(100); + } + return memorySize; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
