Repository: hadoop Updated Branches: refs/heads/branch-2.6 16d714433 -> bb6c79f76
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6c79f7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java new file mode 100644 index 0000000..1d7f6f1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -0,0 +1,447 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.collect.ImmutableSet; + +public class RMNodeLabelsManager extends CommonNodeLabelsManager { + + protected static class Queue { + protected Set<String> acccessibleNodeLabels; + protected Resource resource; + + protected Queue() { + acccessibleNodeLabels = + Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + resource = Resource.newInstance(0, 0); + } + } + + ConcurrentMap<String, Queue> queueCollections = + new ConcurrentHashMap<String, Queue>(); + protected AccessControlList adminAcl; + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + adminAcl = + new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL, + YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); + } + + @Override + public void addLabelsToNode(Map<NodeId, Set<String>> addedLabelsToNode) + throws IOException { + try { + writeLock.lock(); + + // get nodesCollection before edition + Map<String, Host> before = cloneNodeMap(addedLabelsToNode.keySet()); + + super.addLabelsToNode(addedLabelsToNode); + + // get nodesCollection after edition + Map<String, Host> after = cloneNodeMap(addedLabelsToNode.keySet()); + + // update running nodes resources + updateResourceMappings(before, after); + } finally { + writeLock.unlock(); + } + } + + protected void checkRemoveFromClusterNodeLabelsOfQueue( + Collection<String> labelsToRemove) throws IOException { + // Check if label to remove doesn't existed or null/empty, will throw + // exception if any of labels to remove doesn't meet requirement + for (String label : labelsToRemove) { + label = normalizeLabel(label); + + // check if any queue contains this label + for (Entry<String, Queue> entry : queueCollections.entrySet()) { + String queueName = entry.getKey(); + Set<String> queueLabels = entry.getValue().acccessibleNodeLabels; + if (queueLabels.contains(label)) { + throw new IOException("Cannot remove label=" + label + + ", because queue=" + queueName + " is using this label. " + + "Please remove label on queue before remove the label"); + } + } + } + } + + @Override + public void removeFromClusterNodeLabels(Collection<String> labelsToRemove) + throws IOException { + try { + writeLock.lock(); + + checkRemoveFromClusterNodeLabelsOfQueue(labelsToRemove); + + // copy before NMs + Map<String, Host> before = cloneNodeMap(); + + super.removeFromClusterNodeLabels(labelsToRemove); + + updateResourceMappings(before, nodeCollections); + } finally { + writeLock.unlock(); + } + } + + @Override + public void + removeLabelsFromNode(Map<NodeId, Set<String>> removeLabelsFromNode) + throws IOException { + try { + writeLock.lock(); + + // get nodesCollection before edition + Map<String, Host> before = + cloneNodeMap(removeLabelsFromNode.keySet()); + + super.removeLabelsFromNode(removeLabelsFromNode); + + // get nodesCollection before edition + Map<String, Host> after = cloneNodeMap(removeLabelsFromNode.keySet()); + + // update running nodes resources + updateResourceMappings(before, after); + } finally { + writeLock.unlock(); + } + } + + @Override + public void replaceLabelsOnNode(Map<NodeId, Set<String>> replaceLabelsToNode) + throws IOException { + try { + writeLock.lock(); + + // get nodesCollection before edition + Map<String, Host> before = cloneNodeMap(replaceLabelsToNode.keySet()); + + super.replaceLabelsOnNode(replaceLabelsToNode); + + // get nodesCollection after edition + Map<String, Host> after = cloneNodeMap(replaceLabelsToNode.keySet()); + + // update running nodes resources + updateResourceMappings(before, after); + } finally { + writeLock.unlock(); + } + } + + + /* + * Following methods are used for setting if a node is up and running, and it + * will update running nodes resource + */ + public void activateNode(NodeId nodeId, Resource resource) { + try { + writeLock.lock(); + + // save if we have a node before + Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId)); + + createNodeIfNonExisted(nodeId); + Node nm = getNMInNodeSet(nodeId); + nm.resource = resource; + nm.running = true; + + // get the node after edition + Map<String, Host> after = cloneNodeMap(ImmutableSet.of(nodeId)); + + updateResourceMappings(before, after); + } finally { + writeLock.unlock(); + } + } + + /* + * Following methods are used for setting if a node unregistered to RM + */ + public void deactivateNode(NodeId nodeId) { + try { + writeLock.lock(); + + // save if we have a node before + Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId)); + Node nm = getNMInNodeSet(nodeId); + if (null != nm) { + // set nm is not running, and its resource = 0 + nm.running = false; + nm.resource = Resource.newInstance(0, 0); + } + + // get the node after edition + Map<String, Host> after = cloneNodeMap(ImmutableSet.of(nodeId)); + + updateResourceMappings(before, after); + } finally { + writeLock.unlock(); + } + } + + public void updateNodeResource(NodeId node, Resource newResource) { + deactivateNode(node); + activateNode(node, newResource); + } + + public void reinitializeQueueLabels(Map<String, Set<String>> queueToLabels) { + try { + writeLock.lock(); + // clear before set + this.queueCollections.clear(); + + for (Entry<String, Set<String>> entry : queueToLabels.entrySet()) { + String queue = entry.getKey(); + Queue q = new Queue(); + this.queueCollections.put(queue, q); + + Set<String> labels = entry.getValue(); + if (labels.contains(ANY)) { + continue; + } + + q.acccessibleNodeLabels.addAll(labels); + for (Host host : nodeCollections.values()) { + for (Entry<NodeId, Node> nentry : host.nms.entrySet()) { + NodeId nodeId = nentry.getKey(); + Node nm = nentry.getValue(); + if (nm.running && isNodeUsableByQueue(getLabelsByNode(nodeId), q)) { + Resources.addTo(q.resource, nm.resource); + } + } + } + } + } finally { + writeLock.unlock(); + } + } + + public Resource getQueueResource(String queueName, Set<String> queueLabels, + Resource clusterResource) { + try { + readLock.lock(); + if (queueLabels.contains(ANY)) { + return clusterResource; + } + Queue q = queueCollections.get(queueName); + if (null == q) { + return Resources.none(); + } + return q.resource; + } finally { + readLock.unlock(); + } + } + + public Set<String> getLabelsOnNode(NodeId nodeId) { + try { + readLock.lock(); + Set<String> nodeLabels = getLabelsByNode(nodeId); + return Collections.unmodifiableSet(nodeLabels); + } finally { + readLock.unlock(); + } + } + + public boolean containsNodeLabel(String label) { + try { + readLock.lock(); + return label != null + && (label.isEmpty() || labelCollections.containsKey(label)); + } finally { + readLock.unlock(); + } + } + + private Map<String, Host> cloneNodeMap(Set<NodeId> nodesToCopy) { + Map<String, Host> map = new HashMap<String, Host>(); + for (NodeId nodeId : nodesToCopy) { + if (!map.containsKey(nodeId.getHost())) { + Host originalN = nodeCollections.get(nodeId.getHost()); + if (null == originalN) { + continue; + } + Host n = originalN.copy(); + n.nms.clear(); + map.put(nodeId.getHost(), n); + } + + Host n = map.get(nodeId.getHost()); + if (WILDCARD_PORT == nodeId.getPort()) { + for (Entry<NodeId, Node> entry : nodeCollections + .get(nodeId.getHost()).nms.entrySet()) { + n.nms.put(entry.getKey(), entry.getValue().copy()); + } + } else { + Node nm = getNMInNodeSet(nodeId); + if (null != nm) { + n.nms.put(nodeId, nm.copy()); + } + } + } + return map; + } + + private void updateResourceMappings(Map<String, Host> before, + Map<String, Host> after) { + // Get NMs in before only + Set<NodeId> allNMs = new HashSet<NodeId>(); + for (Entry<String, Host> entry : before.entrySet()) { + allNMs.addAll(entry.getValue().nms.keySet()); + } + for (Entry<String, Host> entry : after.entrySet()) { + allNMs.addAll(entry.getValue().nms.keySet()); + } + + // traverse all nms + for (NodeId nodeId : allNMs) { + Node oldNM; + if ((oldNM = getNMInNodeSet(nodeId, before, true)) != null) { + Set<String> oldLabels = getLabelsByNode(nodeId, before); + // no label in the past + if (oldLabels.isEmpty()) { + // update labels + Label label = labelCollections.get(NO_LABEL); + Resources.subtractFrom(label.resource, oldNM.resource); + + // update queues, all queue can access this node + for (Queue q : queueCollections.values()) { + Resources.subtractFrom(q.resource, oldNM.resource); + } + } else { + // update labels + for (String labelName : oldLabels) { + Label label = labelCollections.get(labelName); + if (null == label) { + continue; + } + Resources.subtractFrom(label.resource, oldNM.resource); + } + + // update queues, only queue can access this node will be subtract + for (Queue q : queueCollections.values()) { + if (isNodeUsableByQueue(oldLabels, q)) { + Resources.subtractFrom(q.resource, oldNM.resource); + } + } + } + } + + Node newNM; + if ((newNM = getNMInNodeSet(nodeId, after, true)) != null) { + Set<String> newLabels = getLabelsByNode(nodeId, after); + // no label in the past + if (newLabels.isEmpty()) { + // update labels + Label label = labelCollections.get(NO_LABEL); + Resources.addTo(label.resource, newNM.resource); + + // update queues, all queue can access this node + for (Queue q : queueCollections.values()) { + Resources.addTo(q.resource, newNM.resource); + } + } else { + // update labels + for (String labelName : newLabels) { + Label label = labelCollections.get(labelName); + Resources.addTo(label.resource, newNM.resource); + } + + // update queues, only queue can access this node will be subtract + for (Queue q : queueCollections.values()) { + if (isNodeUsableByQueue(newLabels, q)) { + Resources.addTo(q.resource, newNM.resource); + } + } + } + } + } + } + + public Resource getResourceByLabel(String label, Resource clusterResource) { + label = normalizeLabel(label); + try { + readLock.lock(); + if (null == labelCollections.get(label)) { + return Resources.none(); + } + return labelCollections.get(label).resource; + } finally { + readLock.unlock(); + } + } + + private boolean isNodeUsableByQueue(Set<String> nodeLabels, Queue q) { + // node without any labels can be accessed by any queue + if (nodeLabels == null || nodeLabels.isEmpty() + || (nodeLabels.size() == 1 && nodeLabels.contains(NO_LABEL))) { + return true; + } + + for (String label : nodeLabels) { + if (q.acccessibleNodeLabels.contains(label)) { + return true; + } + } + + return false; + } + + private Map<String, Host> cloneNodeMap() { + Set<NodeId> nodesToCopy = new HashSet<NodeId>(); + for (String nodeName : nodeCollections.keySet()) { + nodesToCopy.add(NodeId.newInstance(nodeName, WILDCARD_PORT)); + } + return cloneNodeMap(nodesToCopy); + } + + public boolean checkAccess(UserGroupInformation user) { + // make sure only admin can invoke + // this method + if (adminAcl.isUserAllowed(user)) { + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6c79f7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java new file mode 100644 index 0000000..14bd999 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.nodelabels.NodeLabelsStore; + +public class DummyRMNodeLabelsManager extends RMNodeLabelsManager { + Map<NodeId, Set<String>> lastNodeToLabels = null; + Collection<String> lastAddedlabels = null; + Collection<String> lastRemovedlabels = null; + + @Override + public void initNodeLabelStore(Configuration conf) { + this.store = new NodeLabelsStore(this) { + + @Override + public void recover() throws IOException { + // do nothing + } + + @Override + public void removeClusterNodeLabels(Collection<String> labels) + throws IOException { + // do nothing + } + + @Override + public void updateNodeToLabelsMappings( + Map<NodeId, Set<String>> nodeToLabels) throws IOException { + // do nothing + } + + @Override + public void storeNewClusterNodeLabels(Set<String> label) throws IOException { + // do nothing + } + + @Override + public void close() throws IOException { + // do nothing + } + }; + } + + @Override + protected void initDispatcher(Configuration conf) { + super.dispatcher = new InlineDispatcher(); + } + + @Override + protected void startDispatcher() { + // do nothing + } + + @Override + protected void stopDispatcher() { + // do nothing + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6c79f7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java new file mode 100644 index 0000000..26284e2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +public class TestRMNodeLabelsManager extends NodeLabelTestBase { + private final Resource EMPTY_RESOURCE = Resource.newInstance(0, 0); + private final Resource SMALL_RESOURCE = Resource.newInstance(100, 0); + private final Resource LARGE_NODE = Resource.newInstance(1000, 0); + + DummyRMNodeLabelsManager mgr = null; + + @Before + public void before() { + mgr = new DummyRMNodeLabelsManager(); + mgr.init(new Configuration()); + mgr.start(); + } + + @After + public void after() { + mgr.stop(); + } + + @Test(timeout = 5000) + public void testNodeActiveDeactiveUpdate() throws Exception { + mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3")); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"), + toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3"))); + + Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p2", null), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p3", null), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null), + EMPTY_RESOURCE); + + // active two NM to n1, one large and one small + mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE); + Assert.assertEquals(mgr.getResourceByLabel("p1", null), + Resources.add(SMALL_RESOURCE, LARGE_NODE)); + + // change the large NM to small, check if resource updated + mgr.updateNodeResource(NodeId.newInstance("n1", 2), SMALL_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p1", null), + Resources.multiply(SMALL_RESOURCE, 2)); + + // deactive one NM, and check if resource updated + mgr.deactivateNode(NodeId.newInstance("n1", 1)); + Assert.assertEquals(mgr.getResourceByLabel("p1", null), SMALL_RESOURCE); + + // continus deactive, check if resource updated + mgr.deactivateNode(NodeId.newInstance("n1", 2)); + Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE); + + // Add two NM to n1 back + mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE); + + // And remove p1, now the two NM should come to default label, + mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1")); + Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null), + Resources.add(SMALL_RESOURCE, LARGE_NODE)); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout = 5000) + public void testUpdateNodeLabelWithActiveNode() throws Exception { + mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3")); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"), + toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3"))); + + // active two NM to n1, one large and one small + mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n2", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n3", 1), SMALL_RESOURCE); + + // change label of n1 to p2 + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2"))); + Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p2", null), + Resources.multiply(SMALL_RESOURCE, 2)); + Assert.assertEquals(mgr.getResourceByLabel("p3", null), SMALL_RESOURCE); + + // add more labels + mgr.addToCluserNodeLabels(toSet("p4", "p5", "p6")); + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n4"), toSet("p1"), + toNodeId("n5"), toSet("p2"), toNodeId("n6"), toSet("p3"), + toNodeId("n7"), toSet("p4"), toNodeId("n8"), toSet("p5"))); + + // now node -> label is, + // p1 : n4 + // p2 : n1, n2, n5 + // p3 : n3, n6 + // p4 : n7 + // p5 : n8 + // no-label : n9 + + // active these nodes + mgr.activateNode(NodeId.newInstance("n4", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n5", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n6", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n7", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n8", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("n9", 1), SMALL_RESOURCE); + + // check varibles + Assert.assertEquals(mgr.getResourceByLabel("p1", null), SMALL_RESOURCE); + Assert.assertEquals(mgr.getResourceByLabel("p2", null), + Resources.multiply(SMALL_RESOURCE, 3)); + Assert.assertEquals(mgr.getResourceByLabel("p3", null), + Resources.multiply(SMALL_RESOURCE, 2)); + Assert.assertEquals(mgr.getResourceByLabel("p4", null), + Resources.multiply(SMALL_RESOURCE, 1)); + Assert.assertEquals(mgr.getResourceByLabel("p5", null), + Resources.multiply(SMALL_RESOURCE, 1)); + Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null), + Resources.multiply(SMALL_RESOURCE, 1)); + + // change a bunch of nodes -> labels + // n4 -> p2 + // n7 -> empty + // n5 -> p1 + // n8 -> empty + // n9 -> p1 + // + // now become: + // p1 : n5, n9 + // p2 : n1, n2, n4 + // p3 : n3, n6 + // p4 : [ ] + // p5 : [ ] + // no label: n8, n7 + mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n4"), toSet("p2"), + toNodeId("n7"), RMNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n5"), + toSet("p1"), toNodeId("n8"), RMNodeLabelsManager.EMPTY_STRING_SET, + toNodeId("n9"), toSet("p1"))); + + // check varibles + Assert.assertEquals(mgr.getResourceByLabel("p1", null), + Resources.multiply(SMALL_RESOURCE, 2)); + Assert.assertEquals(mgr.getResourceByLabel("p2", null), + Resources.multiply(SMALL_RESOURCE, 3)); + Assert.assertEquals(mgr.getResourceByLabel("p3", null), + Resources.multiply(SMALL_RESOURCE, 2)); + Assert.assertEquals(mgr.getResourceByLabel("p4", null), + Resources.multiply(SMALL_RESOURCE, 0)); + Assert.assertEquals(mgr.getResourceByLabel("p5", null), + Resources.multiply(SMALL_RESOURCE, 0)); + Assert.assertEquals(mgr.getResourceByLabel("", null), + Resources.multiply(SMALL_RESOURCE, 2)); + } + + @Test(timeout=5000) + public void testGetQueueResource() throws Exception { + Resource clusterResource = Resource.newInstance(9999, 1); + + /* + * Node->Labels: + * host1 : red, blue + * host2 : blue, yellow + * host3 : yellow + * host4 : + */ + mgr.addToCluserNodeLabels(toSet("red", "blue", "yellow")); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host1"), + toSet("red", "blue"))); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host2"), + toSet("blue", "yellow"))); + mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host3"), toSet("yellow"))); + + // active two NM to n1, one large and one small + mgr.activateNode(NodeId.newInstance("host1", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("host2", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("host3", 1), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("host4", 1), SMALL_RESOURCE); + + // reinitialize queue + Set<String> q1Label = toSet("red", "blue"); + Set<String> q2Label = toSet("blue", "yellow"); + Set<String> q3Label = toSet("yellow"); + Set<String> q4Label = RMNodeLabelsManager.EMPTY_STRING_SET; + Set<String> q5Label = toSet(RMNodeLabelsManager.ANY); + + Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>(); + queueToLabels.put("Q1", q1Label); + queueToLabels.put("Q2", q2Label); + queueToLabels.put("Q3", q3Label); + queueToLabels.put("Q4", q4Label); + queueToLabels.put("Q5", q5Label); + + mgr.reinitializeQueueLabels(queueToLabels); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("host1"), toSet("red"), + toNodeId("host2"), toSet("blue", "yellow"))); + mgr.addLabelsToNode(ImmutableMap.of(toNodeId("host3"), toSet("red"))); + /* + * Check resource after changes some labels + * Node->Labels: + * host1 : blue (was: red, blue) + * host2 : (was: blue, yellow) + * host3 : red, yellow (was: yellow) + * host4 : + */ + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Check resource after deactive/active some nodes + * Node->Labels: + * (deactived) host1 : blue + * host2 : + * (deactived and then actived) host3 : red, yellow + * host4 : + */ + mgr.deactivateNode(NodeId.newInstance("host1", 1)); + mgr.deactivateNode(NodeId.newInstance("host3", 1)); + mgr.activateNode(NodeId.newInstance("host3", 1), SMALL_RESOURCE); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Check resource after refresh queue: + * Q1: blue + * Q2: red, blue + * Q3: red + * Q4: + * Q5: ANY + */ + q1Label = toSet("blue"); + q2Label = toSet("blue", "red"); + q3Label = toSet("red"); + q4Label = RMNodeLabelsManager.EMPTY_STRING_SET; + q5Label = toSet(RMNodeLabelsManager.ANY); + + queueToLabels.clear(); + queueToLabels.put("Q1", q1Label); + queueToLabels.put("Q2", q2Label); + queueToLabels.put("Q3", q3Label); + queueToLabels.put("Q4", q4Label); + queueToLabels.put("Q5", q5Label); + + mgr.reinitializeQueueLabels(queueToLabels); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Active NMs in nodes already have NM + * Node->Labels: + * host2 : + * host3 : red, yellow (3 NMs) + * host4 : (2 NMs) + */ + mgr.activateNode(NodeId.newInstance("host3", 2), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("host3", 3), SMALL_RESOURCE); + mgr.activateNode(NodeId.newInstance("host4", 2), SMALL_RESOURCE); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 6), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 6), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + + /* + * Deactive NMs in nodes already have NMs + * Node->Labels: + * host2 : + * host3 : red, yellow (2 NMs) + * host4 : (0 NMs) + */ + mgr.deactivateNode(NodeId.newInstance("host3", 3)); + mgr.deactivateNode(NodeId.newInstance("host4", 2)); + mgr.deactivateNode(NodeId.newInstance("host4", 1)); + + // check resource + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1), + mgr.getQueueResource("Q1", q1Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q2", q2Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3), + mgr.getQueueResource("Q3", q3Label, clusterResource)); + Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1), + mgr.getQueueResource("Q4", q4Label, clusterResource)); + Assert.assertEquals(clusterResource, + mgr.getQueueResource("Q5", q5Label, clusterResource)); + } +}