Author: cutting
Date: Thu Jan 25 12:58:54 2007
New Revision: 499967
URL: http://svn.apache.org/viewvc?view=rev&rev=499967
Log:
HADOOP-659. In HDFS, prioritize replication of blocks based on their current replication level. Contributed by Hairong.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=499967&r1=499966&r2=499967 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Jan 25 12:58:54 2007 @@ -102,6 +102,11 @@ 13. HADOOP-879. Fix InputFormatBase to handle output generated by MapFileOutputFormat. (cutting) +14. HADOOP-659. In HDFS, prioritize replication of blocks based on + current replication level. Blocks which are severely + under-replicated should be further replicated before blocks which + are less under-replicated. (Hairong Kuang via cutting) + Release 0.10.0 - 2007-01-05 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=499967&r1=499966&r2=499967 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Jan 25 12:58:54 2007 @@ -152,7 +152,7 @@ // We also store pending replication-orders. 
// Set of: Block // - private Collection<Block> neededReplications = new TreeSet<Block>(); + private UnderReplicationBlocks neededReplications = new UnderReplicationBlocks(); private Collection<Block> pendingReplications = new TreeSet<Block>(); // @@ -277,7 +277,189 @@ } } } + + /* get replication factor of a block */ + private int getReplication( Block block ) { + FSDirectory.INode fileINode = dir.getFileByBlock(block); + if( fileINode == null ) { // block does not belong to any file + return 0; + } else { + return fileINode.getReplication(); + } + } + /* Class for keeping track of under replication blocks + * Blocks have replication priority, with priority 0 indicating the highest + * Blocks have only one replicas has the highest + */ + private class UnderReplicationBlocks { + private static final int LEVEL = 3; + TreeSet<Block>[] priorityQueues = new TreeSet[LEVEL]; + + /* constructor */ + UnderReplicationBlocks() { + for(int i=0; i<LEVEL; i++) { + priorityQueues[i] = new TreeSet<Block>(); + } + } + + /* Return the total number of under replication blocks */ + synchronized int size() { + int size = 0; + for( int i=0; i<LEVEL; i++ ) { + size += priorityQueues[i].size(); + } + return size; + } + + /* Return the priority of a block + * @param block a under replication block + * @param curReplicas current number of replicas of the block + * @param expectedReplicas expected number of replicas of the block + */ + private int getPriority(Block block, + int curReplicas, int expectedReplicas) { + if (curReplicas>=expectedReplicas) { + return LEVEL; // no need to replicate + } else if(curReplicas==1) { + return 0; // highest priority + } else if(curReplicas*3<expectedReplicas) { + return 1; + } else { + return 2; + } + } + + /* add a block to a under replication queue according to its priority + * @param block a under replication block + * @param curReplicas current number of replicas of the block + * @param expectedReplicas expected number of replicas of the block + */ + 
synchronized boolean add( + Block block, int curReplicas, int expectedReplicas) { + if(expectedReplicas <= curReplicas) { + return false; + } + int priLevel = getPriority(block, curReplicas, expectedReplicas); + if( priorityQueues[priLevel].add(block) ) { + NameNode.stateChangeLog.debug( + "BLOCK* NameSystem.UnderReplicationBlock.add:" + + block.getBlockName() + + " has only "+curReplicas + + " replicas and need " + expectedReplicas + + " replicas so is added to neededReplications" + + " at priority level " + priLevel ); + return true; + } + return false; + } + + /* add a block to a under replication queue */ + synchronized boolean add(Block block) { + int curReplicas = countContainingNodes(blocksMap.get(block)); + int expectedReplicas = getReplication(block); + return add(block, curReplicas, expectedReplicas); + } + + /* remove a block from a under replication queue */ + synchronized boolean remove(Block block, + int oldReplicas, int oldExpectedReplicas) { + if(oldExpectedReplicas <= oldReplicas) { + return false; + } + int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas); + return remove(block, priLevel); + } + + /* remove a block from a under replication queue given a priority*/ + private boolean remove(Block block, int priLevel ) { + if( priorityQueues[priLevel].remove(block) ) { + NameNode.stateChangeLog.debug( + "BLOCK* NameSystem.UnderReplicationBlock.remove: " + + "Removing block " + block.getBlockName() + + " from priority queue "+ priLevel ); + return true; + } else { + for(int i=0; i<LEVEL; i++) { + if( i!=priLevel && priorityQueues[i].remove(block) ) { + NameNode.stateChangeLog.debug( + "BLOCK* NameSystem.UnderReplicationBlock.remove: " + + "Removing block " + block.getBlockName() + + " from priority queue "+ i ); + return true; + } + } + } + return false; + } + + /* remove a block from a under replication queue */ + synchronized boolean remove(Block block) { + int curReplicas = countContainingNodes(blocksMap.get(block)); + int 
expectedReplicas = getReplication(block); + return remove(block, curReplicas, expectedReplicas); + } + + /* update the priority level of a block */ + synchronized void update(Block block, + int curReplicasDelta, int expectedReplicasDelta) { + int curReplicas = countContainingNodes(blocksMap.get(block)); + int curExpectedReplicas = getReplication(block); + int oldReplicas = curReplicas-curReplicasDelta; + int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; + int curPri = getPriority(block, curReplicas, curExpectedReplicas); + int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas); + if( oldPri != LEVEL && oldPri != curPri ) { + remove(block, oldPri); + } + if( curPri != LEVEL && oldPri != curPri + && priorityQueues[curPri].add(block)) { + NameNode.stateChangeLog.debug( + "BLOCK* NameSystem.UnderReplicationBlock.update:" + + block.getBlockName() + + " has only "+curReplicas + + " replicas and need " + curExpectedReplicas + + " replicas so is added to neededReplications" + + " at priority level " + curPri ); + } + } + + /* return a iterator of all the under replication blocks */ + synchronized Iterator<Block> iterator() { + return new Iterator<Block>() { + int level; + Iterator<Block>[] iterator = new Iterator[LEVEL]; + + { + level=0; + for(int i=0; i<LEVEL; i++) { + iterator[i] = priorityQueues[i].iterator(); + } + } + + private void update() { + while( level< LEVEL-1 && !iterator[level].hasNext() ) { + level++; + } + } + + public Block next() { + update(); + return iterator[level].next(); + } + + public boolean hasNext() { + update(); + return iterator[level].hasNext(); + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } + ///////////////////////////////////////////////////////// // // These methods are called by HadoopFS clients @@ -347,20 +529,18 @@ if( oldRepl == replication ) // the same replication return true; - synchronized( neededReplications ) { - if( oldRepl < replication ) { - // old 
replication < the new one; need to replicate - LOG.info("Increasing replication for file " + src - + ". New replication is " + replication ); - for( int idx = 0; idx < fileBlocks.length; idx++ ) - neededReplications.add( fileBlocks[idx] ); - } else { - // old replication > the new one; need to remove copies - LOG.info("Reducing replication for file " + src - + ". New replication is " + replication ); - for( int idx = 0; idx < fileBlocks.length; idx++ ) - proccessOverReplicatedBlock( fileBlocks[idx], replication ); - } + // update needReplication priority queues + LOG.info("Increasing replication for file " + src + + ". New replication is " + replication ); + for( int idx = 0; idx < fileBlocks.length; idx++ ) + neededReplications.update( fileBlocks[idx], 0, replication-oldRepl ); + + if( oldRepl > replication ) { + // old replication > the new one; need to remove copies + LOG.info("Reducing replication for file " + src + + ". New replication is " + replication ); + for( int idx = 0; idx < fileBlocks.length; idx++ ) + proccessOverReplicatedBlock( fileBlocks[idx], replication ); } return true; } @@ -715,19 +895,15 @@ // Now that the file is real, we need to be sure to replicate // the blocks. + int numExpectedReplicas = pendingFile.getReplication(); for (int i = 0; i < nrBlocks; i++) { SortedSet<DatanodeDescriptor> containingNodes = blocksMap.get(pendingBlocks[i]); // filter out containingNodes that are marked for decommission. 
int numCurrentReplica = countContainingNodes(containingNodes); - if (numCurrentReplica < pendingFile.getReplication()) { - NameNode.stateChangeLog.debug( - "DIR* NameSystem.completeFile:" - + pendingBlocks[i].getBlockName()+" has only "+containingNodes.size() - +" replicas so is added to neededReplications"); - synchronized (neededReplications) { - neededReplications.add(pendingBlocks[i]); - } + if (numCurrentReplica < numExpectedReplicas) { + neededReplications.add( + pendingBlocks[i], numCurrentReplica, numExpectedReplicas); } } return COMPLETE_SUCCESS; @@ -1608,8 +1784,10 @@ containingNodes = new TreeSet<DatanodeDescriptor>(); blocksMap.put(block, containingNodes); } + int curReplicaDelta = 0; if (! containingNodes.contains(node)) { containingNodes.add(node); + curReplicaDelta = 1; // // Hairong: I would prefer to set the level of next logrecord // to be debug. @@ -1625,34 +1803,24 @@ + block.getBlockName() + " on " + node.getName()); } - synchronized (neededReplications) { - FSDirectory.INode fileINode = dir.getFileByBlock(block); - if( fileINode == null ) // block does not belong to any file - return; - - // filter out containingNodes that are marked for decommission. 
- int numCurrentReplica = countContainingNodes(containingNodes); - - // check whether safe replication is reached for the block - // only if it is a part of a files - incrementSafeBlockCount( numCurrentReplica ); - short fileReplication = fileINode.getReplication(); - if (numCurrentReplica >= fileReplication ) { - neededReplications.remove(block); - pendingReplications.remove(block); - NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: " - +block.getBlockName()+" has "+ numCurrentReplica - +" replicas so is removed from neededReplications and pendingReplications" ); - - } else {// numCurrentReplica < fileReplication - neededReplications.add(block); - NameNode.stateChangeLog.debug("BLOCK* NameSystem.addStoredBlock: " - +block.getBlockName()+" has only "+ numCurrentReplica - +" replicas so is added to neededReplications" ); - } - - proccessOverReplicatedBlock( block, fileReplication ); - } + FSDirectory.INode fileINode = dir.getFileByBlock(block); + if( fileINode == null ) // block does not belong to any file + return; + + // filter out containingNodes that are marked for decommission. + int numCurrentReplica = countContainingNodes(containingNodes); + + // check whether safe replication is reached for the block + // only if it is a part of a files + incrementSafeBlockCount( numCurrentReplica ); + + // handle underReplication/overReplication + short fileReplication = fileINode.getReplication(); + neededReplications.update(block, curReplicaDelta, 0); + if (numCurrentReplica >= fileReplication ) { + pendingReplications.remove(block); + } + proccessOverReplicatedBlock( block, fileReplication ); } /** @@ -1748,8 +1916,12 @@ return; } containingNodes.remove(node); - decrementSafeBlockCount( containingNodes.size() ); - if( containingNodes.size() == 0 ) + + // filter out containingNodes that are marked for decommission. 
+ int numCurrentReplica = countContainingNodes(containingNodes); + + decrementSafeBlockCount( numCurrentReplica ); + if( containingNodes.isEmpty() ) blocksMap.remove(block); // // It's possible that the block was removed because of a datanode @@ -1758,13 +1930,8 @@ // be-replicated list. // FSDirectory.INode fileINode = dir.getFileByBlock(block); - if( fileINode != null && (countContainingNodes(containingNodes) < fileINode.getReplication())) { - synchronized (neededReplications) { - neededReplications.add(block); - } - NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: " - +block.getBlockName()+" has only "+containingNodes.size() - +" replicas so is added to neededReplications" ); + if( fileINode != null ) { + neededReplications.update(block, -1, 0); } // @@ -1896,9 +2063,7 @@ // replicated. Block decommissionBlocks[] = node.getBlocks(); for (int j = 0; j < decommissionBlocks.length; j++) { - synchronized (neededReplications) { - neededReplications.add(decommissionBlocks[j]); - } + neededReplications.update(decommissionBlocks[j], -1, 0); } } break; @@ -2107,15 +2272,13 @@ * reside on the specified node. Otherwise returns false. 
*/ private boolean isReplicationInProgress(DatanodeDescriptor srcNode) { - synchronized (neededReplications) { for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();){ - Block block = it.next(); - Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block); - if (containingNodes.contains(srcNode)) { - return true; - } + Block block = it.next(); + Collection<DatanodeDescriptor> containingNodes = blocksMap.get(block); + if (containingNodes.contains(srcNode)) { + return true; + } } - } return false; } @@ -2237,9 +2400,10 @@ DatanodeDescriptor targets[] = (DatanodeDescriptor[]) replicateTargetSets.get(i); int numCurrentReplica = numCurrentReplicas.get(i).intValue(); - if (numCurrentReplica + targets.length >= - dir.getFileByBlock( block).getReplication() ) { - neededReplications.remove(block); + int numExpectedReplica = dir.getFileByBlock( block).getReplication(); + neededReplications.update( + block, numCurrentReplica, numExpectedReplica); + if (numCurrentReplica + targets.length >= numExpectedReplica) { pendingReplications.add(block); NameNode.stateChangeLog.debug( "BLOCK* NameSystem.pendingTransfer: "