Author: cutting
Date: Thu Jan 18 11:19:19 2007
New Revision: 497541

URL: http://svn.apache.org/viewvc?view=rev&rev=497541
Log:
HADOOP-898. Revert HADOOP-803, since it was causing problems (svn merge -r 496845:496844, plus one dependent change). Contributed by Nigel.
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=497541&r1=497540&r2=497541
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jan 18 11:19:19 2007
@@ -17,9 +17,7 @@
  4. HADOOP-757. Fix "Bad File Descriptor" exception in HDFS client
    when an output file is closed twice. (Raghu Angadi via cutting)

- 5. HADOOP-803. Reduce memory footprint of HDFS namenode by replacing
-    the TreeSet of block locations with an ArrayList.
-    (Raghu Angadi via cutting)
+ 5. [ intentionally blank ]

  6. HADOOP-890. Replace dashes in metric names with underscores,
    for better compatibility with some monitoring systems.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?view=diff&rev=497541&r1=497540&r2=497541
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Thu Jan 18 11:19:19 2007
@@ -121,11 +121,18 @@
     // Comparable
     /////////////////////////////////////
     public int compareTo(Object o) {
-        long diff = getBlockId() - ((Block)o).getBlockId();
-        return ( diff < 0 ) ? -1 : ( ( diff > 0 ) ? 1 : 0 );
+        Block b = (Block) o;
+        if (getBlockId() < b.getBlockId()) {
+            return -1;
+        } else if (getBlockId() == b.getBlockId()) {
+            return 0;
+        } else {
+            return 1;
+        }
     }
     public boolean equals(Object o) {
-        return (this.compareTo(o) == 0);
+        Block b = (Block) o;
+        return (this.compareTo(b) == 0);
     }

     public int hashCode() {
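The restored compareTo orders blocks strictly by block id, and equals stays consistent with it, which is what the TreeSet/TreeMap structures reinstated elsewhere in this commit rely on. Below is a minimal, self-contained sketch of that contract; SimpleBlock is a hypothetical stand-in, not the Hadoop Block class, and the behaviour shown is only what the diff above implies.

import java.util.TreeSet;

public class BlockOrderingSketch {
  // Hypothetical stand-in for org.apache.hadoop.dfs.Block.
  static class SimpleBlock implements Comparable<SimpleBlock> {
    private final long blockId;
    private final long numBytes;

    SimpleBlock(long blockId, long numBytes) {
      this.blockId = blockId;
      this.numBytes = numBytes;
    }

    long getBlockId() { return blockId; }

    public int compareTo(SimpleBlock b) {
      // Explicit comparisons rather than subtracting the ids, so a large
      // difference can never overflow and flip the sign of the result.
      if (getBlockId() < b.getBlockId()) {
        return -1;
      } else if (getBlockId() == b.getBlockId()) {
        return 0;
      } else {
        return 1;
      }
    }

    public boolean equals(Object o) {
      return (o instanceof SimpleBlock) && compareTo((SimpleBlock) o) == 0;
    }

    public int hashCode() {
      return (int) (blockId ^ (blockId >>> 32));
    }
  }

  public static void main(String[] args) {
    TreeSet<SimpleBlock> blocks = new TreeSet<SimpleBlock>();
    blocks.add(new SimpleBlock(42L, 64L * 1024 * 1024));
    blocks.add(new SimpleBlock(7L, 1024L));

    // TreeSet membership is decided by compareTo alone, so a block with the
    // same id but an unknown length is still treated as present.
    System.out.println(blocks.contains(new SimpleBlock(42L, 0)));  // true
    System.out.println(blocks.first().getBlockId());               // 7
  }
}

The real Block.compareTo in the diff takes Object, matching how the rest of the file uses Comparable; the ordering logic is the same.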
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?view=diff&rev=497541&r1=497540&r2=497541
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Thu Jan 18 11:19:19 2007
@@ -34,7 +34,7 @@
  **************************************************/
 class DatanodeDescriptor extends DatanodeInfo {

-  private volatile SortedMap<Block, Block> blocks = new TreeMap<Block, Block>();
+  private volatile Collection<Block> blocks = new TreeSet<Block>();
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
@@ -60,12 +60,17 @@

   /**
    */
-  void addBlock(Block b) {
-    blocks.put(b, b);
+  void updateBlocks(Block newBlocks[]) {
+    blocks.clear();
+    for (int i = 0; i < newBlocks.length; i++) {
+      blocks.add(newBlocks[i]);
+    }
   }
-
-  void removeBlock(Block b) {
-    blocks.remove(b);
+
+  /**
+   */
+  void addBlock(Block b) {
+    blocks.add(b);
   }

   void resetBlocks() {
@@ -89,14 +94,10 @@
   }

   Block[] getBlocks() {
-    return blocks.keySet().toArray(new Block[blocks.size()]);
+    return (Block[]) blocks.toArray(new Block[blocks.size()]);
   }

   Iterator<Block> getBlockIterator() {
-    return blocks.keySet().iterator();
-  }
-
-  Block getBlock(long blockId) {
-    return blocks.get( new Block(blockId, 0) );
+    return blocks.iterator();
   }
 }
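With the TreeSet-backed descriptor restored, a datanode's block set is rebuilt wholesale from a full block report through updateBlocks(), single blockReceived notifications go through addBlock(), and iteration comes back in block-id order. A rough usage sketch under those assumptions, with longs standing in for Block objects rather than the real DatanodeDescriptor types:

import java.util.Collection;
import java.util.Iterator;
import java.util.TreeSet;

class DatanodeBlocksSketch {
  // Stands in for the restored TreeSet<Block> field; longs model block ids.
  private final Collection<Long> blocks = new TreeSet<Long>();

  // Replace the descriptor's view with the contents of a full block report.
  void updateBlocks(long[] newBlocks) {
    blocks.clear();
    for (int i = 0; i < newBlocks.length; i++) {
      blocks.add(newBlocks[i]);
    }
  }

  // Record a single block, e.g. on a blockReceived notification.
  void addBlock(long b) {
    blocks.add(b);
  }

  // Iteration is in block-id order because the backing set is sorted.
  Iterator<Long> getBlockIterator() {
    return blocks.iterator();
  }
}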
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=497541&r1=497540&r2=497541
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Jan 18 11:19:19 2007
@@ -59,8 +59,8 @@
     // to client-sent information.
     //  Mapping: Block -> TreeSet<DatanodeDescriptor>
     //
-    Map<Block, List<DatanodeDescriptor>> blocksMap =
-        new HashMap<Block, List<DatanodeDescriptor>>();
+    Map<Block, SortedSet<DatanodeDescriptor>> blocksMap =
+        new HashMap<Block, SortedSet<DatanodeDescriptor>>();

     /**
      * Stores the datanode -> block map.
@@ -179,8 +179,6 @@
     private int maxReplicationStreams;
     // MIN_REPLICATION is how many copies we need in place or else we disallow the write
     private int minReplication;
-    // Default replication
-    private int defaultReplication;
     // heartbeatRecheckInterval is how often namenode checks for expired datanodes
     private long heartbeatRecheckInterval;
     // heartbeatExpireInterval is how long namenode waits for datanode to report
@@ -201,7 +199,6 @@
                         int port, NameNode nn, Configuration conf) throws IOException {
         fsNamesystemObject = this;
-        this.defaultReplication = conf.getInt("dfs.replication", 3);
         this.maxReplication = conf.getInt("dfs.replication.max", 512);
         this.minReplication = conf.getInt("dfs.replication.min", 1);
         if( minReplication <= 0 )
@@ -302,7 +299,7 @@
         DatanodeDescriptor machineSets[][] = new DatanodeDescriptor[blocks.length][];

         for (int i = 0; i < blocks.length; i++) {
-            List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
             if (containingNodes == null) {
                 machineSets[i] = new DatanodeDescriptor[0];
             } else {
@@ -663,16 +660,22 @@
             //
             // We have the pending blocks, but they won't have
             // length info in them (as they were allocated before
-            // data-write took place). Find the block stored in
-            // node descriptor.
+            // data-write took place). So we need to add the correct
+            // length info to each
+            //
+            // REMIND - mjc - this is very inefficient! We should
+            // improve this!
             //
             for (int i = 0; i < nrBlocks; i++) {
                 Block b = pendingBlocks[i];
-                List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
-                Block storedBlock =
-                    containingNodes.get(0).getBlock(b.getBlockId());
-                if ( storedBlock != null ) {
-                    pendingBlocks[i] = storedBlock;
+                SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+                DatanodeDescriptor node = containingNodes.first();
+                for (Iterator<Block> it = node.getBlockIterator(); it.hasNext(); ) {
+                    Block cur = it.next();
+                    if (b.getBlockId() == cur.getBlockId()) {
+                        b.setNumBytes(cur.getNumBytes());
+                        break;
+                    }
                 }
             }

@@ -713,7 +716,7 @@
             // Now that the file is real, we need to be sure to replicate
             // the blocks.
             for (int i = 0; i < nrBlocks; i++) {
-                List<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
+                SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]);
                 // filter out containingNodes that are marked for decommission.
                 int numCurrentReplica = countContainingNodes(containingNodes);
@@ -758,7 +761,7 @@
         for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
             Block b = it.next();

-            List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
             if (containingNodes == null || containingNodes.size() < this.minReplication) {
                 return false;
             }
@@ -791,7 +794,7 @@
             throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
         }

-        List<DatanodeDescriptor> containingNodes = blocksMap.get(blk);
+        Collection<DatanodeDescriptor> containingNodes = blocksMap.get(blk);

         // Check how many copies we have of the block. If we have at least one
         // copy on a live node, then we can delete it.
@@ -849,7 +852,7 @@
         for (int i = 0; i < deletedBlocks.length; i++) {
             Block b = deletedBlocks[i];

-            List<DatanodeDescriptor> containingNodes = blocksMap.get(b);
+            SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(b);
             if (containingNodes != null) {
                 for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
                     DatanodeDescriptor node = it.next();
@@ -973,7 +976,7 @@
         } else {
             String hosts[][] = new String[(endBlock - startBlock) + 1][];
             for (int i = startBlock; i <= endBlock; i++) {
-                List<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
+                SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(blocks[i]);
                 Collection<String> v = new ArrayList<String>();
                 if (containingNodes != null) {
                     for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
@@ -1532,16 +1535,12 @@
         // between the old and new block report.
         //
         int newPos = 0;
+        boolean modified = false;
         Iterator<Block> iter = node.getBlockIterator();
         Block oldblk = iter.hasNext() ? iter.next() : null;
         Block newblk = (newReport != null && newReport.length > 0) ? newReport[0] : null;

-        // common case is that most of the blocks from the datanode
-        // matches blocks in datanode descriptor.
-        Collection<Block> toRemove = new LinkedList<Block>();
-        Collection<Block> toAdd = new LinkedList<Block>();
-
         while (oldblk != null || newblk != null) {

             int cmp = (oldblk == null) ? 1 :
@@ -1555,25 +1554,25 @@
                            ? newReport[newPos] : null;
             } else if (cmp < 0) {
                 // The old report has a block the new one does not
-                toRemove.add(oldblk);
                 removeStoredBlock(oldblk, node);
+                modified = true;
                 oldblk = iter.hasNext() ? iter.next() : null;
             } else {
                 // The new report has a block the old one does not
-                toAdd.add(addStoredBlock(newblk, node));
+                addStoredBlock(newblk, node);
+                modified = true;
                 newPos++;
                 newblk = (newPos < newReport.length) ? newReport[newPos] : null;
             }
         }
-
-        for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
-            node.removeBlock( i.next() );
-        }
-        for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
-            node.addBlock( i.next() );
+        //
+        // Modify node so it has the new blockreport
+        //
+        if (modified) {
+            node.updateBlocks(newReport);
         }
-
+
         //
         // We've now completely updated the node's block report profile.
         // We now go through all its blocks and find which ones are invalid,
@@ -1602,25 +1601,12 @@
     /**
      * Modify (block-->datanode) map. Remove block from set of
      * needed replications if this takes care of the problem.
-     * @return the block that is stored in blockMap.
     */
-    synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
-        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+    synchronized void addStoredBlock(Block block, DatanodeDescriptor node) {
+        SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null) {
-            //Create an arraylist with the current replication factor
-            FSDirectory.INode inode = dir.getFileByBlock(block);
-            int replication = (inode != null) ?
-                              inode.getReplication() : defaultReplication;
-            containingNodes = new ArrayList<DatanodeDescriptor>(replication);
+            containingNodes = new TreeSet<DatanodeDescriptor>();
             blocksMap.put(block, containingNodes);
-        } else {
-            Block storedBlock =
-                containingNodes.get(0).getBlock(block.getBlockId());
-            // update stored block's length.
-            if ( block.getNumBytes() > 0 ) {
-                storedBlock.setNumBytes( block.getNumBytes() );
-            }
-            block = storedBlock;
         }
         if (! containingNodes.contains(node)) {
             containingNodes.add(node);
@@ -1642,7 +1628,7 @@
         synchronized (neededReplications) {
             FSDirectory.INode fileINode = dir.getFileByBlock(block);
             if( fileINode == null )  // block does not belong to any file
-                return block;
+                return;

             // filter out containingNodes that are marked for decommission.
             int numCurrentReplica = countContainingNodes(containingNodes);
@@ -1667,7 +1653,6 @@
                 proccessOverReplicatedBlock( block, fileReplication );
             }
-        return block;
     }

     /**
@@ -1676,7 +1661,7 @@
      * mark them in the excessReplicateMap.
      */
     private void proccessOverReplicatedBlock( Block block, short replication ) {
-        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+        SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if( containingNodes == null )
             return;
         Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
@@ -1756,7 +1741,7 @@
     synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                 +block.getBlockName() + " from "+node.getName() );
-        List<DatanodeDescriptor> containingNodes = blocksMap.get(block);
+        SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(block);
         if (containingNodes == null || ! containingNodes.contains(node)) {
             NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
                 +block.getBlockName()+" has already been removed from node "+node );
@@ -1815,9 +1800,14 @@
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
                 +block.getBlockName()+" is received from " + nodeID.getName() );
         //
-        // Modify the blocks->datanode map and node's map.
+        // Modify the blocks->datanode map
         //
-        node.addBlock( addStoredBlock(block, node) );
+        addStoredBlock(block, node);
+
+        //
+        // Supplement node's blockreport
+        //
+        node.addBlock(block);
     }

     /**
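The block-report handling restored in FSNamesystem walks the descriptor's sorted block iterator and the incoming report side by side, removing stored blocks that disappeared and adding ones that appeared, and then refreshes the descriptor with node.updateBlocks(newReport). A compact sketch of that merge, assuming the report array arrives sorted by block id as the single pass requires; the types and names are simplified stand-ins, not the FSNamesystem signatures:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

class BlockReportMergeSketch {
  // One pass over two sorted sequences: ids only in oldBlocks go to toRemove,
  // ids only in newReport go to toAdd, and matches are skipped.
  static void diffSortedReport(TreeSet<Long> oldBlocks, long[] newReport,
                               List<Long> toRemove, List<Long> toAdd) {
    Iterator<Long> iter = oldBlocks.iterator();
    int newPos = 0;
    Long oldblk = iter.hasNext() ? iter.next() : null;
    Long newblk = (newReport != null && newReport.length > 0)
                    ? Long.valueOf(newReport[0]) : null;

    while (oldblk != null || newblk != null) {
      int cmp = (oldblk == null) ? 1
              : ((newblk == null) ? -1 : oldblk.compareTo(newblk));
      if (cmp == 0) {
        // Present on both sides: nothing to do, advance both cursors.
        oldblk = iter.hasNext() ? iter.next() : null;
        newPos++;
        newblk = (newPos < newReport.length) ? Long.valueOf(newReport[newPos]) : null;
      } else if (cmp < 0) {
        // Only in the old view: the datanode no longer holds this block.
        toRemove.add(oldblk);
        oldblk = iter.hasNext() ? iter.next() : null;
      } else {
        // Only in the new report: the datanode gained this block.
        toAdd.add(newblk);
        newPos++;
        newblk = (newPos < newReport.length) ? Long.valueOf(newReport[newPos]) : null;
      }
    }
  }

  public static void main(String[] args) {
    TreeSet<Long> old = new TreeSet<Long>();
    old.add(1L); old.add(3L); old.add(5L);
    long[] report = { 1L, 4L, 5L };
    List<Long> toRemove = new ArrayList<Long>();
    List<Long> toAdd = new ArrayList<Long>();
    diffSortedReport(old, report, toRemove, toAdd);
    System.out.println("remove=" + toRemove + " add=" + toAdd);  // remove=[3] add=[4]
  }
}

In the reverted code the removals and additions are applied immediately through removeStoredBlock and addStoredBlock while the loop runs; the sketch only isolates the traversal logic that decides which blocks fall into each category.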