Author: cutting
Date: Mon May 7 12:35:01 2007
New Revision: 535962

URL: http://svn.apache.org/viewvc?view=rev&rev=535962
Log:
HADOOP-1184.  Fix HDFS decommissioning to complete when the only copy
of a block is on a decommissioned node.
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=535962&r1=535961&r2=535962
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon May 7 12:35:01 2007
@@ -329,6 +329,9 @@
     that can fail before a job is aborted.  The default is zero.
     (Arun C Murthy via cutting)
 
+98. HADOOP-1184.  Fix HDFS decomissioning to complete when the only
+    copy of a block is on a decommissioned node.  (Dhruba Borthakur via cutting)
+
 Release 0.12.3 - 2007-04-06

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=535962&r1=535961&r2=535962
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon May 7 12:35:01 2007
@@ -186,6 +186,8 @@
   private long heartbeatExpireInterval;
   //replicationRecheckInterval is how often namenode checks for new replication work
   private long replicationRecheckInterval;
+  //decommissionRecheckInterval is how often namenode checks if a node has finished decommission
+  private long decommissionRecheckInterval;
   static int replIndex = 0; // last datanode used for replication work
   static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
 
@@ -240,6 +242,9 @@
     this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
       10 * heartbeatInterval;
     this.replicationRecheckInterval = 3 * 1000; // 3 second
+    this.decommissionRecheckInterval = conf.getInt(
+                                         "dfs.namenode.decommission.interval",
+                                         5 * 60 * 1000);
 
     this.localMachine = hostname;
     this.port = port;
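The hunk above introduces the new dfs.namenode.decommission.interval property, which replaces a hard-coded five-minute poll (see the Thread.sleep change further down) and defaults to 5 * 60 * 1000 ms. A minimal sketch of overriding it, assuming only the stock org.apache.hadoop.conf.Configuration API; the property name and default come straight from the hunk, everything else is illustrative:

    import org.apache.hadoop.conf.Configuration;

    public class DecommissionIntervalExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A lower value makes the namenode notice a finished decommission
        // sooner; the patch's own tests use 3 seconds (see MiniDFSCluster).
        conf.setInt("dfs.namenode.decommission.interval", 30 * 1000);
        System.out.println(conf.getInt("dfs.namenode.decommission.interval",
                                       5 * 60 * 1000));
      }
    }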
@@ -407,10 +412,13 @@
   /* updates a block in under replication queue */
   synchronized void updateNeededReplications(Block block,
                         int curReplicasDelta, int expectedReplicasDelta) {
-    int curReplicas = countContainingNodes( block );
+    NumberReplicas repl = countNodes(block);
     int curExpectedReplicas = getReplication(block);
-    neededReplications.update(block, curReplicas, curExpectedReplicas,
-                              curReplicasDelta, expectedReplicasDelta);
+    neededReplications.update(block,
+                              repl.liveReplicas(),
+                              repl.decommissionedReplicas(),
+                              curExpectedReplicas,
+                              curReplicasDelta, expectedReplicasDelta);
   }
 
 /////////////////////////////////////////////////////////
@@ -863,10 +871,12 @@
       int numExpectedReplicas = pendingFile.getReplication();
       for (int i = 0; i < nrBlocks; i++) {
         // filter out containingNodes that are marked for decommission.
-        int numCurrentReplica = countContainingNodes(pendingBlocks[i]);
-        if (numCurrentReplica < numExpectedReplicas) {
-          neededReplications.add(
-            pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
+        NumberReplicas number = countNodes(pendingBlocks[i]);
+        if (number.liveReplicas() < numExpectedReplicas) {
+          neededReplications.add(pendingBlocks[i],
+                                 number.liveReplicas(),
+                                 number.decommissionedReplicas,
+                                 numExpectedReplicas);
         }
       }
       return COMPLETE_SUCCESS;
@@ -976,9 +986,8 @@
 
       // Check how many copies we have of the block. If we have at least one
       // copy on a live node, then we can delete it.
-      int count = countContainingNodes(blk);
-      if ((count > 1) || ((count == 1) && (dn.isDecommissionInProgress() ||
-                                           dn.isDecommissioned()))) {
+      int count = countNodes(blk).liveReplicas();
+      if (count > 1) {
        addToInvalidates(blk, dn);
        removeStoredBlock(blk, getDatanode(dn));
        NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
@@ -1737,14 +1746,7 @@
             break;
           }
           foundwork++;
-        } else {
-          //
-          // See if the decommissioned node has finished moving all
-          // its datablocks to another replica. This is a loose
-          // heuristic to determine when a decommission is really over.
-          //
-          checkDecommissionState(node);
-        }
+        }
       }
     }
 
@@ -1757,8 +1759,10 @@
     if (timedOutItems != null) {
       synchronized (this) {
         for (int i = 0; i < timedOutItems.length; i++) {
+          NumberReplicas num = countNodes(timedOutItems[i]);
           neededReplications.add(timedOutItems[i],
-                                 countContainingNodes(timedOutItems[i]),
+                                 num.liveReplicas(),
+                                 num.decommissionedReplicas(),
                                  getReplication(timedOutItems[i]));
         }
       }
@@ -2076,7 +2080,8 @@
       return block;
 
     // filter out containingNodes that are marked for decommission.
-    int numCurrentReplica = countContainingNodes(block)
+    NumberReplicas num = countNodes(block);
+    int numCurrentReplica = num.liveReplicas()
       + pendingReplications.getNumReplicas(block);
 
     // check whether safe replication is reached for the block
@@ -2086,7 +2091,8 @@
     // handle underReplication/overReplication
     short fileReplication = fileINode.getReplication();
     if (numCurrentReplica >= fileReplication) {
-      neededReplications.remove(block, numCurrentReplica, fileReplication);
+      neededReplications.remove(block, numCurrentReplica,
+                                num.decommissionedReplicas, fileReplication);
     } else {
       updateNeededReplications(block, curReplicaDelta, 0);
     }
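A common thread in the hunks above: every caller that used to get a single count from countContainingNodes() now receives a live/decommissioned split from countNodes(), so the invalidateBlocks check can shrink to a plain count > 1 test. A standalone toy model of that accounting (not Hadoop code; the enum and helper are illustrative only):

    import java.util.Arrays;
    import java.util.List;

    public class ReplicaCountSketch {
      enum State { NORMAL, DECOMMISSION_IN_PROGRESS, DECOMMISSIONED }

      static int liveReplicas(List<State> holders) {
        int live = 0;
        for (State s : holders) {
          if (s == State.NORMAL) {
            live++;  // only fully commissioned nodes count as live
          }
        }
        return live;
      }

      public static void main(String[] args) {
        // one live copy plus one copy on a decommissioning node
        List<State> holders =
            Arrays.asList(State.NORMAL, State.DECOMMISSION_IN_PROGRESS);
        // prints 1, so this block must not be invalidated yet
        System.out.println("live = " + liveReplicas(holders));
      }
    }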
-          //
-          if (node.getName().equals(nodes[i]) ||
-              node.getHost().equals(nodes[i])) {
-            found = true;
-            boolean isDecommissioned = checkDecommissionStateInternal(node);
-            if (!isDecommissioned) {
-              return false;
-            }
-          }
-        }
-        if (!found) {
-          badnodes += nodes[i] + " ";
-          isError = true;
-        }
-      }
-    }
-    if (isError) {
-      throw new IOException("Nodes " + badnodes + " not found");
-    }
-    return true;
-  }
-
   /**
    */
   public DatanodeInfo getDataNodeInfo(String name) {
@@ -2472,38 +2438,74 @@
     return sendBlock.toArray(new Block[sendBlock.size()]);
   }
 
+
+  /**
+   * A immutable object that stores the number of live replicas and
+   * the number of decommissined Replicas.
+   */
+  static class NumberReplicas {
+    private int liveReplicas;
+    private int decommissionedReplicas;
+
+    NumberReplicas(int live, int decommissioned) {
+      liveReplicas = live;
+      decommissionedReplicas = decommissioned;
+    }
+
+    int liveReplicas() {
+      return liveReplicas;
+    }
+    int decommissionedReplicas() {
+      return decommissionedReplicas;
+    }
+  }
+
   /*
-   * Counts the number of nodes in the given list. Skips over nodes
-   * that are marked for decommission.
+   * Counts the number of nodes in the given list into active and
+   * decommissioned counters.
    */
-  private int countContainingNodes(Iterator<DatanodeDescriptor> nodeIter) {
+  private NumberReplicas countNodes(Iterator<DatanodeDescriptor> nodeIter) {
     int count = 0;
-    while (nodeIter.hasNext()) {
+    int live = 0;
+    while ( nodeIter.hasNext() ) {
       DatanodeDescriptor node = nodeIter.next();
-      if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+      if (node.isDecommissionInProgress() || node.isDecommissioned()) {
         count++;
       }
+      else {
+        live++;
+      }
     }
-    return count;
+    return new NumberReplicas(live, count);
   }
-
-  /** wrapper for countContainingNodes(Iterator). */
-  private int countContainingNodes(Block b) {
-    return countContainingNodes(blocksMap.nodeIterator(b));
+
+  /** return the number of nodes that are live and decommissioned. */
+  private NumberReplicas countNodes(Block b) {
+    return countNodes(blocksMap.nodeIterator(b));
   }
 
-  /** Reeturns a newly allocated list exluding the decommisioned nodes. */
-  ArrayList<DatanodeDescriptor> containingNodeList(Block b) {
-    ArrayList<DatanodeDescriptor> nonCommissionedNodeList =
+  /** Returns a newly allocated list of all nodes. Returns a count of
+   *  live and decommissioned nodes.
+   */
+  ArrayList<DatanodeDescriptor> containingNodeList(Block b, NumberReplicas[] numReplicas) {
+    ArrayList<DatanodeDescriptor> nodeList =
       new ArrayList<DatanodeDescriptor>();
+    int count = 0;
+    int live = 0;
     for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
         it.hasNext();) {
       DatanodeDescriptor node = it.next();
       if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-        nonCommissionedNodeList.add(node);
+        live++;
       }
+      else {
+        count++;
+      }
+      nodeList.add(node);
+    }
+    if (numReplicas != null) {
+      numReplicas[0] = new NumberReplicas(live, count);
     }
-    return nonCommissionedNodeList;
+    return nodeList;
   }
   /*
    * Return true if there are any blocks on this node that have not
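Note the shape of the new containingNodeList(): it returns every holder of the block and reports the live/decommissioned split through a one-element NumberReplicas[] out-parameter, a standard workaround for a Java method that needs two results. A compact standalone model of that pattern (all names here are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;

    public class OutParamSketch {
      static final class NumberReplicas {
        final int live, decommissioned;
        NumberReplicas(int live, int decommissioned) {
          this.live = live;
          this.decommissioned = decommissioned;
        }
      }

      // Returns the full holder list; the live/decommissioned counts come
      // back through the one-element out-array.
      static List<String> holders(boolean[] decommissioning,
                                  NumberReplicas[] out) {
        List<String> nodes = new ArrayList<String>();
        int live = 0, decom = 0;
        for (int i = 0; i < decommissioning.length; i++) {
          if (decommissioning[i]) decom++; else live++;
          nodes.add("node-" + i);
        }
        if (out != null) {
          out[0] = new NumberReplicas(live, decom);
        }
        return nodes;
      }

      public static void main(String[] args) {
        NumberReplicas[] out = new NumberReplicas[1];
        holders(new boolean[] {false, true, true}, out);
        System.out.println(out[0].live + " live, "
                           + out[0].decommissioned + " decommissioned");
      }
    }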
@@ -2511,15 +2513,34 @@
    */
   private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
     Block decommissionBlocks[] = srcNode.getBlocks();
+    boolean status = false;
     for (int i = 0; i < decommissionBlocks.length; i++) {
       Block block = decommissionBlocks[i];
       FSDirectory.INode fileINode = blocksMap.getINode(block);
-      if (fileINode != null &&
-          fileINode.getReplication() > countContainingNodes(block)) {
-        return true;
+
+      if (fileINode != null) {
+        NumberReplicas num = countNodes(block);
+        int curReplicas = num.liveReplicas();
+        int curExpectedReplicas = getReplication(block);
+        if (curExpectedReplicas > curReplicas) {
+          status = true;
+          if (!neededReplications.contains(block) &&
+              pendingReplications.getNumReplicas(block) == 0) {
+            //
+            // These blocks have been reported from the datanode
+            // after the startDecommission method has been executed. These
+            // blocks were in flight when the decommission was started.
+            //
+            neededReplications.update(block,
+                                      curReplicas,
+                                      num.decommissionedReplicas(),
+                                      curExpectedReplicas,
+                                      -1, 0);
+          }
+        }
       }
     }
-    return false;
+    return status;
   }
 
   /**
@@ -2528,7 +2549,7 @@
    */
   private boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
     //
-    // Check to see if there are all blocks in this decommisioned
+    // Check to see if all blocks in this decommisioned
     // node has reached their target replication factor.
     //
     if (node.isDecommissionInProgress()) {
@@ -2544,18 +2565,6 @@
   }
 
   /**
-   * Change, if appropriate, the admin state of a datanode to
-   * decommission completed.
-   */
-  public synchronized void checkDecommissionState(DatanodeID nodeReg) {
-    DatanodeDescriptor node = datanodeMap.get(nodeReg.getStorageID());
-    if (node == null) {
-      return;
-    }
-    checkDecommissionStateInternal(node);
-  }
-
-  /**
    * Return with a list of Block/DataNodeInfo sets, indicating
    * where various Blocks should be copied, ASAP.
    *
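The rewritten isReplicationInProgress() above no longer returns at the first under-replicated block. It scans all of them, and any block that is under-replicated but sitting in neither the needed nor the pending queue (i.e. a block reported after startDecommission ran) gets queued on the spot. A standalone sketch of that control flow, with plain sets standing in for the real queues:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class DecommissionScanSketch {
      // Returns true while any block on the draining node still needs
      // copies; late-reported blocks are added to the needed queue here.
      static boolean isReplicationInProgress(List<String> blocksOnNode,
                                             Set<String> underReplicated,
                                             Set<String> needed,
                                             Set<String> pending) {
        boolean status = false;
        for (String block : blocksOnNode) {
          if (underReplicated.contains(block)) {
            status = true;
            if (!needed.contains(block) && !pending.contains(block)) {
              needed.add(block);  // in flight when decommission began
            }
          }
        }
        return status;
      }

      public static void main(String[] args) {
        Set<String> needed = new HashSet<String>();
        Set<String> under = new HashSet<String>();
        under.add("blk_2");
        System.out.println(isReplicationInProgress(
            Arrays.asList("blk_1", "blk_2"), under, needed,
            new HashSet<String>()));  // true
        System.out.println(needed);   // [blk_2]
      }
    }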
@@ -2582,9 +2591,10 @@
     // replicate them.
     //
     List<Block> replicateBlocks = new ArrayList<Block>();
-    List<Integer> numCurrentReplicas = new ArrayList<Integer>();
+    List<NumberReplicas> numCurrentReplicas = new ArrayList<NumberReplicas>();
     List<DatanodeDescriptor[]> replicateTargetSets;
     replicateTargetSets = new ArrayList<DatanodeDescriptor[]>();
+    NumberReplicas[] allReplicas = new NumberReplicas[1];
     for (Iterator<Block> it = neededReplications.iterator(); it.hasNext();) {
       if (needed <= 0) {
         break;
@@ -2596,7 +2606,7 @@
         it.remove();
       } else {
         List<DatanodeDescriptor> containingNodes =
-          containingNodeList(block);
+          containingNodeList(block, allReplicas);
         Collection<Block> excessBlocks = excessReplicateMap.get(
                                                                 srcNode.getStorageID());
@@ -2604,8 +2614,10 @@
         // not be scheduled for removal on that node
         if (containingNodes.contains(srcNode) &&
             (excessBlocks == null || !excessBlocks.contains(block))) {
-          int numCurrentReplica = containingNodes.size() +
+          int numCurrentReplica = allReplicas[0].liveReplicas() +
             pendingReplications.getNumReplicas(block);
+          NumberReplicas repl = new NumberReplicas(numCurrentReplica,
+                                                   allReplicas[0].decommissionedReplicas());
           if (numCurrentReplica >= fileINode.getReplication()) {
             it.remove();
           } else {
@@ -2617,7 +2629,7 @@
             if (targets.length > 0) {
               // Build items to return
               replicateBlocks.add(block);
-              numCurrentReplicas.add(new Integer(numCurrentReplica));
+              numCurrentReplicas.add(repl);
               replicateTargetSets.add(targets);
               needed -= targets.length;
             }
@@ -2638,11 +2650,14 @@
           Block block = it.next();
           DatanodeDescriptor targets[] =
             (DatanodeDescriptor[]) replicateTargetSets.get(i);
-          int numCurrentReplica = numCurrentReplicas.get(i).intValue();
+          int numCurrentReplica = numCurrentReplicas.get(i).liveReplicas();
           int numExpectedReplica = blocksMap.getINode(block).getReplication();
           if (numCurrentReplica + targets.length >= numExpectedReplica) {
             neededReplications.remove(
-                                      block, numCurrentReplica, numExpectedReplica);
+                                      block,
+                                      numCurrentReplica,
+                                      numCurrentReplicas.get(i).decommissionedReplicas(),
+                                      numExpectedReplica);
             pendingReplications.add(block, targets.length);
             NameNode.stateChangeLog.debug(
                                           "BLOCK* NameSystem.pendingTransfer: "
@@ -2797,7 +2812,7 @@
         FSNamesystem.LOG.info(StringUtils.stringifyException(e));
       }
       try {
-        Thread.sleep(1000 * 60 * 5);
+        Thread.sleep(decommissionRecheckInterval);
       } catch (InterruptedException ie) {
       }
     }
@@ -3165,7 +3180,7 @@
   void decrementSafeBlockCount(Block b) {
     if (safeMode == null) // mostly true
       return;
-    safeMode.decrementSafeBlockCount((short)countContainingNodes(b));
+    safeMode.decrementSafeBlockCount((short)countNodes(b).liveReplicas());
   }
 
   /**
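Taken together, the FSNamesystem changes move the "is this node done?" test out of the heartbeat path (the deleted checkDecommissionState call) and leave it to the existing monitor thread, whose hard-coded five-minute sleep now honors decommissionRecheckInterval. A minimal sketch of that polling shape; the interval name matches the patch, while the loop body is a placeholder:

    public class RecheckLoopSketch implements Runnable {
      private final long decommissionRecheckInterval;

      RecheckLoopSketch(long intervalMillis) {
        decommissionRecheckInterval = intervalMillis;
      }

      public void run() {
        while (true) {
          // placeholder for checkDecommissionStateInternal() over all
          // decommission-in-progress datanodes
          System.out.println("rechecking decommission state");
          try {
            Thread.sleep(decommissionRecheckInterval);
          } catch (InterruptedException ie) {
            return;
          }
        }
      }

      public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new RecheckLoopSketch(3 * 1000L));
        t.setDaemon(true);  // dies with the VM, like a namenode daemon
        t.start();
        Thread.sleep(7 * 1000L);
      }
    }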
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java?view=diff&rev=535962&r1=535961&r2=535962
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java Mon May 7 12:35:01 2007
@@ -57,8 +57,17 @@
    * @param expectedReplicas expected number of replicas of the block
    */
   private int getPriority(Block block,
-                          int curReplicas, int expectedReplicas) {
-    if (curReplicas<=0 || curReplicas>=expectedReplicas) {
+                          int curReplicas,
+                          int decommissionedReplicas,
+                          int expectedReplicas) {
+    if (curReplicas<0 || curReplicas>=expectedReplicas) {
       return LEVEL; // no need to replicate
+    } else if(curReplicas==0) {
+      // If there are zero non-decommissioned replica but there are
+      // some decommissioned replicas, then assign them highest priority
+      if (decommissionedReplicas > 0) {
+        return 0;
+      }
+      return LEVEL; // no need to replicate
     } else if(curReplicas==1) {
       return 0; // highest priority
@@ -75,12 +84,16 @@
    * @param expectedReplicas expected number of replicas of the block
    */
   synchronized boolean add(
-                           Block block, int curReplicas, int expectedReplicas) {
-    if(curReplicas<=0 || expectedReplicas <= curReplicas) {
+                           Block block,
+                           int curReplicas,
+                           int decomissionedReplicas,
+                           int expectedReplicas) {
+    if(curReplicas<0 || expectedReplicas <= curReplicas) {
       return false;
     }
-    int priLevel = getPriority(block, curReplicas, expectedReplicas);
-    if(priorityQueues.get(priLevel).add(block)) {
+    int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
+                               expectedReplicas);
+    if(priLevel != LEVEL && priorityQueues.get(priLevel).add(block)) {
       NameNode.stateChangeLog.debug(
                                     "BLOCK* NameSystem.UnderReplicationBlock.add:"
                                     + block.getBlockName()
@@ -95,8 +108,12 @@
 
   /* remove a block from a under replication queue */
   synchronized boolean remove(Block block,
-                              int oldReplicas, int oldExpectedReplicas) {
-    int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
+                              int oldReplicas,
+                              int decommissionedReplicas,
+                              int oldExpectedReplicas) {
+    int priLevel = getPriority(block, oldReplicas,
+                               decommissionedReplicas,
+                               oldExpectedReplicas);
     return remove(block, priLevel);
   }
 
@@ -124,12 +141,14 @@
   }
 
   /* update the priority level of a block */
-  synchronized void update(Block block, int curReplicas, int curExpectedReplicas,
+  synchronized void update(Block block, int curReplicas,
+                           int decommissionedReplicas,
+                           int curExpectedReplicas,
                            int curReplicasDelta, int expectedReplicasDelta) {
     int oldReplicas = curReplicas-curReplicasDelta;
     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-    int curPri = getPriority(block, curReplicas, curExpectedReplicas);
-    int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
+    int curPri = getPriority(block, curReplicas, decommissionedReplicas, curExpectedReplicas);
+    int oldPri = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas);
     NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
                                   block +
                                   " curReplicas " + curReplicas +

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?view=diff&rev=535962&r1=535961&r2=535962
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Mon May 7 12:35:01 2007
@@ -115,6 +115,7 @@
     }
     conf.setInt("dfs.replication", Math.min(3, numDataNodes));
     conf.setInt("dfs.safemode.extension", 0);
+    conf.setInt("dfs.namenode.decommission.interval", 3 * 1000); // 3 second
 
     // Format and clean out DataNode directories
     if (format) {
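The getPriority() change is the heart of the fix: a block with zero live replicas used to be classed as unreplicable even when decommissioned nodes still held copies, so decommissioning could never finish. A standalone rendering of the new rule (LEVEL and the lower tiers are paraphrased from the surrounding code, not copied from the patch):

    public class PrioritySketch {
      static final int LEVEL = 3;  // assumed number of priority queues

      static int getPriority(int curReplicas, int decommissionedReplicas,
                             int expectedReplicas) {
        if (curReplicas < 0 || curReplicas >= expectedReplicas) {
          return LEVEL;  // no need to replicate
        } else if (curReplicas == 0) {
          // zero live replicas: highest priority if decommissioned
          // copies still exist, otherwise nothing can be scheduled
          return decommissionedReplicas > 0 ? 0 : LEVEL;
        } else if (curReplicas == 1) {
          return 0;      // highest priority
        } else if (curReplicas * 3 < expectedReplicas) {
          return 1;
        } else {
          return 2;
        }
      }

      public static void main(String[] args) {
        // only copy lives on a decommissioned node: replicate first
        System.out.println(getPriority(0, 1, 3));  // 0
        // no copies anywhere: nothing to schedule
        System.out.println(getPriority(0, 0, 3));  // 3 (LEVEL)
      }
    }

The matching guard in add() (priLevel != LEVEL) lets zero-live-replica blocks into a queue only when they received a real priority.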
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=diff&rev=535962&r1=535961&r2=535962
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Mon May 7 12:35:01 2007
@@ -36,15 +36,14 @@
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
   static final int fileSize = 16384;
-  static final int numIterations = 2;
-  static final int numDatanodes = numIterations + 3;
+  static final int numDatanodes = 6;
 
   Random myrand = new Random();
   Path hostsFile;
   Path excludeFile;
 
-  ArrayList<String> decommissionedNodes = new ArrayList<String>(numIterations);
+  ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
 
   private enum NodeState {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
 
@@ -91,6 +90,19 @@
     }
   }
 
+  private void printFileLocations(FileSystem fileSys, Path name)
+    throws IOException {
+    String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+    for (int idx = 0; idx < locations.length; idx++) {
+      String[] loc = locations[idx];
+      System.out.print("Block[" + idx + "] : ");
+      for (int j = 0; j < loc.length; j++) {
+        System.out.print(loc[j] + " ");
+      }
+      System.out.println("");
+    }
+  }
+
   /**
    * For blocks that reside on the nodes that are down, verify that their
    * replication factor is 1 more than the specified one.
@@ -223,7 +235,7 @@
     boolean done = checkNodeState(filesys, node, state);
     while (!done) {
       System.out.println("Waiting for node " + node +
-                         " to change state...");
+                         " to change state to " + state);
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {
@@ -260,24 +272,21 @@
     DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
 
     try {
-      for (int iteration = 0; iteration < numIterations; iteration++) {
+      for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
+        int replicas = numDatanodes - iteration - 1;
         //
         // Decommission one node. Verify that node is decommissioned.
-        // Verify that replication factor of file has increased from 3
-        // to 4. This means one replica is on decommissioned node.
         //
-        Path file1 = new Path("smallblocktest.dat");
-        writeFile(fileSys, file1, 3);
-        checkFile(fileSys, file1, 3);
-
-        String downnode = decommissionNode(client, fileSys, localFileSys);
-        waitNodeState(fileSys, downnode, NodeState.DECOMMISSION_INPROGRESS);
-        commissionNode(fileSys, localFileSys, downnode);
-        waitNodeState(fileSys, downnode, NodeState.NORMAL);
-        downnode = decommissionNode(client, fileSys, localFileSys);
+        Path file1 = new Path("decommission.dat");
+        writeFile(fileSys, file1, replicas);
+        System.out.println("Created file decommission.dat with " +
+                           replicas + " replicas.");
+        checkFile(fileSys, file1, replicas);
+        printFileLocations(fileSys, file1);
+        String downnode = decommissionNode(client, fileSys, localFileSys);
         decommissionedNodes.add(downnode);
         waitNodeState(fileSys, downnode, NodeState.DECOMMISSIONED);
-        checkFile(fileSys, file1, 3, downnode);
+        checkFile(fileSys, file1, replicas, downnode);
         cleanupFile(fileSys, file1);
         cleanupFile(localFileSys, dir);
       }
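The reworked loop writes a fresh file each iteration with numDatanodes - iteration - 1 replicas, so once the next node is decommissioned the file's replication factor equals the number of remaining live nodes; completing the decommission therefore forces re-replication of any copy the draining node held, which is exactly the HADOOP-1184 scenario. A quick check of that arithmetic, assuming the test's numDatanodes = 6:

    public class TestLoopMath {
      public static void main(String[] args) {
        int numDatanodes = 6;
        for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
          int replicas = numDatanodes - iteration - 1;
          int liveAfterDecommission = numDatanodes - (iteration + 1);
          System.out.println("iteration " + iteration + ": " + replicas
              + " replicas, " + liveAfterDecommission + " live nodes left");
        }
      }
    }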