Author: todd
Date: Sat Jan 28 00:39:19 2012
New Revision: 1236944
URL: http://svn.apache.org/viewvc?rev=1236944&view=rev
Log:
HDFS-2791. If block report races with closing of file, replica is incorrectly
marked corrupt. Contributed by Todd Lipcon.
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1236944&r1=1236943&r2=1236944&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Sat Jan 28 00:39:19 2012
@@ -190,6 +190,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2840. TestHostnameFilter should work with localhost or
localhost.localdomain (tucu)
+ HDFS-2791. If block report races with closing of file, replica is
+ incorrectly marked corrupt. (todd)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1236944&r1=1236943&r2=1236944&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
Sat Jan 28 00:39:19 2012
@@ -1569,7 +1569,24 @@ public class BlockManager {
}
case RBW:
case RWR:
- return storedBlock.isComplete();
+ if (!storedBlock.isComplete()) {
+ return false;
+ } else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
+ return true;
+ } else { // COMPLETE block, same genstamp
+ if (reportedState == ReplicaState.RBW) {
+ // If it's a RBW report for a COMPLETE block, it may just be that
+ // the block report got a little bit delayed after the pipeline
+ // closed. So, ignore this report, assuming we will get a
+ // FINALIZED replica later. See HDFS-2791
+ LOG.info("Received an RBW replica for block " + storedBlock +
+ " on " + dn.getName() + ": ignoring it, since the block is " +
+ "complete with the same generation stamp.");
+ return false;
+ } else {
+ return true;
+ }
+ }
case RUR: // should not be reported
case TEMPORARY: // should not be reported
default:
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1236944&r1=1236943&r2=1236944&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
Sat Jan 28 00:39:19 2012
@@ -763,4 +763,13 @@ class BPOfferService implements Runnable
return;
}
+ @VisibleForTesting
+ DatanodeProtocol getBpNamenode() {
+ return bpNamenode;
+ }
+
+ @VisibleForTesting
+ void setBpNamenode(DatanodeProtocol bpNamenode) {
+ this.bpNamenode = bpNamenode;
+ }
}
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=1236944&r1=1236943&r2=1236944&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
Sat Jan 28 00:39:19 2012
@@ -101,7 +101,7 @@ public class AppendTestUtil {
return DFSTestUtil.getFileSystemAs(ugi, conf);
}
- static void write(OutputStream out, int offset, int length) throws IOException {
+ public static void write(OutputStream out, int offset, int length) throws IOException {
final byte[] bytes = new byte[length];
for(int i = 0; i < length; i++) {
bytes[i] = (byte)(offset + i);
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java?rev=1236944&r1=1236943&r2=1236944&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java
Sat Jan 28 00:39:19 2012
@@ -17,6 +17,15 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.mockito.Mockito;
+
+import com.google.common.base.Preconditions;
+
/**
* WARNING!! This is TEST ONLY class: it never has to be used
* for ANY development purposes.
@@ -42,4 +51,34 @@ public class DataNodeAdapter {
boolean heartbeatsDisabledForTests) {
dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests);
}
+
+ /**
+ * Insert a Mockito spy object between the given DataNode and
+ * the given NameNode. This can be used to delay or wait for
+ * RPC calls on the datanode->NN path.
+ */
+ public static DatanodeProtocol spyOnBposToNN(
+ DataNode dn, NameNode nn) {
+ String bpid = nn.getNamesystem().getBlockPoolId();
+
+ BPOfferService bpos = null;
+ for (BPOfferService thisBpos : dn.getAllBpOs()) {
+ if (thisBpos.getBlockPoolId().equals(bpid)) {
+ bpos = thisBpos;
+ break;
+ }
+ }
+ Preconditions.checkArgument(bpos != null,
+ "No such bpid: %s", bpid);
+
+ // When protobufs are merged, the following can be converted
+ // to a simple spy. Because you can't spy on proxy objects,
+ // we have to use the DelegateAnswer trick.
+ DatanodeProtocol origNN = bpos.getBpNamenode();
+ DatanodeProtocol spy = Mockito.mock(DatanodeProtocol.class,
+ new GenericTestUtils.DelegateAnswer(origNN));
+
+ bpos.setBpNamenode(spy);
+ return spy;
+ }
}
Modified:
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1236944&r1=1236943&r2=1236944&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
(original)
+++
hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
Sat Jan 28 00:39:19 2012
@@ -21,7 +21,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -35,14 +37,19 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.log4j.Level;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
import java.io.File;
import java.io.FilenameFilter;
@@ -50,6 +57,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
/**
* This test simulates a variety of situations when blocks are being
@@ -491,6 +499,84 @@ public class TestBlockReport {
resetConfiguration(); // return the initial state of the configuration
}
}
+
+ /**
+ * Test for the case where one of the DNs in the pipeline is in the
+ * process of doing a block report exactly when the block is closed.
+ * In this case, the block report becomes delayed until after the
+ * block is marked completed on the NN, and hence it reports an RBW
+ * replica for a COMPLETE block. Such a report should not be marked
+ * corrupt.
+ * This is a regression test for HDFS-2791.
+ */
+ @Test
+ public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception {
+ final CountDownLatch brFinished = new CountDownLatch(1);
+ DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
+ @Override
+ protected Object passThrough(InvocationOnMock invocation)
+ throws Throwable {
+ try {
+ return super.passThrough(invocation);
+ } finally {
+ // inform the test that our block report went through.
+ brFinished.countDown();
+ }
+ }
+ };
+
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path filePath = new Path("/" + METHOD_NAME + ".dat");
+
+ // Start a second DN for this test -- we're checking
+ // what happens when one of the DNs is slowed for some reason.
+ REPL_FACTOR = 2;
+ startDNandWait(null, false);
+
+ NameNode nn = cluster.getNameNode();
+
+ FSDataOutputStream out = fs.create(filePath, REPL_FACTOR);
+ try {
+ AppendTestUtil.write(out, 0, 10);
+ out.hflush();
+
+ // Set up a spy so that we can delay the block report coming
+ // from this node.
+ DataNode dn = cluster.getDataNodes().get(0);
+ DatanodeProtocol spy =
+ DataNodeAdapter.spyOnBposToNN(dn, nn);
+
+ Mockito.doAnswer(delayer)
+ .when(spy).blockReport(
+ Mockito.<DatanodeRegistration>anyObject(),
+ Mockito.anyString(),
+ Mockito.<long[]>anyObject());
+
+ // Force a block report to be generated. The block report will have
+ // an RBW replica in it. Wait for the RPC to be sent, but block
+ // it before it gets to the NN.
+ dn.scheduleAllBlockReport(0);
+ delayer.waitForCall();
+
+ } finally {
+ IOUtils.closeStream(out);
+ }
+
+ // Now that the stream is closed, the NN will have the block in COMPLETE
+ // state.
+ delayer.proceed();
+ brFinished.await();
+
+ // Verify that no replicas are marked corrupt, and that the
+ // file is still readable.
+ BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
+ assertEquals(0, nn.getNamesystem().getCorruptReplicaBlocks());
+ DFSTestUtil.readFile(fs, filePath);
+
+ // Ensure that the file is readable even from the DN that we futzed with.
+ cluster.stopDataNode(1);
+ DFSTestUtil.readFile(fs, filePath);
+ }
private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
final boolean tooLongWait = false;