Author: rangadi
Date: Thu May 22 13:12:30 2008
New Revision: 659235
URL: http://svn.apache.org/viewvc?rev=659235&view=rev
Log:
HADOOP-3035. During block transfers between datanodes, the receiving
datanode can now report corrupt replicas received from the source node
to the namenode. (Lohit Vijayarenu via rangadi)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
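Taken together, the changes below add one optional field to the OP_WRITE_BLOCK
header: a boolean flag, followed by the DatanodeInfo of the sending (source)
datanode when the flag is true. A minimal sender-side sketch of the resulting
header layout, reconstructed from the hunks below (writeHeaderPrefix() is a
hypothetical stand-in for the version/opcode/block fields this diff does not
touch):

    // Sketch only: v11 OP_WRITE_BLOCK header order implied by this patch.
    void writeBlockHeader(DataOutputStream out, DatanodeInfo[] nodes,
                          boolean recoveryFlag, String client,
                          DatanodeInfo srcNode) throws IOException {
      writeHeaderPrefix(out);            // hypothetical: unchanged fields
      out.writeInt(nodes.length);        // pipeline size
      out.writeBoolean(recoveryFlag);    // part of recovery?
      Text.writeString(out, client);     // client name
      out.writeBoolean(srcNode != null); // NEW in v11: src node info present?
      if (srcNode != null) {
        srcNode.write(out);              // NEW in v11: src DatanodeInfo
      }
      out.writeInt(nodes.length - 1);    // number of downstream targets
      for (int i = 1; i < nodes.length; i++) {
        nodes[i].write(out);
      }
    }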
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu May 22 13:12:30 2008
@@ -166,6 +166,10 @@
that it does not block user. DataNode misses heartbeats in large
nodes otherwise. (Johan Oskarsson via rangadi)
+ HADOOP-3035. During block transfers between datanodes, the receiving
+ datanode can now report corrupt replicas received from the source node
+ to the namenode. (Lohit Vijayarenu via rangadi)
+
OPTIMIZATIONS
HADOOP-3274. The default constructor of BytesWritable creates empty
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu May 22 13:12:30 2008
@@ -2299,6 +2299,7 @@
out.writeInt( nodes.length );
out.writeBoolean( recoveryFlag ); // recovery flag
Text.writeString( out, client );
+ out.writeBoolean(false); // Not sending src node information
out.writeInt( nodes.length - 1 );
for (int i = 1; i < nodes.length; i++) {
nodes[i].write(out);
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu May 22 13:12:30 2008
@@ -1126,6 +1126,7 @@
*/
private void writeBlock(DataInputStream in) throws IOException {
xceiverCount.incr();
+ DatanodeInfo srcDataNode = null;
LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
" tcp no delay " + s.getTcpNoDelay());
//
@@ -1138,6 +1139,11 @@
int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
boolean isRecovery = in.readBoolean(); // is this part of recovery?
String client = Text.readString(in); // working on behalf of this client
+ boolean hasSrcDataNode = in.readBoolean(); // is src node info present
+ if (hasSrcDataNode) {
+ srcDataNode = new DatanodeInfo();
+ srcDataNode.readFields(in);
+ }
int numTargets = in.readInt();
if (numTargets < 0) {
throw new IOException("Mislabelled incoming datastream.");
@@ -1159,7 +1165,7 @@
try {
// open a block receiver and check if the block does not exist
blockReceiver = new BlockReceiver(block, in,
- s.getInetAddress().toString(), isRecovery, client);
+ s.getInetAddress().toString(), isRecovery, client, srcDataNode);
// get a connection back to the previous target
replyOut = new DataOutputStream(
@@ -1196,6 +1202,10 @@
mirrorOut.writeInt( pipelineSize );
mirrorOut.writeBoolean( isRecovery );
Text.writeString( mirrorOut, client );
+ mirrorOut.writeBoolean(hasSrcDataNode);
+ if (hasSrcDataNode) { // pass src node information
+ srcDataNode.write(mirrorOut);
+ }
mirrorOut.writeInt( targets.length - 1 );
for ( int i = 1; i < targets.length; i++ ) {
targets[i].write( mirrorOut );
@@ -1419,7 +1429,7 @@
try {
// open a block receiver and check if the block does not exist
blockReceiver = new BlockReceiver(
- block, in, s.getRemoteSocketAddress().toString(), false, "");
+ block, in, s.getRemoteSocketAddress().toString(), false, "", null);
// receive a block
blockReceiver.receiveBlock(null, null, null, null, balancingThrottler, -1);
@@ -2250,10 +2260,11 @@
private FSDataset.BlockWriteStreams streams;
private boolean isRecovery = false;
private String clientName;
+ DatanodeInfo srcDataNode = null;
BlockReceiver(Block block, DataInputStream in, String inAddr,
- boolean isRecovery, String clientName)
- throws IOException {
+ boolean isRecovery, String clientName,
+ DatanodeInfo srcDataNode) throws IOException {
try{
this.block = block;
this.in = in;
@@ -2264,6 +2275,7 @@
this.checksum = DataChecksum.newDataChecksum(in);
this.bytesPerChecksum = checksum.getBytesPerChecksum();
this.checksumSize = checksum.getChecksumSize();
+ this.srcDataNode = srcDataNode;
//
// Open local disk out
//
@@ -2352,6 +2364,18 @@
checksum.update(dataBuf, dataOff, chunkLen);
if (!checksum.compare(checksumBuf, checksumOff)) {
+ if (srcDataNode != null) {
+ try {
+ LOG.info("report corrupt block " + block + " from datanode " +
+ srcDataNode + " to namenode");
+ LocatedBlock lb = new LocatedBlock(block,
+ new DatanodeInfo[] {srcDataNode});
+ namenode.reportBadBlocks(new LocatedBlock[] {lb});
+ } catch (IOException e) {
+ LOG.warn("Failed to report bad block " + block +
+ " from datanode " + srcDataNode + " to namenode");
+ }
+ }
throw new IOException("Unexpected checksum mismatch " +
"while writing " + block + " from " + inAddr);
}
@@ -2770,6 +2794,7 @@
SMALL_BUFFER_SIZE));
blockSender = new BlockSender(b, 0, -1, false, false, false);
+ DatanodeInfo srcNode = new DatanodeInfo(dnRegistration);
//
// Header info
@@ -2781,6 +2806,8 @@
out.writeInt(0); // no pipelining
out.writeBoolean(false); // not part of recovery
Text.writeString(out, ""); // client
+ out.writeBoolean(true); // sending src node information
+ srcNode.write(out); // Write src node DatanodeInfo
// write targets
out.writeInt(targets.length - 1);
for (int i = 1; i < targets.length; i++) {
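On the receive path the new field is parsed symmetrically, and a checksum
failure during an inter-datanode transfer is now reported back to the
namenode. A condensed sketch of that flow, using only calls visible in the
hunks above (pipeline setup and error handling omitted):

    // In writeBlock(): read the optional source-node field.
    DatanodeInfo srcDataNode = null;
    if (in.readBoolean()) {              // v11: src node info present?
      srcDataNode = new DatanodeInfo();
      srcDataNode.readFields(in);
    }

    // Later, in BlockReceiver: a bad checksum on data that came from
    // another datanode is reported before the write is aborted.
    if (!checksum.compare(checksumBuf, checksumOff)) {
      if (srcDataNode != null) {
        LocatedBlock lb = new LocatedBlock(block,
                                           new DatanodeInfo[] {srcDataNode});
        namenode.reportBadBlocks(new LocatedBlock[] {lb});
      }
      throw new IOException("Unexpected checksum mismatch while writing " +
                            block + " from " + inAddr);
    }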
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Thu May 22 13:12:30 2008
@@ -101,11 +101,11 @@
* when protocol changes. It is not very obvious.
*/
/*
- * Version 10:
- * DFSClient also sends non-interleaved checksum and data while writing
- * to DFS.
+ * Version 11:
+ * OP_WRITE_BLOCK sends a boolean. If its value is true, an additional
+ * DatanodeInfo of the client requesting the transfer is also sent.
*/
- public static final int DATA_TRANSFER_VERSION = 10;
+ public static final int DATA_TRANSFER_VERSION = 11;
// Return codes for file create
public static final int OPERATION_FAILED = 0;
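Since both ends of the stream must agree on this layout, the version bump
keeps v10 peers from misreading the new boolean as some other field. The
datanode's guard on each incoming stream looks roughly like this (assumed
from the surrounding code; it is not shown in this diff):

    short version = in.readShort();      // first field of every transfer
    if (version != FSConstants.DATA_TRANSFER_VERSION) {
      throw new IOException("Version Mismatch");
    }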
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Thu May 22 13:12:30 2008
@@ -22,6 +22,10 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.io.RandomAccessFile;
import javax.security.auth.login.LoginException;
@@ -547,6 +551,41 @@
}
/*
+ * Corrupt a block on all datanodes
+ */
+ void corruptBlockOnDataNodes(String blockName) throws Exception{
+ for (int i=0; i < dataNodes.size(); i++)
+ corruptBlockOnDataNode(i,blockName);
+ }
+
+ /*
+ * Corrupt a block on a particular datanode
+ */
+ boolean corruptBlockOnDataNode(int i, String blockName) throws Exception {
+ Random random = new Random();
+ boolean corrupted = false;
+ File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
+ if (i < 0 || i >= dataNodes.size())
+ return false;
+ for (int dn = i*2; dn < i*2+2; dn++) {
+ File blockFile = new File(baseDir, "data" + (dn+1) + "/current/" +
+ blockName);
+ if (blockFile.exists()) {
+ // Corrupt replica by writing random bytes into replica
+ RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+ FileChannel channel = raFile.getChannel();
+ String badString = "BADBAD";
+ int rand = random.nextInt((int)channel.size()/2);
+ raFile.seek(rand);
+ raFile.write(badString.getBytes());
+ raFile.close();
+ corrupted = true; // report success only when a replica file was modified
+ }
+ }
+ return corrupted;
+ }
+
+ /*
* Shutdown a particular datanode
*/
boolean stopDataNode(int i) {
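For reference, a test pairs these helpers with DFSTestUtil roughly as the
TestReplication change below does (file1 is whatever path the test created):

    String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
    cluster.corruptBlockOnDataNodes(block);        // corrupt every replica
    // or, to corrupt just one replica:
    boolean done = cluster.corruptBlockOnDataNode(0, block);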
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Thu May 22 13:12:30 2008
@@ -171,6 +171,7 @@
sendOut.writeInt(0); // targets in pipeline
sendOut.writeBoolean(false); // recoveryFlag
Text.writeString(sendOut, "cl");// clientID
+ sendOut.writeBoolean(false); // no src node info
sendOut.writeInt(0); // number of downstream targets
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
@@ -189,6 +190,7 @@
sendOut.writeInt(0); // targets in pipeline
sendOut.writeBoolean(false); // recoveryFlag
Text.writeString(sendOut, "cl");// clientID
+ sendOut.writeBoolean(false); // no src node info
// bad number of targets
sendOut.writeInt(-1-random.nextInt(oneMil));
@@ -204,6 +206,7 @@
sendOut.writeInt(0); // targets in pipeline
sendOut.writeBoolean(false); // recoveryFlag
Text.writeString(sendOut, "cl");// clientID
+ sendOut.writeBoolean(false); // no src node info
sendOut.writeInt(0);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt((int)512);
@@ -230,6 +233,7 @@
sendOut.writeInt(0); // targets in pipeline
sendOut.writeBoolean(false); // recoveryFlag
Text.writeString(sendOut, "cl");// clientID
+ sendOut.writeBoolean(false); // no src node info
sendOut.writeInt(0);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt((int)512); // checksum size
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java?rev=659235&r1=659234&r2=659235&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java Thu May 22 13:12:30 2008
@@ -126,6 +126,51 @@
fileSys.delete(name, true);
assertTrue(!fileSys.exists(name));
}
+
+ /*
+ * Test if Datanode reports bad blocks during replication request
+ */
+ public void testBadBlockReportOnTransfer() throws Exception {
+ Configuration conf = new Configuration();
+ FileSystem fs = null;
+ DFSClient dfsClient = null;
+ LocatedBlocks blocks = null;
+ int replicaCount = 0;
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ dfsClient = new DFSClient(new InetSocketAddress("localhost",
+ cluster.getNameNodePort()), conf);
+
+ // Create file with replication factor of 1
+ Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
+ DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
+ DFSTestUtil.waitReplication(fs, file1, (short)1);
+
+ // Corrupt the block belonging to the created file
+ String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
+ cluster.corruptBlockOnDataNodes(block);
+
+ // Increase replication factor, this should invoke transfer request
+ // Receiving datanode fails on checksum and reports it to namenode
+ fs.setReplication(file1, (short)2);
+
+ // Now get block details and check if the block is corrupt
+ blocks = dfsClient.namenode.
+ getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+ while (!blocks.get(0).isCorrupt()) {
+ try {
+ LOG.info("Waiting until block is marked as corrupt...");
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ }
+ blocks = dfsClient.namenode.
+ getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
+ }
+ replicaCount = blocks.get(0).getLocations().length;
+ assertTrue(replicaCount == 1);
+ cluster.shutdown();
+ }
/**
* Tests replication in DFS.
@@ -330,4 +375,5 @@
}
}
}
+
}