Repository: hadoop Updated Branches: refs/heads/trunk 16b348246 -> a3839a9fb
YARN-2744. Fixed CapacityScheduler to validate node-labels correctly against queues. Contributed by Wangda Tan. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a3839a9f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a3839a9f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a3839a9f Branch: refs/heads/trunk Commit: a3839a9fbfb8eec396b9bf85472d25e0ffc3aab2 Parents: 16b3482 Author: Vinod Kumar Vavilapalli <vino...@apache.org> Authored: Thu Nov 6 17:28:12 2014 -0800 Committer: Vinod Kumar Vavilapalli <vino...@apache.org> Committed: Thu Nov 6 17:28:12 2014 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/capacity/AbstractCSQueue.java | 4 +- .../scheduler/capacity/TestQueueParsing.java | 144 ++++++++++++++++++- 3 files changed, 144 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3839a9f/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0c5fc4c..f6b39e3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -882,6 +882,9 @@ Release 2.6.0 - UNRELEASED YARN-2812. TestApplicationHistoryServer is likely to fail on less powerful machine. (Zhijie Shen via xgong) + YARN-2744. Fixed CapacityScheduler to validate node-labels correctly against + queues. 
(Wangda Tan via vinodkv) + Release 2.5.2 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3839a9f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 7159e4d..c612846 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -105,9 +105,9 @@ public abstract class AbstractCSQueue implements CSQueue { // inherit from parent if labels not set if (this.accessibleLabels == null && parent != null) { this.accessibleLabels = parent.getAccessibleNodeLabels(); - SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager, - this.accessibleLabels); } + SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager, + this.accessibleLabels); // inherit from parent if labels not set if (this.defaultLabelExpression == null && parent != null http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3839a9f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index 20a7e53..42db030 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -26,9 +26,11 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; @@ -80,7 +82,7 @@ public class TestQueueParsing { Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA); Assert.assertEquals(0.7 * 0.55 * 0.7, c12.getAbsoluteMaximumCapacity(), DELTA); - capacityScheduler.stop(); + ServiceOperations.stopQuietly(capacityScheduler); } private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { @@ -164,7 +166,7 @@ public 
class TestQueueParsing { capacityScheduler.init(conf); capacityScheduler.start(); capacityScheduler.reinitialize(conf, null); - capacityScheduler.stop(); + ServiceOperations.stopQuietly(capacityScheduler); } public void testMaxCapacity() throws Exception { @@ -339,6 +341,27 @@ public class TestQueueParsing { conf.setCapacityByLabel(A2, "red", 50); } + private void setupQueueConfigurationWithSingleLevel( + CapacitySchedulerConfiguration conf) { + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + + // Set A configuration + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + conf.setMaximumCapacity(A, 15); + conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue")); + conf.setCapacityByLabel(A, "red", 90); + conf.setCapacityByLabel(A, "blue", 90); + + // Set B configuration + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 90); + conf.setAccessibleNodeLabels(B, ImmutableSet.of("red", "blue")); + conf.setCapacityByLabel(B, "red", 10); + conf.setCapacityByLabel(B, "blue", 10); + } + @Test public void testQueueParsingReinitializeWithLabels() throws IOException { CapacitySchedulerConfiguration csConf = @@ -362,7 +385,7 @@ public class TestQueueParsing { conf = new YarnConfiguration(csConf); capacityScheduler.reinitialize(conf, rmContext); checkQueueLabels(capacityScheduler); - capacityScheduler.stop(); + ServiceOperations.stopQuietly(capacityScheduler); } private void checkQueueLabels(CapacityScheduler capacityScheduler) { @@ -429,7 +452,7 @@ public class TestQueueParsing { capacityScheduler.init(csConf); capacityScheduler.start(); checkQueueLabels(capacityScheduler); - capacityScheduler.stop(); + ServiceOperations.stopQuietly(capacityScheduler); } @Test @@ -451,6 +474,117 @@ public class TestQueueParsing { capacityScheduler.init(csConf); capacityScheduler.start(); checkQueueLabelsInheritConfig(capacityScheduler); - 
capacityScheduler.stop(); + ServiceOperations.stopQuietly(capacityScheduler); + } + + @Test(expected = Exception.class) + public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager() + throws IOException { + YarnConfiguration conf = new YarnConfiguration(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupQueueConfigurationWithLabels(csConf); + + CapacityScheduler capacityScheduler = new CapacityScheduler(); + RMContextImpl rmContext = + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(csConf), + new NMTokenSecretManagerInRM(csConf), + new ClientToAMTokenSecretManagerInRM(), null); + + RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager(); + nodeLabelsManager.init(conf); + nodeLabelsManager.start(); + + rmContext.setNodeLabelManager(nodeLabelsManager); + capacityScheduler.setConf(csConf); + capacityScheduler.setRMContext(rmContext); + capacityScheduler.init(csConf); + capacityScheduler.start(); + ServiceOperations.stopQuietly(capacityScheduler); + ServiceOperations.stopQuietly(nodeLabelsManager); + } + + @Test(expected = Exception.class) + public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager() + throws IOException { + YarnConfiguration conf = new YarnConfiguration(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupQueueConfigurationWithLabelsInherit(csConf); + + CapacityScheduler capacityScheduler = new CapacityScheduler(); + RMContextImpl rmContext = + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(csConf), + new NMTokenSecretManagerInRM(csConf), + new ClientToAMTokenSecretManagerInRM(), null); + + RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager(); + nodeLabelsManager.init(conf); + nodeLabelsManager.start(); + + rmContext.setNodeLabelManager(nodeLabelsManager); + capacityScheduler.setConf(csConf); + 
capacityScheduler.setRMContext(rmContext); + capacityScheduler.init(csConf); + capacityScheduler.start(); + ServiceOperations.stopQuietly(capacityScheduler); + ServiceOperations.stopQuietly(nodeLabelsManager); + } + + @Test(expected = Exception.class) + public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager() + throws IOException { + YarnConfiguration conf = new YarnConfiguration(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupQueueConfigurationWithSingleLevel(csConf); + + CapacityScheduler capacityScheduler = new CapacityScheduler(); + RMContextImpl rmContext = + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(csConf), + new NMTokenSecretManagerInRM(csConf), + new ClientToAMTokenSecretManagerInRM(), null); + + RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager(); + nodeLabelsManager.init(conf); + nodeLabelsManager.start(); + + rmContext.setNodeLabelManager(nodeLabelsManager); + capacityScheduler.setConf(csConf); + capacityScheduler.setRMContext(rmContext); + capacityScheduler.init(csConf); + capacityScheduler.start(); + ServiceOperations.stopQuietly(capacityScheduler); + ServiceOperations.stopQuietly(nodeLabelsManager); + } + + @Test(expected = Exception.class) + public void testQueueParsingWhenLabelsNotExist() throws IOException { + YarnConfiguration conf = new YarnConfiguration(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupQueueConfigurationWithLabels(csConf); + + CapacityScheduler capacityScheduler = new CapacityScheduler(); + RMContextImpl rmContext = + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(csConf), + new NMTokenSecretManagerInRM(csConf), + new ClientToAMTokenSecretManagerInRM(), null); + + RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager(); + nodeLabelsManager.init(conf); + nodeLabelsManager.start(); + + 
rmContext.setNodeLabelManager(nodeLabelsManager); + capacityScheduler.setConf(csConf); + capacityScheduler.setRMContext(rmContext); + capacityScheduler.init(csConf); + capacityScheduler.start(); + ServiceOperations.stopQuietly(capacityScheduler); + ServiceOperations.stopQuietly(nodeLabelsManager); } }