Author: shv
Date: Mon Jun 30 19:22:23 2008
New Revision: 672976
URL: http://svn.apache.org/viewvc?rev=672976&view=rev
Log:
HADOOP-3649. Fix bug in removing blocks from the corrupted block map.
Contributed by Lohit Vijayarenu.
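In essence (a minimal, standalone sketch with hypothetical stand-in types, not the actual Hadoop classes): removal from the corrupt-replicas map no longer bails out early while the block is still in blocksMap, and the invalidation loop now records failures, so a block leaves the map only after every bad replica has been queued for deletion.

    import java.io.IOException;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: String stands in for Block/DatanodeDescriptor.
    class CorruptReplicasSketch {
      // block name -> data-nodes holding a corrupt replica of it
      private final Map<String, Collection<String>> corruptReplicasMap =
        new HashMap<String, Collection<String>>();

      // Fixed flow: the map entry is cleared only if every corrupt replica
      // was successfully queued for deletion. Before the patch the failure
      // flag was never set, and removal was skipped whenever the block was
      // still present in blocksMap.
      void invalidateCorruptReplicas(String blk) {
        Collection<String> nodes = corruptReplicasMap.get(blk);
        if (nodes == null)
          return;
        boolean gotException = false;
        for (String node : nodes) {
          try {
            invalidateBlock(blk, node); // stand-in for FSNamesystem.invalidateBlock
          } catch (IOException e) {
            gotException = true;        // the missing assignment was the bug
          }
        }
        if (!gotException)
          corruptReplicasMap.remove(blk);
      }

      private void invalidateBlock(String blk, String node) throws IOException {
        // would queue the replica for deletion on the given data-node
      }
    }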
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/CorruptReplicasMap.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=672976&r1=672975&r2=672976&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jun 30 19:22:23 2008
@@ -46,7 +46,7 @@
HADOOP-3556. Removed lock contention in MD5Hash by changing the
singleton MessageDigester by an instance per Thread using
- ThreadLocal. (Ivn de Prado via omalley)
+ ThreadLocal. (Iván de Prado via omalley)
BUG FIXES
@@ -720,6 +720,9 @@
HADOOP-3572. SetQuotas usage interface has some minor bugs. (hairong)
+ HADOOP-3649. Fix bug in removing blocks from the corrupted block map.
+ (Lohit Vijayarenu via shv)
+
Release 0.17.1 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/CorruptReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/CorruptReplicasMap.java?rev=672976&r1=672975&r2=672976&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/CorruptReplicasMap.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/CorruptReplicasMap.java Mon Jun 30 19:22:23 2008
@@ -20,7 +20,6 @@
import org.apache.hadoop.ipc.Server;
import java.util.*;
-import java.io.IOException;
/**
* Stores information about all corrupt blocks in the File System.
@@ -71,9 +70,6 @@
* @param blk Block to be removed
*/
void removeFromCorruptReplicasMap(Block blk) {
- FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
- if (fsNamesystem.blocksMap.contains(blk))
- return;
if (corruptReplicasMap != null) {
corruptReplicasMap.remove(blk);
NameNode.getNameNodeMetrics().numBlocksCorrupted.set(
@@ -103,29 +99,8 @@
return ((nodes != null) && (nodes.contains(node)));
}
- /**
- * Invalidate corrupt replicas
- *
- * @param blk Block whose corrupt replicas need to be invalidated
- */
- void invalidateCorruptReplicas(Block blk) {
- FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
+ int numCorruptReplicas(Block blk) {
Collection<DatanodeDescriptor> nodes = getNodes(blk);
- boolean gotException = false;
- if (nodes == null)
- return;
- for (Iterator<DatanodeDescriptor> it = nodes.iterator(); it.hasNext(); ) {
- DatanodeDescriptor node = it.next();
- try {
- fsNamesystem.invalidateBlock(blk, node);
- } catch (IOException e) {
- NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
- "error in deleting bad block " + blk +
- " on " + node + e);
- }
- }
- // Remove the block from corruptReplicasMap if empty
- if (!gotException)
- removeFromCorruptReplicasMap(blk);
+ return (nodes == null) ? 0 : nodes.size();
}
}
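The new numCorruptReplicas() accessor makes it possible to wait deterministically for corrupt-replica reports, which arrive asynchronously from the data-nodes. A hedged sketch of that polling pattern (illustrative names only; the real usage is in the test changes below):

    // Illustrative helper: poll a corrupt-replica count until it reaches
    // the expected value.
    interface CorruptReplicaCount {
      int get(); // e.g. namesystem.corruptReplicas.numCorruptReplicas(blk)
    }

    class WaitForCorruptReplicas {
      static void waitUntil(CorruptReplicaCount count, int expected)
          throws InterruptedException {
        while (count.get() != expected) {
          Thread.sleep(1000); // reports come in on their own schedule
        }
      }
    }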
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java?rev=672976&r1=672975&r2=672976&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java Mon Jun 30 19:22:23 2008
@@ -1345,8 +1345,12 @@
"block " + blk + " could not be marked " +
"as corrupt as it does not exists in " +
"blocksMap");
- else
+ else {
+ // Add this replica to corruptReplicas Map and
+ // add the block to neededReplication
corruptReplicas.addToCorruptReplicasMap(blk, node);
+ updateNeededReplications(blk, 0, 1);
+ }
}
/**
@@ -2803,7 +2807,8 @@
// filter out containingNodes that are marked for decommission.
NumberReplicas num = countNodes(storedBlock);
- int numCurrentReplica = num.liveReplicas()
+ int numLiveReplicas = num.liveReplicas();
+ int numCurrentReplica = numLiveReplicas
+ pendingReplications.getNumReplicas(block);
// check whether safe replication is reached for the block
@@ -2835,12 +2840,46 @@
// If the file replication has reached desired value
// we can remove any corrupt replicas the block may have
int corruptReplicasCount = num.corruptReplicas();
- if ((corruptReplicasCount > 0) && (numCurrentReplica == fileReplication))
- corruptReplicas.invalidateCorruptReplicas(block);
+ if ((corruptReplicasCount > 0) && (numLiveReplicas == fileReplication))
+ invalidateCorruptReplicas(block);
return block;
}
/**
+ * Invalidate corrupt replicas.
+ * <p>
+ * This will remove the replicas from the block's location list,
+ * add them to {@link #recentInvalidateSets} so that they could be further
+ * deleted from the respective data-nodes,
+ * and remove the block from corruptReplicasMap.
+ * <p>
+ * This method should be called when the block has a sufficient
+ * number of live replicas.
+ *
+ * @param blk Block whose corrupt replicas need to be invalidated
+ */
+ void invalidateCorruptReplicas(Block blk) {
+ Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
+ boolean gotException = false;
+ if (nodes == null)
+ return;
+ for (Iterator<DatanodeDescriptor> it = nodes.iterator(); it.hasNext(); ) {
+ DatanodeDescriptor node = it.next();
+ try {
+ invalidateBlock(blk, node);
+ } catch (IOException e) {
+ NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
+ "error in deleting bad block " + blk +
+ " on " + node + ": " + e);
+ gotException = true;
+ }
+ }
+ // Remove the block from corruptReplicasMap
+ if (!gotException)
+ corruptReplicas.removeFromCorruptReplicasMap(blk);
+ }
+
+ /**
* For each block in the name-node verify whether it belongs to any file,
* over or under replicated. Place it into the respective queue.
*/
@@ -3067,7 +3106,8 @@
}
}
// If block is removed from blocksMap, remove it from corruptReplicas
- corruptReplicas.removeFromCorruptReplicasMap(block);
+ if (fileINode == null)
+ corruptReplicas.removeFromCorruptReplicasMap(block);
}
/**
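Taken together, these FSNamesystem changes wire corrupt replicas into the replication pipeline: marking a replica corrupt now also schedules re-replication, and invalidation fires only once live replicas alone (excluding corrupt ones) reach the file's replication factor. A small runnable sketch of that gating condition (hypothetical names, condensed from the hunks above):

    // Sketch of the replica accounting behind the fix in addStoredBlock():
    // corrupt replicas are excluded from the live count, and invalidation
    // fires only once live replicas alone reach the replication factor.
    class ReplicaAccountingSketch {
      static boolean shouldInvalidateCorrupt(int liveReplicas,
                                             int corruptReplicas,
                                             int fileReplication) {
        return corruptReplicas > 0 && liveReplicas == fileReplication;
      }

      public static void main(String[] args) {
        // 3 live + 1 corrupt replica, factor 3: safe to drop the bad copy.
        System.out.println(shouldInvalidateCorrupt(3, 1, 3)); // true
        // Only 2 live replicas so far: keep waiting for re-replication.
        System.out.println(shouldInvalidateCorrupt(2, 1, 3)); // false
      }
    }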
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java?rev=672976&r1=672975&r2=672976&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java Mon Jun 30 19:22:23 2008
@@ -27,7 +27,6 @@
import java.util.regex.Pattern;
import java.io.*;
import java.nio.channels.FileChannel;
-import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.commons.logging.Log;
@@ -142,9 +141,10 @@
cluster.shutdown();
}
- void corruptReplica(String blockName, int replica) throws IOException {
+ boolean corruptReplica(String blockName, int replica) throws IOException {
Random random = new Random();
File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
+ boolean corrupted = false;
for (int i=replica*2; i<replica*2+2; i++) {
File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" +
blockName);
@@ -157,8 +157,10 @@
raFile.seek(rand);
raFile.write(badString.getBytes());
raFile.close();
+ corrupted = true;
}
}
+ return corrupted;
}
public void testBlockCorruptionPolicy() throws IOException {
@@ -241,4 +243,143 @@
cluster.shutdown();
}
+
+ /**
+ * testBlockCorruptionRecoveryPolicy.
+ * This tests recovery of corrupt replicas, first for one corrupt replica
+ * then for two. The test invokes blockCorruptionRecoveryPolicy which
+ * 1. Creates a block with desired number of replicas
+ * 2. Corrupts the desired number of replicas and restarts the datanodes
+ * containing the corrupt replica. Additionaly we also read the block
+ * in case restarting does not report corrupt replicas.
+ * Restarting or reading from the datanode would trigger reportBadBlocks
+ * to namenode.
+ * NameNode adds it to corruptReplicasMap and neededReplication
+ * 3. Test waits until all corrupt replicas are reported, meanwhile
+ * re-replication brings the block back to a healthy state
+ * 4. Test again waits until the block is reported with expected number
+ * of good replicas.
+ */
+ public void testBlockCorruptionRecoveryPolicy() throws IOException {
+ // Test recovery of 1 corrupt replica
+ LOG.info("Testing corrupt replica recovery for one corrupt replica");
+ blockCorruptionRecoveryPolicy(4, (short)3, 1);
+
+ // Test recovery of 2 corrupt replicas
+ LOG.info("Testing corrupt replica recovery for two corrupt replicas");
+ blockCorruptionRecoveryPolicy(5, (short)3, 2);
+ }
+
+ private void blockCorruptionRecoveryPolicy(int numDataNodes,
+ short numReplicas,
+ int numCorruptReplicas)
+ throws IOException {
+ Configuration conf = new Configuration();
+ conf.setLong("dfs.blockreport.intervalMsec", 30L);
+ conf.setLong("dfs.replication.interval", 30);
+ conf.setLong("dfs.heartbeat.interval", 30L);
+ conf.setBoolean("dfs.replication.considerLoad", false);
+ Random random = new Random();
+ FileSystem fs = null;
+ DFSClient dfsClient = null;
+ LocatedBlocks blocks = null;
+ int replicaCount = 0;
+ int rand = random.nextInt(numDataNodes);
+
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
+ DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
+ Block blk = DFSTestUtil.getFirstBlock(fs, file1);
+ String block = blk.getBlockName();
+
+ dfsClient = new DFSClient(new InetSocketAddress("localhost",
+ cluster.getNameNodePort()), conf);
+ blocks = dfsClient.namenode.
+ getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+ replicaCount = blocks.get(0).getLocations().length;
+
+ // Wait until block is replicated to numReplicas
+ while (replicaCount != numReplicas) {
+ try {
+ LOG.info("Looping until expected replicaCount of " + numReplicas +
+ "is reached");
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
+ blocks = dfsClient.namenode.
+ getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+ replicaCount = blocks.get(0).getLocations().length;
+ }
+ assertFalse(blocks.get(0).isCorrupt());
+
+ // Corrupt numCorruptReplicas replicas of block
+ int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
+ for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
+ if (corruptReplica(block, i))
+ corruptReplicasDNIDs[j++] = i;
+ }
+
+ // Restart the datanodes containing corrupt replicas
+ // so they would be reported to namenode and re-replicated
+ for (int i = 0; i < numCorruptReplicas; i++)
+ cluster.restartDataNode(corruptReplicasDNIDs[i]);
+
+ // Loop until all corrupt replicas are reported
+ int corruptReplicaSize = cluster.getNameNode().namesystem.
+ corruptReplicas.numCorruptReplicas(blk);
+ while (corruptReplicaSize != numCorruptReplicas) {
+ try {
+ IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(),
+ conf, true);
+ } catch (IOException e) {
+ // Expected: reading a corrupt replica may fail, and the failed read
+ // triggers reportBadBlocks to the namenode.
+ }
+ try {
+ LOG.info("Looping until expected " + numCorruptReplicas + " are " +
+ "reported. Current reported " + corruptReplicaSize);
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
+ corruptReplicaSize = cluster.getNameNode().namesystem.
+ corruptReplicas.numCorruptReplicas(blk);
+ }
+
+ // Loop until the block recovers after replication
+ blocks = dfsClient.namenode.
+ getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+ replicaCount = blocks.get(0).getLocations().length;
+ while (replicaCount != numReplicas) {
+ try {
+ LOG.info("Looping until block gets rereplicated to " + numReplicas);
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
+ blocks = dfsClient.namenode.
+ getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+ replicaCount = blocks.get(0).getLocations().length;
+ }
+
+ // Make sure the corrupt replica is invalidated and removed from
+ // corruptReplicasMap
+ corruptReplicaSize = cluster.getNameNode().namesystem.
+ corruptReplicas.numCorruptReplicas(blk);
+ while (corruptReplicaSize != 0) {
+ try {
+ LOG.info("Looping until corrupt replica is invalidated");
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
+ corruptReplicaSize = cluster.getNameNode().namesystem.
+ corruptReplicas.numCorruptReplicas(blk);
+ blocks = dfsClient.namenode.
+ getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+ replicaCount = blocks.get(0).getLocations().length;
+ }
+ // Make sure block is healthy
+ assertEquals(0, corruptReplicaSize);
+ assertEquals(numReplicas, replicaCount);
+ assertFalse(blocks.get(0).isCorrupt());
+ cluster.shutdown();
+ }
}