Author: cutting
Date: Fri Feb 16 13:37:03 2007
New Revision: 508595

URL: http://svn.apache.org/viewvc?view=rev&rev=508595
Log:
HADOOP-803. Reduce memory use by HDFS namenode, phase I. Contributed by Raghu.
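The thread running through the diff below is that the namenode keeps exactly one Block object per block and shares it everywhere: DatanodeDescriptor switches from a TreeSet<Block> to a TreeMap<Block, Block> so the stored instance can be fetched back by an equal key, addStoredBlock() now returns that stored instance, callers register the returned object instead of their own copy, and blocksMap holds a small ArrayList sized to the replication factor rather than a TreeSet per block. The following is a minimal, self-contained sketch of that pattern; the class and method names are illustrative only, not the patch's actual code.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Sketch only: one canonical Block instance shared by the global block map and
    // every datanode's own map, as in the patch below.  Names are hypothetical.
    class CanonicalBlockSketch {

        static class Block implements Comparable<Block> {
            final long id;
            long numBytes;
            Block(long id, long numBytes) { this.id = id; this.numBytes = numBytes; }
            public int compareTo(Block o) { return Long.compare(id, o.id); }
            public boolean equals(Object o) { return o instanceof Block && ((Block) o).id == id; }
            public int hashCode() { return (int) (id ^ (id >>> 32)); }
        }

        static class Node {
            // TreeMap rather than TreeSet: an equal key retrieves the stored instance.
            final TreeMap<Block, Block> blocks = new TreeMap<Block, Block>();
            void addBlock(Block b)    { blocks.put(b, b); }
            Block getBlock(Block key) { return blocks.get(key); }
        }

        // block -> holders; an ArrayList sized to the replication factor instead of
        // a TreeSet per block.
        final Map<Block, List<Node>> blocksMap = new HashMap<Block, List<Node>>();

        // Returns the canonical Block for this id; callers register the returned
        // object rather than their own copy.
        Block addStoredBlock(Block reported, Node node, int replication) {
            List<Node> holders = blocksMap.get(reported);
            Block stored = reported;
            if (holders == null) {
                holders = new ArrayList<Node>(replication);
                blocksMap.put(reported, holders);
            } else if (!holders.isEmpty()) {
                // Reuse the instance the namenode already holds; refresh its length.
                Block known = holders.get(0).getBlock(reported);
                if (known != null) {
                    if (reported.numBytes > 0) {
                        known.numBytes = reported.numBytes;
                    }
                    stored = known;
                }
            }
            if (!holders.contains(node)) {
                holders.add(node);
            }
            node.addBlock(stored);  // every holder references the same instance
            return stored;
        }
    }

Because the same object is referenced from both maps, the reported length is recorded once and duplicate Block copies never accumulate per datanode.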
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Feb 16 13:37:03 2007
@@ -56,6 +56,9 @@
 16. HADOOP-649. Fix so that jobs with no tasks are not lost.
     (Thomas Friol via cutting)
 
+17. HADOOP-803. Reduce memory use by HDFS namenode, phase I.
+    (Raghu Angadi via cutting)
+
 
 Branch 0.11 (unreleased)

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Fri Feb 16 13:37:03 2007
@@ -122,17 +122,16 @@
     /////////////////////////////////////
     public int compareTo(Object o) {
         Block b = (Block) o;
-        if (getBlockId() < b.getBlockId()) {
+        if ( blkid < b.blkid ) {
             return -1;
-        } else if (getBlockId() == b.getBlockId()) {
+        } else if ( blkid == b.blkid ) {
             return 0;
         } else {
             return 1;
         }
     }
     public boolean equals(Object o) {
-        Block b = (Block) o;
-        return (this.compareTo(b) == 0);
+        return (this.compareTo(o) == 0);
     }
 
     public int hashCode() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Fri Feb 16 13:37:03 2007
@@ -37,7 +37,7 @@
  **************************************************/
 public class DatanodeDescriptor extends DatanodeInfo {
 
-  private volatile Collection<Block> blocks = new TreeSet<Block>();
+  private volatile SortedMap<Block, Block> blocks = new TreeMap<Block, Block>();
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
@@ -118,17 +118,12 @@
 
   /**
   */
-  void updateBlocks(Block newBlocks[]) {
-    blocks.clear();
-    for (int i = 0; i < newBlocks.length; i++) {
-      blocks.add(newBlocks[i]);
-    }
-  }
-
-  /**
-  */
   void addBlock(Block b) {
-    blocks.add(b);
+    blocks.put(b, b);
+  }
+
+  void removeBlock(Block b) {
+    blocks.remove(b);
   }
 
   void resetBlocks() {
@@ -152,13 +147,21 @@
   }
 
   Block[] getBlocks() {
-    return (Block[]) blocks.toArray(new Block[blocks.size()]);
+    return (Block[]) blocks.keySet().toArray(new Block[blocks.size()]);
   }
 
   Iterator<Block> getBlockIterator() {
-    return blocks.iterator();
+    return blocks.keySet().iterator();
   }
-
+
+  Block getBlock(long blockId) {
+    return blocks.get( new Block(blockId, 0) );
+  }
+
+  Block getBlock(Block b) {
+    return blocks.get(b);
+  }
+
   /*
    * Store block replication work.
    */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Fri Feb 16 13:37:03 2007
@@ -49,7 +49,7 @@
     class INode {
         private String name;
         private INode parent;
-        private TreeMap children = new TreeMap();
+        private TreeMap<String, INode> children = null;
         private Block blocks[];
         private short blockReplication;
@@ -111,11 +111,19 @@
         }
 
         /**
-         * Get children
-         * @return TreeMap of children
+         * Get children iterator
+         * @return Iterator of children
          */
-        TreeMap getChildren() {
-            return this.children;
+        Iterator<INode> getChildIterator() {
+            return ( children != null ) ? children.values().iterator() : null;
+            // instead of null, we could return a static empty iterator.
+        }
+
+        void addChild(String name, INode node) {
+            if ( children == null ) {
+                children = new TreeMap<String, INode>();
+            }
+            children.put(name, node);
         }
 
         /**
@@ -162,7 +170,7 @@
         }
 
         INode getChild( String name) {
-            return (INode) children.get( name );
+            return (children == null) ? null : children.get( name );
         }
 
         /**
@@ -197,7 +205,7 @@
                 return null;
             }
             // insert into the parent children list
-            parentNode.children.put(name, newNode);
+            parentNode.addChild(name, newNode);
             newNode.parent = parentNode;
             return newNode;
         }
@@ -225,9 +233,9 @@
                 }
             }
             incrDeletedFileCount();
-            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-                INode child = (INode) it.next();
-                child.collectSubtreeBlocks(v);
+            for (Iterator<INode> it = getChildIterator(); it != null &&
+                    it.hasNext(); ) {
+                it.next().collectSubtreeBlocks(v);
             }
         }
 
@@ -235,9 +243,9 @@
          */
         int numItemsInTree() {
             int total = 0;
-            for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-                INode child = (INode) it.next();
-                total += child.numItemsInTree();
+            for (Iterator<INode> it = getChildIterator(); it != null &&
+                    it.hasNext(); ) {
+                total += it.next().numItemsInTree();
             }
             return total + 1;
         }
@@ -268,9 +276,9 @@
         */
        long computeContentsLength() {
            long total = computeFileLength();
-           for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-               INode child = (INode) it.next();
-               total += child.computeContentsLength();
+           for (Iterator<INode> it = getChildIterator(); it != null &&
+                   it.hasNext(); ) {
+               total += it.next().computeContentsLength();
            }
            return total;
        }
@@ -294,9 +302,9 @@
                v.add(this);
            }
-           for (Iterator it = children.values().iterator(); it.hasNext(); ) {
-               INode child = (INode) it.next();
-               v.add(child);
+           for (Iterator<INode> it = getChildIterator(); it != null &&
+                   it.hasNext(); ) {
+               v.add(it.next());
            }
        }
    }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Fri Feb 16 13:37:03 2007
@@ -405,9 +405,9 @@
                 root.getBlocks()[i].write(out);
             }
         }
-        for(Iterator it = root.getChildren().values().iterator(); it.hasNext(); ) {
-            INode child = (INode) it.next();
-            saveImage( fullName, child, out );
+        for(Iterator<INode> it = root.getChildIterator(); it != null &&
+                it.hasNext(); ) {
+            saveImage( fullName, it.next(), out );
         }
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=508595&r1=508594&r2=508595
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Fri Feb 16 13:37:03 2007
@@ -61,8 +61,8 @@
     // to client-sent information.
     //  Mapping: Block -> TreeSet<DatanodeDescriptor>
     //
-    Map<Block, SortedSet<DatanodeDescriptor>> blocksMap =
-        new HashMap<Block, SortedSet<DatanodeDescriptor>>();
+    Map<Block, List<DatanodeDescriptor>> blocksMap =
+        new HashMap<Block, List<DatanodeDescriptor>>();
 
     /**
      * Stores the datanode -> block map.
@@ -182,6 +182,8 @@
     private int maxReplicationStreams;
     // MIN_REPLICATION is how many copies we need in place or else we disallow the write
     private int minReplication;
+    // Default replication
+    private int defaultReplication;
     // heartbeatRecheckInterval is how often namenode checks for expired datanodes
     private long heartbeatRecheckInterval;
     // heartbeatExpireInterval is how long namenode waits for datanode to report
@@ -211,6 +213,7 @@
                         int port, NameNode nn, Configuration conf) throws IOException {
         fsNamesystemObject = this;
+        this.defaultReplication = conf.getInt("dfs.replication", 3);
         this.maxReplication = conf.getInt("dfs.replication.max", 512);
         this.minReplication = conf.getInt("dfs.replication.min", 1);
         if( minReplication <= 0 )
@@ -524,7 +527,7 @@
         DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
 
         for (int i = 0; i < blocks.length; i++) {
-            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+            Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
             if (containingNodes == null) {
                 machineSets[i] = new DatanodeDescriptor[0];
             } else {
@@ -889,22 +892,16 @@
             //
             // We have the pending blocks, but they won't have
             // length info in them (as they were allocated before
-            // data-write took place). So we need to add the correct
-            // length info to each
-            //
-            // REMIND - mjc - this is very inefficient!  We should
-            // improve this!
+            // data-write took place). Find the block stored in
+            // node descriptor.
             //
             for (int i = 0; i < nrBlocks; i++) {
                 Block b = pendingBlocks[i];
-                SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
-                DatanodeDescriptor node = containingNodes.first();
-                for (Iterator<Block> it = node.getBlockIterator(); it.hasNext(); ) {
-                    Block cur = it.next();
-                    if (b.getBlockId() == cur.getBlockId()) {
-                        b.setNumBytes(cur.getNumBytes());
-                        break;
-                    }
+                List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+                Block storedBlock =
+                    containingNodes.get(0).getBlock(b);
+                if ( storedBlock != null ) {
+                    pendingBlocks[i] = storedBlock;
                 }
             }
@@ -946,7 +943,7 @@
         // the blocks.
         int numExpectedReplicas = pendingFile.getReplication();
         for (int i = 0; i < nrBlocks; i++) {
-            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
+            Collection<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
             // filter out containingNodes that are marked for decommission.
             int numCurrentReplica = countContainingNodes(containingNodes);
@@ -986,7 +983,7 @@
 
         for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
             Block b = it.next();
-            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            Collection<DatanodeDescriptor> containingNodes = blocksMap.get(b);
             if (containingNodes == null || containingNodes.size() < this.minReplication) {
                 return false;
             }
@@ -1077,7 +1074,7 @@
 
         for (int i = 0; i < deletedBlocks.length; i++) {
             Block b = deletedBlocks[i];
-            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            Collection<DatanodeDescriptor> containingNodes = blocksMap.get(b);
             if (containingNodes != null) {
                 for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
                     DatanodeDescriptor node = it.next();
@@ -1201,7 +1198,7 @@
         } else {
             String hosts[][] = new String[(endBlock - startBlock) + 1][];
             for (int i = startBlock; i <= endBlock; i++) {
-                SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+                Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
                 Collection<String> v = new ArrayList<String>();
                 if (containingNodes != null) {
                     for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
@@ -1924,12 +1921,16 @@
         // between the old and new block report.
         //
         int newPos = 0;
-        boolean modified = false;
         Iterator<Block> iter = node.getBlockIterator();
         Block oldblk = iter.hasNext() ? iter.next() : null;
         Block newblk = (newReport != null && newReport.length > 0) ?
                         newReport[0] : null;
 
+        // common case is that most of the blocks from the datanode
+        // matches blocks in datanode descriptor.
+        Collection<Block> toRemove = new LinkedList<Block>();
+        Collection<Block> toAdd = new LinkedList<Block>();
+
         while (oldblk != null || newblk != null) {
 
             int cmp = (oldblk == null) ? 1 :
@@ -1943,25 +1944,27 @@
                          ? newReport[newPos] : null;
             } else if (cmp < 0) {
                 // The old report has a block the new one does not
-                removeStoredBlock(oldblk, node);
-                modified = true;
+                toRemove.add(oldblk);
                 oldblk = iter.hasNext() ? iter.next() : null;
             } else {
                 // The new report has a block the old one does not
-                addStoredBlock(newblk, node);
-                modified = true;
+                toAdd.add(newblk);
                 newPos++;
                 newblk = (newPos < newReport.length)
                          ? newReport[newPos] : null;
             }
         }
-        //
-        // Modify node so it has the new blockreport
-        //
-        if (modified) {
-            node.updateBlocks(newReport);
+
+        for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
+            Block b = i.next();
+            removeStoredBlock( b, node );
+            node.removeBlock( b );
+        }
+        for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
+            Block b = i.next();
+            node.addBlock( addStoredBlock(b, node) );
         }
-
+
         //
         // We've now completely updated the node's block report profile.
         // We now go through all its blocks and find which ones are invalid,
@@ -1990,12 +1993,27 @@
     /**
      * Modify (block-->datanode) map. Remove block from set of
      * needed replications if this takes care of the problem.
+     * @return the block that is stored in blockMap.
      */
-    synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
-        SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+    synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
+        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null) {
-            containingNodes = new TreeSet<DatanodeDescriptor>();
+            //Create an arraylist with the current replication factor
+            FSDirectory.INode inode = dir.getFileByBlock(block);
+            int replication = (inode != null) ?
+                              inode.getReplication() : defaultReplication;
+            containingNodes = new ArrayList<DatanodeDescriptor>(replication);
             blocksMap.put(block, containingNodes);
+        } else {
+            Block storedBlock =
+                containingNodes.get(0).getBlock(block);
+            // update stored block's length.
+            if ( storedBlock != null ) {
+                if ( block.getNumBytes() > 0 ) {
+                    storedBlock.setNumBytes( block.getNumBytes() );
+                }
+                block = storedBlock;
+            }
         }
         int curReplicaDelta = 0;
         if (! containingNodes.contains(node)) {
@@ -2018,7 +2036,7 @@
 
         FSDirectory.INode fileINode = dir.getFileByBlock(block);
         if( fileINode == null )  // block does not belong to any file
-            return;
+            return block;
 
         // filter out containingNodes that are marked for decommission.
         int numCurrentReplica = countContainingNodes(containingNodes);
@@ -2036,6 +2054,7 @@
             pendingReplications.remove(block);
         }
         proccessOverReplicatedBlock( block, fileReplication );
+        return block;
     }
 
     /**
@@ -2044,7 +2063,7 @@
      * mark them in the excessReplicateMap.
      */
     private void proccessOverReplicatedBlock( Block block, short replication ) {
-        SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+        Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if( containingNodes == null )
             return;
         Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
@@ -2124,7 +2143,7 @@
     synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                 +block.getBlockName() + " from "+node.getName() );
-        SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+        Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null || ! containingNodes.contains(node)) {
             NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                     +block.getBlockName()+" has already been removed from node "+node );
@@ -2182,14 +2201,9 @@
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
                 +block.getBlockName()+" is received from " + nodeID.getName() );
         //
-        // Modify the blocks->datanode map
+        // Modify the blocks->datanode map and node's map.
         //
-        addStoredBlock(block, node);
-
-        //
-        // Supplement node's blockreport
-        //
-        node.addBlock(block);
+        node.addBlock( addStoredBlock(block, node) );
     }
 
     /**
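For the block-report path changed above, the old code cleared and rebuilt the datanode's whole block set on every report; the patch instead merges two sorted sequences (the node's current blocks and the freshly reported array) and only acts on the differences collected in toAdd/toRemove. Below is a small stand-alone sketch of that two-pointer diff over block ids; the names and the use of plain long ids are illustrative, not the patch's code.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.SortedSet;
    import java.util.TreeSet;

    // Sketch only: diff a sorted "current" set against a sorted report, collecting
    // additions and removals instead of clearing and rebuilding the set.
    class ReportDiffSketch {

        static void diff(SortedSet<Long> current, long[] reportedSorted,
                         List<Long> toAdd, List<Long> toRemove) {
            Iterator<Long> it = current.iterator();
            Long oldId = it.hasNext() ? it.next() : null;
            int pos = 0;
            Long newId = (reportedSorted.length > 0) ? Long.valueOf(reportedSorted[0]) : null;

            while (oldId != null || newId != null) {
                int cmp = (oldId == null) ? 1
                        : (newId == null) ? -1 : oldId.compareTo(newId);
                if (cmp == 0) {            // in both: common case, nothing to do
                    oldId = it.hasNext() ? it.next() : null;
                    pos++;
                    newId = (pos < reportedSorted.length) ? Long.valueOf(reportedSorted[pos]) : null;
                } else if (cmp < 0) {      // only in current: schedule removal
                    toRemove.add(oldId);
                    oldId = it.hasNext() ? it.next() : null;
                } else {                   // only in report: schedule addition
                    toAdd.add(newId);
                    pos++;
                    newId = (pos < reportedSorted.length) ? Long.valueOf(reportedSorted[pos]) : null;
                }
            }
        }

        public static void main(String[] args) {
            SortedSet<Long> current = new TreeSet<Long>(Arrays.asList(1L, 2L, 5L));
            long[] reported = {2L, 3L, 5L};
            List<Long> toAdd = new ArrayList<Long>();
            List<Long> toRemove = new ArrayList<Long>();
            diff(current, reported, toAdd, toRemove);
            System.out.println("add=" + toAdd + " remove=" + toRemove);  // add=[3] remove=[1]
        }
    }

Since both sequences are sorted, the merge touches each block once; in the common case where the report matches the descriptor, toAdd and toRemove stay empty and no per-block work is done.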