YARN-5181. v1
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d781c25f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d781c25f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d781c25f

Branch: refs/heads/fs-preemption
Commit: d781c25f46a217d945177f98a0efed22d6513bc7
Parents: ec5b5ec
Author: Karthik Kambatla <ka...@apache.org>
Authored: Mon May 30 23:24:37 2016 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Mon May 30 23:29:00 2016 -0700

----------------------------------------------------------------------
 .../scheduler/ClusterNodeTracker.java     | 55 ++++++++++++----
 .../scheduler/TestClusterNodeTracker.java | 68 ++++++++++++++++++++
 2 files changed, 111 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d781c25f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index feb071f..9ff83fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -18,11 +18,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -50,7 +53,8 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
   private Lock writeLock = readWriteLock.writeLock();
 
   private HashMap<NodeId, N> nodes = new HashMap<>();
-  private Map<String, Integer> nodesPerRack = new HashMap<>();
+  private Map<String, N> nodeNameToNodeMap = new HashMap<>();
+  private Map<String, List<N>> nodesPerRack = new HashMap<>();
 
   private Resource clusterCapacity = Resources.clone(Resources.none());
   private Resource staleClusterCapacity = null;
@@ -66,14 +70,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     writeLock.lock();
     try {
       nodes.put(node.getNodeID(), node);
+      nodeNameToNodeMap.put(node.getNodeName(), node);
 
       // Update nodes per rack as well
       String rackName = node.getRackName();
-      Integer numNodes = nodesPerRack.get(rackName);
-      if (numNodes == null) {
-        numNodes = 0;
+      List<N> nodesList = nodesPerRack.get(rackName);
+      if (nodesList == null) {
+        nodesList = new ArrayList<>();
+        nodesPerRack.put(rackName, nodesList);
       }
-      nodesPerRack.put(rackName, ++numNodes);
+      nodesList.add(node);
 
       // Update cluster capacity
       Resources.addTo(clusterCapacity, node.getTotalResource());
@@ -126,8 +132,8 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     readLock.lock();
     String rName = rackName == null ? "NULL" : rackName;
     try {
-      Integer nodeCount = nodesPerRack.get(rName);
-      return nodeCount == null ? 0 : nodeCount;
+      List<N> nodesList = nodesPerRack.get(rName);
+      return nodesList == null ? 0 : nodesList.size();
     } finally {
       readLock.unlock();
     }
@@ -154,14 +160,18 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
         LOG.warn("Attempting to remove a non-existent node " + nodeId);
         return null;
       }
+      nodeNameToNodeMap.remove(node.getNodeName());
 
       // Update nodes per rack as well
       String rackName = node.getRackName();
-      Integer numNodes = nodesPerRack.get(rackName);
-      if (numNodes > 0) {
-        nodesPerRack.put(rackName, --numNodes);
-      } else {
+      List<N> nodesList = nodesPerRack.get(rackName);
+      if (nodesList == null) {
        LOG.error("Attempting to remove node from an empty rack " + rackName);
+      } else {
+        nodesList.remove(node);
+        if (nodesList.isEmpty()) {
+          nodesPerRack.remove(rackName);
+        }
       }
 
       // Update cluster capacity
@@ -254,7 +264,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
   }
 
   public List<N> getAllNodes() {
-    return getNodes(null);
+    return getNodes((NodeFilter)null);
   }
 
   /**
@@ -297,4 +307,25 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     Collections.sort(sortedList, comparator);
     return sortedList;
   }
+
+  /**
+   * Convenience method to return list of nodes corresponding to resourceName
+   * passed in the {@link ResourceRequest}.
+   */
+  public List<N> getNodes(final String resourceName) {
+    Preconditions.checkArgument(
+        resourceName != null && !resourceName.isEmpty());
+    List<N> nodes = new ArrayList<>();
+    if (ResourceRequest.ANY.equals(resourceName)) {
+      return getAllNodes();
+    } else if (nodeNameToNodeMap.containsKey(resourceName)) {
+      nodes.add(nodeNameToNodeMap.get(resourceName));
+    } else if (nodesPerRack.containsKey(resourceName)) {
+      return nodesPerRack.get(resourceName);
+    } else {
+      LOG.info(
+          "Could not find a node matching given resourceName " + resourceName);
+    }
+    return nodes;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d781c25f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
new file mode 100644
index 0000000..06e7dc8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test class to verify ClusterNodeTracker. Using FSSchedulerNode without
+ * loss of generality.
+ */
+public class TestClusterNodeTracker {
+  ClusterNodeTracker<FSSchedulerNode> nodeTracker = new ClusterNodeTracker();
+
+  @Before
+  public void setup() {
+    List<RMNode> rmNodes =
+        MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
+    for (RMNode rmNode : rmNodes) {
+      nodeTracker.addNode(new FSSchedulerNode(rmNode, false));
+    }
+  }
+
+  @Test
+  public void testGetNodeCount() {
+    assertEquals("Incorrect number of nodes in the cluster",
+        8, nodeTracker.nodeCount());
+
+    assertEquals("Incorrect number of nodes in each rack",
+        4, nodeTracker.nodeCount("rack0"));
+  }
+
+  @Test
+  public void testGetNodesForResourceName() throws Exception {
+    assertEquals("Incorrect number of nodes matching ANY",
+        8, nodeTracker.getNodes(ResourceRequest.ANY).size());
+
+    assertEquals("Incorrect number of nodes matching rack",
+        4, nodeTracker.getNodes("rack0").size());
+
+    assertEquals("Incorrect number of nodes matching node",
+        1, nodeTracker.getNodes("host0").size());
+  }
+}
\ No newline at end of file
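
For readers following the fs-preemption branch, here is a brief illustration of how the new getNodes(String resourceName) lookup might be consumed: the resourceName carried by a ResourceRequest is either ResourceRequest.ANY, a rack name, or a host name, and the tracker resolves it to the matching scheduler nodes. This is a minimal sketch under assumptions, not code from the patch; the NodeLookupExample class and resolveCandidates method are hypothetical names, and FSSchedulerNode is used only because the new test does the same.

import java.util.List;

import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;

// Hypothetical caller; not part of this commit.
public class NodeLookupExample {

  private final ClusterNodeTracker<FSSchedulerNode> nodeTracker;

  public NodeLookupExample(ClusterNodeTracker<FSSchedulerNode> nodeTracker) {
    this.nodeTracker = nodeTracker;
  }

  /**
   * Resolve the locality level of a request to candidate nodes:
   * ResourceRequest.ANY maps to every tracked node, a rack name maps to the
   * nodes currently on that rack, and a host name maps to a single-element
   * list (or an empty list if the host is unknown).
   */
  public List<FSSchedulerNode> resolveCandidates(ResourceRequest request) {
    return nodeTracker.getNodes(request.getResourceName());
  }
}

Note that, per the diff above, an unknown resourceName only logs at INFO and yields an empty list, so callers can iterate the result without a null check.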