Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 16d714433 -> bb6c79f76


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6c79f7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
new file mode 100644
index 0000000..1d7f6f1
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.ImmutableSet;
+
+public class RMNodeLabelsManager extends CommonNodeLabelsManager {
+  
+  protected static class Queue {
+    protected Set<String> acccessibleNodeLabels;
+    protected Resource resource;
+
+    protected Queue() {
+      acccessibleNodeLabels =
+          Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+      resource = Resource.newInstance(0, 0);
+    }
+  }
+
+  ConcurrentMap<String, Queue> queueCollections =
+      new ConcurrentHashMap<String, Queue>();
+  protected AccessControlList adminAcl;
+  
+  /**
+   * Initializes the parent service, then builds the admin ACL from the
+   * YARN admin ACL configuration entry (falling back to its default value).
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    String aclSpec =
+        conf.get(YarnConfiguration.YARN_ADMIN_ACL,
+            YarnConfiguration.DEFAULT_YARN_ADMIN_ACL);
+    adminAcl = new AccessControlList(aclSpec);
+  }
+
+  /**
+   * Adds labels to the given nodes and then updates the per-label and
+   * per-queue resource totals for the nodes that were touched.
+   *
+   * @param addedLabelsToNode labels to add, keyed by node id
+   * @throws IOException propagated from the parent implementation
+   */
+  @Override
+  public void addLabelsToNode(Map<NodeId, Set<String>> addedLabelsToNode)
+      throws IOException {
+    // Acquire the lock before entering the try block so the finally clause
+    // can never unlock a lock this thread failed to obtain.
+    writeLock.lock();
+    try {
+      // snapshot of the affected nodes before the change
+      Map<String, Host> before = cloneNodeMap(addedLabelsToNode.keySet());
+
+      super.addLabelsToNode(addedLabelsToNode);
+
+      // snapshot of the same nodes after the change
+      Map<String, Host> after = cloneNodeMap(addedLabelsToNode.keySet());
+
+      // update running nodes resources
+      updateResourceMappings(before, after);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Verifies that none of the labels about to be removed is still listed as
+   * accessible by a queue.
+   *
+   * @param labelsToRemove labels requested for removal; each one is passed
+   *          through normalizeLabel before being compared
+   * @throws IOException if a queue still references one of the labels
+   */
+  protected void checkRemoveFromClusterNodeLabelsOfQueue(
+      Collection<String> labelsToRemove) throws IOException {
+    for (String rawLabel : labelsToRemove) {
+      String label = normalizeLabel(rawLabel);
+
+      // reject the removal as soon as one queue is found using the label
+      for (Entry<String, Queue> queueEntry : queueCollections.entrySet()) {
+        if (!queueEntry.getValue().acccessibleNodeLabels.contains(label)) {
+          continue;
+        }
+        throw new IOException("Cannot remove label=" + label
+            + ", because queue=" + queueEntry.getKey()
+            + " is using this label. "
+            + "Please remove label on queue before remove the label");
+      }
+    }
+  }
+
+  /**
+   * Removes labels from the cluster after checking that no queue still uses
+   * them, then updates the per-label and per-queue resource totals.
+   *
+   * @param labelsToRemove labels to remove from the cluster
+   * @throws IOException if a queue still uses one of the labels, or
+   *           propagated from the parent implementation
+   */
+  @Override
+  public void removeFromClusterNodeLabels(Collection<String> labelsToRemove)
+      throws IOException {
+    // Acquire the lock before entering the try block so the finally clause
+    // can never unlock a lock this thread failed to obtain.
+    writeLock.lock();
+    try {
+      checkRemoveFromClusterNodeLabelsOfQueue(labelsToRemove);
+
+      // copy all hosts and their NMs before the removal
+      Map<String, Host> before = cloneNodeMap();
+
+      super.removeFromClusterNodeLabels(labelsToRemove);
+
+      // the live node map is used as the "after" state
+      updateResourceMappings(before, nodeCollections);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Removes labels from the given nodes and then updates the per-label and
+   * per-queue resource totals for the nodes that were touched.
+   *
+   * @param removeLabelsFromNode labels to remove, keyed by node id
+   * @throws IOException propagated from the parent implementation
+   */
+  @Override
+  public void
+      removeLabelsFromNode(Map<NodeId, Set<String>> removeLabelsFromNode)
+          throws IOException {
+    // Acquire the lock before entering the try block so the finally clause
+    // can never unlock a lock this thread failed to obtain.
+    writeLock.lock();
+    try {
+      // get nodesCollection before edition
+      Map<String, Host> before =
+          cloneNodeMap(removeLabelsFromNode.keySet());
+
+      super.removeLabelsFromNode(removeLabelsFromNode);
+
+      // get nodesCollection after edition
+      Map<String, Host> after = cloneNodeMap(removeLabelsFromNode.keySet());
+
+      // update running nodes resources
+      updateResourceMappings(before, after);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Replaces the labels on the given nodes and then updates the per-label
+   * and per-queue resource totals for the nodes that were touched.
+   *
+   * @param replaceLabelsToNode new label sets, keyed by node id
+   * @throws IOException propagated from the parent implementation
+   */
+  @Override
+  public void replaceLabelsOnNode(Map<NodeId, Set<String>> replaceLabelsToNode)
+      throws IOException {
+    // Acquire the lock before entering the try block so the finally clause
+    // can never unlock a lock this thread failed to obtain.
+    writeLock.lock();
+    try {
+      // get nodesCollection before edition
+      Map<String, Host> before = cloneNodeMap(replaceLabelsToNode.keySet());
+
+      super.replaceLabelsOnNode(replaceLabelsToNode);
+
+      // get nodesCollection after edition
+      Map<String, Host> after = cloneNodeMap(replaceLabelsToNode.keySet());
+
+      // update running nodes resources
+      updateResourceMappings(before, after);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
+
+  /**
+   * Marks a node as running with the given resource and updates the
+   * per-label and per-queue resource totals accordingly.
+   * createNodeIfNonExisted is called before the node is looked up.
+   *
+   * @param nodeId id of the node that became active
+   * @param resource resource to record for the node
+   */
+  public void activateNode(NodeId nodeId, Resource resource) {
+    // Acquire the lock before entering the try block so the finally clause
+    // can never unlock a lock this thread failed to obtain.
+    writeLock.lock();
+    try {
+      // save if we have a node before
+      Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
+
+      createNodeIfNonExisted(nodeId);
+      Node nm = getNMInNodeSet(nodeId);
+      nm.resource = resource;
+      nm.running = true;
+
+      // get the node after edition
+      Map<String, Host> after = cloneNodeMap(ImmutableSet.of(nodeId));
+
+      updateResourceMappings(before, after);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
+  /**
+   * Marks a node as no longer running (for example after it unregistered
+   * from the RM), resets its recorded resource to a fresh
+   * Resource.newInstance(0, 0) and updates the per-label and per-queue
+   * resource totals. Unknown nodes are left untouched.
+   *
+   * @param nodeId id of the node that became inactive
+   */
+  public void deactivateNode(NodeId nodeId) {
+    // Acquire the lock before entering the try block so the finally clause
+    // can never unlock a lock this thread failed to obtain.
+    writeLock.lock();
+    try {
+      // save if we have a node before
+      Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
+      Node nm = getNMInNodeSet(nodeId);
+      if (null != nm) {
+        // set nm is not running, and its resource = 0
+        nm.running = false;
+        nm.resource = Resource.newInstance(0, 0);
+      }
+
+      // get the node after edition
+      Map<String, Host> after = cloneNodeMap(ImmutableSet.of(nodeId));
+
+      updateResourceMappings(before, after);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void updateNodeResource(NodeId node, Resource newResource) {
+    deactivateNode(node); // drops the node's currently recorded resource
+    activateNode(node, newResource); // records the node again with newResource
+  }
+
+  /**
+   * Discards all queue records and rebuilds them from the given mapping.
+   * For every queue that is not mapped to ANY, the resources of all running
+   * NMs usable by the queue are summed into the queue's resource. Queues
+   * mapped to ANY are registered with an empty label set and a zero
+   * resource.
+   *
+   * @param queueToLabels accessible node labels, keyed by queue name
+   */
+  public void reinitializeQueueLabels(Map<String, Set<String>> queueToLabels) {
+    // Acquire the lock before entering the try block so the finally clause
+    // can never unlock a lock this thread failed to obtain.
+    writeLock.lock();
+    try {
+      // clear before set
+      this.queueCollections.clear();
+
+      for (Entry<String, Set<String>> entry : queueToLabels.entrySet()) {
+        String queue = entry.getKey();
+        Queue q = new Queue();
+        this.queueCollections.put(queue, q);
+
+        Set<String> labels = entry.getValue();
+        if (labels.contains(ANY)) {
+          continue;
+        }
+
+        q.acccessibleNodeLabels.addAll(labels);
+        for (Host host : nodeCollections.values()) {
+          for (Entry<NodeId, Node> nentry : host.nms.entrySet()) {
+            NodeId nodeId = nentry.getKey();
+            Node nm = nentry.getValue();
+            if (nm.running
+                && isNodeUsableByQueue(getLabelsByNode(nodeId), q)) {
+              Resources.addTo(q.resource, nm.resource);
+            }
+          }
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
+  /**
+   * Returns the resource available to a queue.
+   *
+   * @param queueName name of the queue to look up
+   * @param queueLabels labels of the queue; when they contain ANY the
+   *          cluster resource is returned as is
+   * @param clusterResource resource returned for queues labelled ANY
+   * @return clusterResource for ANY, Resources.none() for an unknown queue,
+   *         otherwise the resource recorded for the queue
+   */
+  public Resource getQueueResource(String queueName, Set<String> queueLabels,
+      Resource clusterResource) {
+    // Acquire the lock before entering the try block so the finally clause
+    // can never unlock a lock this thread failed to obtain.
+    readLock.lock();
+    try {
+      if (queueLabels.contains(ANY)) {
+        return clusterResource;
+      }
+      Queue q = queueCollections.get(queueName);
+      if (null == q) {
+        return Resources.none();
+      }
+      return q.resource;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  /**
+   * Returns a read-only view of the labels reported by getLabelsByNode for
+   * the given node.
+   *
+   * @param nodeId id of the node to look up
+   * @return unmodifiable set of label names
+   */
+  public Set<String> getLabelsOnNode(NodeId nodeId) {
+    // Acquire the lock before entering the try block so the finally clause
+    // can never unlock a lock this thread failed to obtain.
+    readLock.lock();
+    try {
+      Set<String> nodeLabels = getLabelsByNode(nodeId);
+      return Collections.unmodifiableSet(nodeLabels);
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  /**
+   * Tells whether a label is acceptable: the empty string always is, any
+   * other value must be a key of labelCollections.
+   *
+   * @param label label name to check; null yields false
+   * @return true if the label is empty or known
+   */
+  public boolean containsNodeLabel(String label) {
+    // Acquire the lock before entering the try block so the finally clause
+    // can never unlock a lock this thread failed to obtain.
+    readLock.lock();
+    try {
+      return label != null
+          && (label.isEmpty() || labelCollections.containsKey(label));
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Builds a copy of the host entries for the requested node ids. Each
+   * copied host starts with no NMs; for a wildcard-port id every NM of the
+   * live host is copied in, otherwise only the NM matching the id (if any).
+   * Ids whose host is not in nodeCollections are skipped.
+   */
+  private Map<String, Host> cloneNodeMap(Set<NodeId> nodesToCopy) {
+    Map<String, Host> copies = new HashMap<String, Host>();
+    for (NodeId nodeId : nodesToCopy) {
+      String hostName = nodeId.getHost();
+      Host copiedHost = copies.get(hostName);
+      if (copiedHost == null) {
+        Host liveHost = nodeCollections.get(hostName);
+        if (liveHost == null) {
+          // unknown host, nothing to copy
+          continue;
+        }
+        copiedHost = liveHost.copy();
+        copiedHost.nms.clear();
+        copies.put(hostName, copiedHost);
+      }
+
+      if (nodeId.getPort() != WILDCARD_PORT) {
+        // a concrete NM was requested
+        Node nm = getNMInNodeSet(nodeId);
+        if (nm != null) {
+          copiedHost.nms.put(nodeId, nm.copy());
+        }
+        continue;
+      }
+
+      // wildcard port: take every NM currently registered on the host
+      for (Entry<NodeId, Node> nmEntry : nodeCollections.get(hostName).nms
+          .entrySet()) {
+        copiedHost.nms.put(nmEntry.getKey(), nmEntry.getValue().copy());
+      }
+    }
+    return copies;
+  }
+
+  private void updateResourceMappings(Map<String, Host> before,
+      Map<String, Host> after) {
+    // Collect the ids of all NMs that appear in either before or after
+    Set<NodeId> allNMs = new HashSet<NodeId>();
+    for (Entry<String, Host> entry : before.entrySet()) {
+      allNMs.addAll(entry.getValue().nms.keySet());
+    }
+    for (Entry<String, Host> entry : after.entrySet()) {
+      allNMs.addAll(entry.getValue().nms.keySet());
+    }
+
+    // traverse all nms: subtract the old contribution, then add the new one
+    for (NodeId nodeId : allNMs) {
+      Node oldNM;
+      if ((oldNM = getNMInNodeSet(nodeId, before, true)) != null) {
+        Set<String> oldLabels = getLabelsByNode(nodeId, before);
+        // no label in the past
+        if (oldLabels.isEmpty()) {
+          // update labels
+          Label label = labelCollections.get(NO_LABEL);
+          Resources.subtractFrom(label.resource, oldNM.resource);
+
+          // update queues, all queue can access this node
+          for (Queue q : queueCollections.values()) {
+            Resources.subtractFrom(q.resource, oldNM.resource);
+          }
+        } else {
+          // update labels
+          for (String labelName : oldLabels) {
+            Label label = labelCollections.get(labelName);
+            if (null == label) {
+              continue;
+            }
+            Resources.subtractFrom(label.resource, oldNM.resource);
+          }
+
+          // update queues, only queues that could access this node are reduced
+          for (Queue q : queueCollections.values()) {
+            if (isNodeUsableByQueue(oldLabels, q)) {
+              Resources.subtractFrom(q.resource, oldNM.resource);
+            }
+          }
+        }
+      }
+
+      Node newNM;
+      if ((newNM = getNMInNodeSet(nodeId, after, true)) != null) {
+        Set<String> newLabels = getLabelsByNode(nodeId, after);
+        // no label after the change
+        if (newLabels.isEmpty()) {
+          // update labels
+          Label label = labelCollections.get(NO_LABEL);
+          Resources.addTo(label.resource, newNM.resource);
+
+          // update queues, all queue can access this node
+          for (Queue q : queueCollections.values()) {
+            Resources.addTo(q.resource, newNM.resource);
+          }
+        } else {
+          // update labels
+          for (String labelName : newLabels) {
+            Label label = labelCollections.get(labelName);
+            Resources.addTo(label.resource, newNM.resource);
+          }
+
+          // update queues, only queues that can access this node are increased
+          for (Queue q : queueCollections.values()) {
+            if (isNodeUsableByQueue(newLabels, q)) {
+              Resources.addTo(q.resource, newNM.resource);
+            }
+          }
+        }
+      }
+    }
+  }
+  
+  public Resource getResourceByLabel(String label, Resource clusterResource) {
+    label = normalizeLabel(label);
+    try {
+      readLock.lock();
+      if (null == labelCollections.get(label)) {
+        return Resources.none();
+      }
+      return labelCollections.get(label).resource;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Decides whether a queue may use a node carrying the given labels. A node
+   * with no labels (null, empty, or only NO_LABEL) is usable by every queue;
+   * otherwise the queue must list at least one of the node's labels.
+   */
+  private boolean isNodeUsableByQueue(Set<String> nodeLabels, Queue q) {
+    if (nodeLabels == null || nodeLabels.isEmpty()) {
+      return true;
+    }
+    if (nodeLabels.size() == 1 && nodeLabels.contains(NO_LABEL)) {
+      return true;
+    }
+
+    // one label in common is enough
+    Set<String> queueLabels = q.acccessibleNodeLabels;
+    for (String nodeLabel : nodeLabels) {
+      if (queueLabels.contains(nodeLabel)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Copies every host in nodeCollections together with all of its NMs, by
+   * requesting a wildcard-port node id for each host name.
+   */
+  private Map<String, Host> cloneNodeMap() {
+    Set<NodeId> wildcardIds = new HashSet<NodeId>();
+    for (String hostName : nodeCollections.keySet()) {
+      NodeId everyNmOnHost = NodeId.newInstance(hostName, WILDCARD_PORT);
+      wildcardIds.add(everyNmOnHost);
+    }
+    return cloneNodeMap(wildcardIds);
+  }
+
+  /**
+   * Checks the user against the admin ACL.
+   *
+   * @param user user to check
+   * @return true only if the admin ACL allows the user
+   */
+  public boolean checkAccess(UserGroupInformation user) {
+    return adminAcl.isUserAllowed(user);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6c79f7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java
new file mode 100644
index 0000000..14bd999
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelsStore;
+
+public class DummyRMNodeLabelsManager extends RMNodeLabelsManager {
+  Map<NodeId, Set<String>> lastNodeToLabels = null;
+  Collection<String> lastAddedlabels = null;
+  Collection<String> lastRemovedlabels = null;
+
+  @Override
+  public void initNodeLabelStore(Configuration conf) {
+    this.store = new NodeLabelsStore(this) {
+
+      @Override
+      public void recover() throws IOException {
+        // intentionally empty: this dummy store has nothing to recover
+      }
+
+      @Override
+      public void removeClusterNodeLabels(Collection<String> labels)
+          throws IOException {
+        // intentionally empty: removals are not persisted
+      }
+
+      @Override
+      public void updateNodeToLabelsMappings(
+          Map<NodeId, Set<String>> nodeToLabels) throws IOException {
+        // intentionally empty: mappings are not persisted
+      }
+
+      @Override
+      public void storeNewClusterNodeLabels(Set<String> label) throws IOException {
+        // intentionally empty: new labels are not persisted
+      }
+
+      @Override
+      public void close() throws IOException {
+        // intentionally empty: nothing to release
+      }
+    };
+  }
+
+  @Override
+  protected void initDispatcher(Configuration conf) {
+    super.dispatcher = new InlineDispatcher();
+  }
+
+  @Override
+  protected void startDispatcher() {
+    // intentionally empty: the dispatcher is not started
+  }
+  
+  @Override
+  protected void stopDispatcher() {
+    // intentionally empty: the dispatcher is not stopped
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6c79f7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java
new file mode 100644
index 0000000..26284e2
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class TestRMNodeLabelsManager extends NodeLabelTestBase {
+  private final Resource EMPTY_RESOURCE = Resource.newInstance(0, 0);
+  private final Resource SMALL_RESOURCE = Resource.newInstance(100, 0);
+  private final Resource LARGE_NODE = Resource.newInstance(1000, 0);
+  
+  DummyRMNodeLabelsManager mgr = null;
+
+  @Before
+  public void before() {
+    mgr = new DummyRMNodeLabelsManager();
+    mgr.init(new Configuration());
+    mgr.start();
+  }
+
+  @After
+  public void after() {
+    mgr.stop();
+  }
+  
+  /**
+   * Checks that the per-label resource follows NM activation, deactivation
+   * and resource updates, and that removing a label moves its NMs to the
+   * default (no-label) partition.
+   */
+  @Test(timeout = 5000)
+  public void testNodeActiveDeactiveUpdate() throws Exception {
+    mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+        toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3")));
+
+    // assertEquals takes (expected, actual); keeping that order makes the
+    // failure messages read correctly
+    Assert.assertEquals(EMPTY_RESOURCE, mgr.getResourceByLabel("p1", null));
+    Assert.assertEquals(EMPTY_RESOURCE, mgr.getResourceByLabel("p2", null));
+    Assert.assertEquals(EMPTY_RESOURCE, mgr.getResourceByLabel("p3", null));
+    Assert.assertEquals(EMPTY_RESOURCE,
+        mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null));
+
+    // activate two NMs on n1, one large and one small
+    mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE);
+    Assert.assertEquals(Resources.add(SMALL_RESOURCE, LARGE_NODE),
+        mgr.getResourceByLabel("p1", null));
+
+    // change the large NM to small, check if resource updated
+    mgr.updateNodeResource(NodeId.newInstance("n1", 2), SMALL_RESOURCE);
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getResourceByLabel("p1", null));
+
+    // deactivate one NM, and check if resource updated
+    mgr.deactivateNode(NodeId.newInstance("n1", 1));
+    Assert.assertEquals(SMALL_RESOURCE, mgr.getResourceByLabel("p1", null));
+
+    // deactivate the other NM too, check if resource updated
+    mgr.deactivateNode(NodeId.newInstance("n1", 2));
+    Assert.assertEquals(EMPTY_RESOURCE, mgr.getResourceByLabel("p1", null));
+
+    // Add two NM to n1 back
+    mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE);
+
+    // And remove p1, now the two NM should come to default label,
+    mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1"));
+    Assert.assertEquals(Resources.add(SMALL_RESOURCE, LARGE_NODE),
+        mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null));
+  }
+
+  /**
+   * Checks that the per-label resource is moved between labels when the
+   * labels of already active nodes are replaced.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test(timeout = 5000)
+  public void testUpdateNodeLabelWithActiveNode() throws Exception {
+    mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+        toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3")));
+
+    // activate one small NM on each of n1, n2 and n3
+    mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n2", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n3", 1), SMALL_RESOURCE);
+
+    // change label of n1 to p2
+    // (assertEquals takes (expected, actual); keeping that order makes the
+    // failure messages read correctly)
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2")));
+    Assert.assertEquals(EMPTY_RESOURCE, mgr.getResourceByLabel("p1", null));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getResourceByLabel("p2", null));
+    Assert.assertEquals(SMALL_RESOURCE, mgr.getResourceByLabel("p3", null));
+
+    // add more labels
+    mgr.addToCluserNodeLabels(toSet("p4", "p5", "p6"));
+    mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n4"), toSet("p1"),
+        toNodeId("n5"), toSet("p2"), toNodeId("n6"), toSet("p3"),
+        toNodeId("n7"), toSet("p4"), toNodeId("n8"), toSet("p5")));
+
+    // now node -> label is,
+    // p1 : n4
+    // p2 : n1, n2, n5
+    // p3 : n3, n6
+    // p4 : n7
+    // p5 : n8
+    // no-label : n9
+
+    // activate these nodes
+    mgr.activateNode(NodeId.newInstance("n4", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n5", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n6", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n7", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n8", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n9", 1), SMALL_RESOURCE);
+
+    // check variables
+    Assert.assertEquals(SMALL_RESOURCE, mgr.getResourceByLabel("p1", null));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getResourceByLabel("p2", null));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getResourceByLabel("p3", null));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
+        mgr.getResourceByLabel("p4", null));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
+        mgr.getResourceByLabel("p5", null));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
+        mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null));
+
+    // change a bunch of nodes -> labels
+    // n4 -> p2
+    // n7 -> empty
+    // n5 -> p1
+    // n8 -> empty
+    // n9 -> p1
+    //
+    // now become:
+    // p1 : n5, n9
+    // p2 : n1, n2, n4
+    // p3 : n3, n6
+    // p4 : [ ]
+    // p5 : [ ]
+    // no label: n8, n7
+    mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n4"), toSet("p2"),
+        toNodeId("n7"), RMNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n5"),
+        toSet("p1"), toNodeId("n8"), RMNodeLabelsManager.EMPTY_STRING_SET,
+        toNodeId("n9"), toSet("p1")));
+
+    // check variables
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getResourceByLabel("p1", null));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getResourceByLabel("p2", null));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getResourceByLabel("p3", null));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 0),
+        mgr.getResourceByLabel("p4", null));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 0),
+        mgr.getResourceByLabel("p5", null));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getResourceByLabel("", null));
+  }
+  
+  @Test(timeout=5000)
+  public void testGetQueueResource() throws Exception {
+    Resource clusterResource = Resource.newInstance(9999, 1);
+    
+    /*
+     * Node->Labels:
+     *   host1 : red, blue
+     *   host2 : blue, yellow
+     *   host3 : yellow
+     *   host4 :
+     */
+    mgr.addToCluserNodeLabels(toSet("red", "blue", "yellow"));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host1"),
+        toSet("red", "blue")));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host2"),
+        toSet("blue", "yellow")));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host3"), toSet("yellow")));
+    
+    // activate one small NM on each of host1, host2, host3 and host4
+    mgr.activateNode(NodeId.newInstance("host1", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("host2", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("host3", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("host4", 1), SMALL_RESOURCE);
+    
+    // reinitialize queue
+    Set<String> q1Label = toSet("red", "blue");
+    Set<String> q2Label = toSet("blue", "yellow");
+    Set<String> q3Label = toSet("yellow");
+    Set<String> q4Label = RMNodeLabelsManager.EMPTY_STRING_SET;
+    Set<String> q5Label = toSet(RMNodeLabelsManager.ANY);
+    
+    Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
+    queueToLabels.put("Q1", q1Label);
+    queueToLabels.put("Q2", q2Label);
+    queueToLabels.put("Q3", q3Label);
+    queueToLabels.put("Q4", q4Label);
+    queueToLabels.put("Q5", q5Label);
+
+    mgr.reinitializeQueueLabels(queueToLabels);
+    
+    // check resource
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q1", q1Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4),
+        mgr.getQueueResource("Q2", q2Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q3", q3Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
+        mgr.getQueueResource("Q4", q4Label, clusterResource));
+    Assert.assertEquals(clusterResource,
+        mgr.getQueueResource("Q5", q5Label, clusterResource));
+    
+    mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("host1"), toSet("red"),
+        toNodeId("host2"), toSet("blue", "yellow")));
+    mgr.addLabelsToNode(ImmutableMap.of(toNodeId("host3"), toSet("red")));
+    /*
+     * Check resource after changes some labels
+     * Node->Labels:
+     *   host1 : blue (was: red, blue)
+     *   host2 : (was: blue, yellow)
+     *   host3 : red, yellow (was: yellow)
+     *   host4 :
+     */
+    
+    // check resource
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4),
+        mgr.getQueueResource("Q1", q1Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4),
+        mgr.getQueueResource("Q2", q2Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q3", q3Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getQueueResource("Q4", q4Label, clusterResource));
+    Assert.assertEquals(clusterResource,
+        mgr.getQueueResource("Q5", q5Label, clusterResource));
+    
+    /*
+     * Check resource after deactive/active some nodes 
+     * Node->Labels:
+     *   (deactived) host1 : blue
+     *   host2 :
+     *   (deactived and then actived) host3 : red, yellow
+     *   host4 :
+     */
+    mgr.deactivateNode(NodeId.newInstance("host1", 1));
+    mgr.deactivateNode(NodeId.newInstance("host3", 1));
+    mgr.activateNode(NodeId.newInstance("host3", 1), SMALL_RESOURCE);
+    
+    // check resource
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q1", q1Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q2", q2Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q3", q3Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getQueueResource("Q4", q4Label, clusterResource));
+    Assert.assertEquals(clusterResource,
+        mgr.getQueueResource("Q5", q5Label, clusterResource));
+    
+    /*
+     * Check resource after refresh queue:
+     *    Q1: blue
+     *    Q2: red, blue
+     *    Q3: red
+     *    Q4:
+     *    Q5: ANY
+     */
+    q1Label = toSet("blue");
+    q2Label = toSet("blue", "red");
+    q3Label = toSet("red");
+    q4Label = RMNodeLabelsManager.EMPTY_STRING_SET;
+    q5Label = toSet(RMNodeLabelsManager.ANY);
+    
+    queueToLabels.clear();
+    queueToLabels.put("Q1", q1Label);
+    queueToLabels.put("Q2", q2Label);
+    queueToLabels.put("Q3", q3Label);
+    queueToLabels.put("Q4", q4Label);
+    queueToLabels.put("Q5", q5Label);
+
+    mgr.reinitializeQueueLabels(queueToLabels);
+    
+    // check resource
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getQueueResource("Q1", q1Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q2", q2Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q3", q3Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getQueueResource("Q4", q4Label, clusterResource));
+    Assert.assertEquals(clusterResource,
+        mgr.getQueueResource("Q5", q5Label, clusterResource));
+    
+    /*
+     * Active NMs in nodes already have NM
+     * Node->Labels:
+     *   host2 :
+     *   host3 : red, yellow (3 NMs)
+     *   host4 : (2 NMs)
+     */
+    mgr.activateNode(NodeId.newInstance("host3", 2), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("host3", 3), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("host4", 2), SMALL_RESOURCE);
+    
+    // check resource
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q1", q1Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 6),
+        mgr.getQueueResource("Q2", q2Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 6),
+        mgr.getQueueResource("Q3", q3Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q4", q4Label, clusterResource));
+    Assert.assertEquals(clusterResource,
+        mgr.getQueueResource("Q5", q5Label, clusterResource));
+    
+    /*
+     * Deactive NMs in nodes already have NMs
+     * Node->Labels:
+     *   host2 :
+     *   host3 : red, yellow (2 NMs)
+     *   host4 : (0 NMs)
+     */
+    mgr.deactivateNode(NodeId.newInstance("host3", 3));
+    mgr.deactivateNode(NodeId.newInstance("host4", 2));
+    mgr.deactivateNode(NodeId.newInstance("host4", 1));
+    
+    // check resource
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
+        mgr.getQueueResource("Q1", q1Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q2", q2Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q3", q3Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
+        mgr.getQueueResource("Q4", q4Label, clusterResource));
+    Assert.assertEquals(clusterResource,
+        mgr.getQueueResource("Q5", q5Label, clusterResource));
+  }
+}

Reply via email to