Author: cutting
Date: Tue Jan 16 12:11:42 2007
New Revision: 496845

URL: http://svn.apache.org/viewvc?view=rev&rev=496845
Log:
HADOOP-803. Reduce memory footprint of HDFS namenode by replacing
the TreeSet of block locations with an ArrayList. Contributed by Raghu.
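The idea behind the change, in brief: the namenode keeps its block-to-locations map, but each block's locations go into an ArrayList pre-sized to the file's replication factor instead of a per-block TreeSet, and each DatanodeDescriptor can hand back the canonical stored Block for a given id. A rough, self-contained sketch of the map side of this (BlockLocationMap, addLocation and DEFAULT_REPLICATION are invented names for illustration, not the committed API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: one small ArrayList of locations per block, pre-sized to the
// expected replication, instead of a TreeSet per block.
class BlockLocationMap {
    private static final int DEFAULT_REPLICATION = 3;   // cf. dfs.replication
    private final Map<Long, List<String>> blockToNodes =
        new HashMap<Long, List<String>>();

    void addLocation(long blockId, String nodeName) {
        List<String> nodes = blockToNodes.get(blockId);
        if (nodes == null) {
            // An ArrayList of capacity ~replication carries no per-entry
            // tree-node overhead, just one small backing array.
            nodes = new ArrayList<String>(DEFAULT_REPLICATION);
            blockToNodes.put(blockId, nodes);
        }
        if (!nodes.contains(nodeName)) {   // a List allows duplicates, so guard
            nodes.add(nodeName);
        }
    }

    List<String> getLocations(long blockId) {
        return blockToNodes.get(blockId);
    }
}

The committed version of this lives in FSNamesystem.addStoredBlock in the diff below, where the list capacity comes from the file's INode replication or from dfs.replication.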
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=496845&r1=496844&r2=496845
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jan 16 12:11:42 2007
@@ -17,6 +17,10 @@
  4. HADOOP-757. Fix "Bad File Descriptor" exception in HDFS client
     when an output file is closed twice. (Raghu Angadi via cutting)
 
+ 5. HADOOP-803. Reduce memory footprint of HDFS namenode by replacing
+    the TreeSet of block locations with an ArrayList.
+    (Raghu Angadi via cutting)
+
 
 Release 0.10.1 - 2007-01-10

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?view=diff&rev=496845&r1=496844&r2=496845
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Tue Jan 16 12:11:42 2007
@@ -121,18 +121,11 @@
     // Comparable
     /////////////////////////////////////
     public int compareTo(Object o) {
-        Block b = (Block) o;
-        if (getBlockId() < b.getBlockId()) {
-            return -1;
-        } else if (getBlockId() == b.getBlockId()) {
-            return 0;
-        } else {
-            return 1;
-        }
+        long diff = getBlockId() - ((Block)o).getBlockId();
+        return ( diff < 0 ) ? -1 : ( ( diff > 0 ) ? 1 : 0 );
     }
     public boolean equals(Object o) {
-        Block b = (Block) o;
-        return (this.compareTo(b) == 0);
+        return (this.compareTo(o) == 0);
     }
     public int hashCode() {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?view=diff&rev=496845&r1=496844&r2=496845
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Tue Jan 16 12:11:42 2007
@@ -34,7 +34,7 @@
  **************************************************/
 class DatanodeDescriptor extends DatanodeInfo {
 
-  private volatile Collection<Block> blocks = new TreeSet<Block>();
+  private volatile SortedMap<Block, Block> blocks = new TreeMap<Block, Block>();
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
@@ -60,17 +60,12 @@
 
   /**
    */
-  void updateBlocks(Block newBlocks[]) {
-    blocks.clear();
-    for (int i = 0; i < newBlocks.length; i++) {
-      blocks.add(newBlocks[i]);
-    }
-  }
-
-  /**
-   */
   void addBlock(Block b) {
-    blocks.add(b);
+    blocks.put(b, b);
+  }
+
+  void removeBlock(Block b) {
+    blocks.remove(b);
   }
 
   void resetBlocks() {
@@ -94,10 +89,14 @@
   }
 
   Block[] getBlocks() {
-    return (Block[]) blocks.toArray(new Block[blocks.size()]);
+    return blocks.keySet().toArray(new Block[blocks.size()]);
   }
 
   Iterator<Block> getBlockIterator() {
-    return blocks.iterator();
+    return blocks.keySet().iterator();
+  }
+
+  Block getBlock(long blockId) {
+    return blocks.get( new Block(blockId, 0) );
   }
 }
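The DatanodeDescriptor hunk above replaces the per-node TreeSet<Block> with a TreeMap<Block, Block> so the stored Block instance can be looked up again by id: getBlock probes with new Block(blockId, 0), which works because Block ordering compares block ids only. A small sketch of that map-as-set lookup pattern, under invented names (BlockSketch, DatanodeBlocksSketch), not the committed classes:

import java.util.TreeMap;

// Illustrative sketch, not the committed classes: ordering is by block id
// only, so a probe object with a dummy length can look up the stored block.
class BlockSketch implements Comparable<BlockSketch> {
    final long id;
    long numBytes;
    BlockSketch(long id, long numBytes) { this.id = id; this.numBytes = numBytes; }
    public int compareTo(BlockSketch o) {
        return (id < o.id) ? -1 : ((id > o.id) ? 1 : 0);
    }
}

class DatanodeBlocksSketch {
    // Each block is stored as both key and value so the canonical instance
    // (carrying the real length) can be handed back to callers.
    private final TreeMap<BlockSketch, BlockSketch> blocks =
        new TreeMap<BlockSketch, BlockSketch>();

    void addBlock(BlockSketch b)    { blocks.put(b, b); }
    void removeBlock(BlockSketch b) { blocks.remove(b); }

    BlockSketch getBlock(long blockId) {
        // Probe with length 0; compareTo ignores numBytes, so this finds
        // the stored block even though the lengths differ.
        return blocks.get(new BlockSketch(blockId, 0));
    }
}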
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=496845&r1=496844&r2=496845
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Jan 16 12:11:42 2007
@@ -59,8 +59,8 @@
     // to client-sent information.
     //  Mapping: Block -> TreeSet<DatanodeDescriptor>
     //
-    Map<Block, SortedSet<DatanodeDescriptor>> blocksMap =
-        new HashMap<Block, SortedSet<DatanodeDescriptor>>();
+    Map<Block, List<DatanodeDescriptor>> blocksMap =
+        new HashMap<Block, List<DatanodeDescriptor>>();
 
     /**
      * Stores the datanode -> block map.
@@ -179,6 +179,8 @@
     private int maxReplicationStreams;
     // MIN_REPLICATION is how many copies we need in place or else we disallow the write
     private int minReplication;
+    // Default replication
+    private int defaultReplication;
     // heartbeatRecheckInterval is how often namenode checks for expired datanodes
     private long heartbeatRecheckInterval;
     // heartbeatExpireInterval is how long namenode waits for datanode to report
@@ -199,6 +201,7 @@
                         int port, NameNode nn, Configuration conf) throws IOException {
         fsNamesystemObject = this;
+        this.defaultReplication = conf.getInt("dfs.replication", 3);
         this.maxReplication = conf.getInt("dfs.replication.max", 512);
         this.minReplication = conf.getInt("dfs.replication.min", 1);
         if( minReplication <= 0 )
@@ -299,7 +302,7 @@
         DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];
 
         for (int i = 0; i < blocks.length; i++) {
-            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+            List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
             if (containingNodes == null) {
                 machineSets[i] = new DatanodeDescriptor[0];
             } else {
@@ -660,22 +663,16 @@
             //
             // We have the pending blocks, but they won't have
             // length info in them (as they were allocated before
-            // data-write took place). So we need to add the correct
-            // length info to each
-            //
-            // REMIND - mjc - this is very inefficient! We should
-            // improve this!
+            // data-write took place). Find the block stored in
+            // node descriptor.
             //
             for (int i = 0; i < nrBlocks; i++) {
                 Block b = pendingBlocks[i];
-                SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
-                DatanodeDescriptor node = containingNodes.first();
-                for (Iterator<Block> it = node.getBlockIterator(); it.hasNext(); ) {
-                    Block cur = it.next();
-                    if (b.getBlockId() == cur.getBlockId()) {
-                        b.setNumBytes(cur.getNumBytes());
-                        break;
-                    }
+                List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+                Block storedBlock =
+                    containingNodes.get(0).getBlock(b.getBlockId());
+                if ( storedBlock != null ) {
+                    pendingBlocks[i] = storedBlock;
                 }
             }
 
@@ -716,7 +713,7 @@
         // Now that the file is real, we need to be sure to replicate
         // the blocks.
         for (int i = 0; i < nrBlocks; i++) {
-            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
+            List<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
             // filter out containingNodes that are marked for decommission.
             int numCurrentReplica = countContainingNodes(containingNodes);
@@ -761,7 +758,7 @@
         for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
             Block b = it.next();
-            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
             if (containingNodes == null || containingNodes.size() < this.minReplication) {
                 return false;
             }
@@ -806,7 +803,7 @@
         for (int i = 0; i < deletedBlocks.length; i++) {
             Block b = deletedBlocks[i];
-            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
             if (containingNodes != null) {
                 for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
                     DatanodeDescriptor node = it.next();
@@ -935,7 +932,7 @@
         } else {
             String hosts[][] = new String[(endBlock - startBlock) + 1][];
             for (int i = startBlock; i <= endBlock; i++) {
-                SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+                List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
                 Collection<String> v = new ArrayList<String>();
                 if (containingNodes != null) {
                     for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
@@ -1494,12 +1491,16 @@
         // between the old and new block report.
         //
         int newPos = 0;
-        boolean modified = false;
         Iterator<Block> iter = node.getBlockIterator();
         Block oldblk = iter.hasNext() ? iter.next() : null;
         Block newblk = (newReport != null && newReport.length > 0) ?
                         newReport[0] : null;
 
+        // common case is that most of the blocks from the datanode
+        // matches blocks in datanode descriptor.
+        Collection<Block> toRemove = new LinkedList<Block>();
+        Collection<Block> toAdd = new LinkedList<Block>();
+
         while (oldblk != null || newblk != null) {
 
             int cmp = (oldblk == null) ? 1 :
@@ -1513,25 +1514,25 @@
                          ? newReport[newPos] : null;
             } else if (cmp < 0) {
                 // The old report has a block the new one does not
+                toRemove.add(oldblk);
                 removeStoredBlock(oldblk, node);
-                modified = true;
                 oldblk = iter.hasNext() ? iter.next() : null;
             } else {
                 // The new report has a block the old one does not
-                addStoredBlock(newblk, node);
-                modified = true;
+                toAdd.add(addStoredBlock(newblk, node));
                 newPos++;
                 newblk = (newPos < newReport.length)
                          ? newReport[newPos] : null;
             }
         }
-        //
-        // Modify node so it has the new blockreport
-        //
-        if (modified) {
-            node.updateBlocks(newReport);
+
+        for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
+            node.removeBlock( i.next() );
        }
-
+        for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
+            node.addBlock( i.next() );
+        }
+
         //
         // We've now completely updated the node's block report profile.
         // We now go through all its blocks and find which ones are invalid,
@@ -1560,12 +1561,25 @@
     /**
      * Modify (block-->datanode) map. Remove block from set of
      * needed replications if this takes care of the problem.
+     * @return the block that is stored in blockMap.
      */
-    synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
-        SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+    synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
+        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null) {
-            containingNodes = new TreeSet<DatanodeDescriptor>();
+            //Create an arraylist with the current replication factor
+            FSDirectory.INode inode = dir.getFileByBlock(block);
+            int replication = (inode != null) ?
+                                  inode.getReplication() : defaultReplication;
+            containingNodes = new ArrayList<DatanodeDescriptor>(replication);
             blocksMap.put(block, containingNodes);
+        } else {
+            Block storedBlock =
+                containingNodes.get(0).getBlock(block.getBlockId());
+            // update stored block's length.
+            if ( block.getNumBytes() > 0 ) {
+                storedBlock.setNumBytes( block.getNumBytes() );
+            }
+            block = storedBlock;
         }
         if (! containingNodes.contains(node)) {
             containingNodes.add(node);
@@ -1587,7 +1601,7 @@
         synchronized (neededReplications) {
             FSDirectory.INode fileINode = dir.getFileByBlock(block);
             if( fileINode == null )  // block does not belong to any file
-                return;
+                return block;
 
             // filter out containingNodes that are marked for decommission.
             int numCurrentReplica = countContainingNodes(containingNodes);
@@ -1612,6 +1626,7 @@
                 proccessOverReplicatedBlock( block, fileReplication );
             }
         }
+        return block;
     }
 
     /**
@@ -1620,7 +1635,7 @@
      * mark them in the excessReplicateMap.
      */
     private void proccessOverReplicatedBlock( Block block, short replication ) {
-        SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if( containingNodes == null )
             return;
 
         Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
@@ -1700,7 +1715,7 @@
     synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                 +block.getBlockName() + " from "+node.getName() );
-        SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null || ! containingNodes.contains(node)) {
             NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                     +block.getBlockName()+" has already been removed from node "+node );
@@ -1759,14 +1774,9 @@
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
                 +block.getBlockName()+" is received from " + nodeID.getName() );
         //
-        // Modify the blocks->datanode map
+        // Modify the blocks->datanode map and node's map.
         //
-        addStoredBlock(block, node);
-
-        //
-        // Supplement node's blockreport
-        //
-        node.addBlock(block);
+        node.addBlock( addStoredBlock(block, node) );
     }
 
     /**
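In the FSNamesystem hunks above, addStoredBlock now returns the Block instance held in blocksMap, and callers such as blockReceived store that same instance in the datanode descriptor via node.addBlock( addStoredBlock(block, node) ), so the map and the descriptor never hold two diverging copies of a block. A minimal sketch of that return-the-canonical-instance pattern, using invented names (BlockMapSketch, Blk) rather than the committed code:

import java.util.HashMap;
import java.util.Map;

// Sketch of the "return the canonical instance" pattern, with invented names:
// the block map owns one Blk object per id, and callers keep the returned
// reference rather than the one that arrived from the datanode.
class BlockMapSketch {
    static class Blk {
        final long id;
        long numBytes;
        Blk(long id, long numBytes) { this.id = id; this.numBytes = numBytes; }
    }

    private final Map<Long, Blk> stored = new HashMap<Long, Blk>();

    // Returns the instance the map owns, folding in any length the
    // datanode reported, so caller and map share a single object.
    Blk addStoredBlock(Blk reported) {
        Blk canonical = stored.get(reported.id);
        if (canonical == null) {
            stored.put(reported.id, reported);
            return reported;
        }
        if (reported.numBytes > 0) {
            canonical.numBytes = reported.numBytes;
        }
        return canonical;
    }
}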