HDFS-10652. Add a unit test for HDFS-4660. Contributed by Vinayakumar B., Wei-Chiu Chuang, Yongjun Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c2581715
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c2581715
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c2581715

Branch: refs/heads/HDFS-7240
Commit: c25817159af17753b398956cfe6ff14984801b01
Parents: 19c743c
Author: Yongjun Zhang <[email protected]>
Authored: Sat Aug 27 22:46:53 2016 -0700
Committer: Yongjun Zhang <[email protected]>
Committed: Sat Aug 27 22:51:31 2016 -0700

----------------------------------------------------------------------
 .../hdfs/server/datanode/BlockReceiver.java        |   1 +
 .../hadoop/hdfs/server/datanode/DataNode.java      |   3 +-
 .../server/datanode/DataNodeFaultInjector.java     |   3 +
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |   2 +-
 .../TestClientProtocolForPipelineRecovery.java     | 138 ++++++++++++++++++-
 5 files changed, 144 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2581715/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index b6f0b01..522d577 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -1306,6 +1306,7 @@ class BlockReceiver implements Closeable {
           long ackRecvNanoTime = 0;
           try {
             if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
+              DataNodeFaultInjector.get().failPipeline(replicaInfo, mirrorAddr);
               // read an ack from downstream datanode
               ack.readFields(downstreamIn);
               ackRecvNanoTime = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2581715/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index ad3c172..08a1fc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2403,7 +2403,8 @@ public class DataNode extends ReconfigurableBase
         blockSender.sendBlock(out, unbufOut, null);
 
         // no response necessary
-        LOG.info(getClass().getSimpleName() + ": Transmitted " + b
+        LOG.info(getClass().getSimpleName() + ", at "
+            + DataNode.this.getDisplayName() + ": Transmitted " + b
             + " (numBytes=" + b.getNumBytes() + ") to " + curTarget);
 
         // read ack

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2581715/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 4ecbdc0..931c124 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -55,4 +55,7 @@ public class DataNodeFaultInjector {
   public void noRegistration() throws IOException { }
 
   public void failMirrorConnection() throws IOException { }
+
+  public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+      String mirrorAddr) throws IOException { }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2581715/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index c4d924e..129024b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1505,7 +1505,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
             throw new MustStopExistingWriter(rbw);
           }
-          LOG.info("Recovering " + rbw);
+          LOG.info("At " + datanode.getDisplayName() + ", Recovering " + rbw);
           return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd);
         }
       } catch (MustStopExistingWriter e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2581715/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 5e320fa..99b617e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -17,9 +17,16 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Supplier;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -31,6 +38,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -39,12 +48,15 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This tests pipeline recovery related client protocol works correct or not.
  */
 public class TestClientProtocolForPipelineRecovery {
-
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestClientProtocolForPipelineRecovery.class);
   @Test public void testGetNewStamp() throws IOException {
     int numDataNodes = 1;
     Configuration conf = new HdfsConfiguration();
@@ -477,4 +489,128 @@ public class TestClientProtocolForPipelineRecovery {
       DataNodeFaultInjector.set(oldDnInjector);
     }
   }
+
+  // Test to verify that blocks are no longer corrupted after HDFS-4660.
+  // If HDFS-4660 and the related fixes (HDFS-9220, HDFS-8722) were reverted,
+  // this test would fail.
+  // Scenario: Prior to the fix, blocks got corrupted when transferBlock
+  // happened during pipeline recovery and sent extra bytes to make up the
+  // end of the chunk.
+  // For verification, the pipeline must fail at the last datanode while the
+  // second datanode has more bytes on disk than have been acked.
+  // This causes extra bytes to be transferred to the newNode to make up the
+  // end of the chunk during pipeline recovery; this is achieved by the
+  // customized DataNodeFaultInjector class in this test.
+  // For details, please refer to HDFS-4660 and HDFS-10587. HDFS-9220 fixes
+  // an issue in the HDFS-4660 patch, and HDFS-8722 is an optimization.
+  @Test
+  public void testPipelineRecoveryWithTransferBlock() throws Exception {
+    final int chunkSize = 512;
+    final int oneWriteSize = 5000;
+    final int totalSize = 1024 * 1024;
+    final int errorInjectionPos = 512;
+    Configuration conf = new HdfsConfiguration();
+    // Need 4 datanodes to verify the replaceDatanode during pipeline recovery
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+    DataNodeFaultInjector old = DataNodeFaultInjector.get();
+
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/f");
+      FSDataOutputStream o = fs.create(fileName);
+      int count = 0;
+      // Flush to get the pipeline created.
+      o.writeBytes("hello");
+      o.hflush();
+      DFSOutputStream dfsO = (DFSOutputStream) o.getWrappedStream();
+      final DatanodeInfo[] pipeline = dfsO.getStreamer().getNodes();
+      final String lastDn = pipeline[2].getXferAddr(false);
+      final AtomicBoolean failed = new AtomicBoolean(false);
+
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        @Override
+        public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+            String mirror) throws IOException {
+          if (!lastDn.equals(mirror)) {
+            // Only fail for the second DN
+            return;
+          }
+          if (!failed.get() &&
+              (replicaInfo.getBytesAcked() > errorInjectionPos) &&
+              (replicaInfo.getBytesAcked() % chunkSize != 0)) {
+            int count = 0;
+            while (count < 10) {
+              // Fail the pipeline (throw an exception) when:
+              //   1. bytesAcked is not at a chunk boundary (checked in the
+              //      if statement above)
+              //   2. bytesOnDisk is bigger than bytesAcked and at least
+              //      reaches (or goes beyond) the end of the chunk that
+              //      bytesAcked is in (checked in the if statement below).
+              // Under this condition, the transferBlock that happens during
+              // pipeline recovery transfers extra bytes to make up the end
+              // of the chunk, and this is when the block corruption
+              // described in HDFS-4660 would occur.
+              if ((replicaInfo.getBytesOnDisk() / chunkSize) -
+                  (replicaInfo.getBytesAcked() / chunkSize) >= 1) {
+                failed.set(true);
+                throw new IOException(
+                    "Failing Pipeline " + replicaInfo.getBytesAcked() + " : " +
+                        replicaInfo.getBytesOnDisk());
+              }
+              try {
+                Thread.sleep(200);
+              } catch (InterruptedException e) {
+              }
+              count++;
+            }
+          }
+        }
+      });
+
+      Random r = new Random();
+      byte[] b = new byte[oneWriteSize];
+      while (count < totalSize) {
+        r.nextBytes(b);
+        o.write(b);
+        count += oneWriteSize;
+        o.hflush();
+      }
+
+      assertTrue("Expected a failure in the pipeline", failed.get());
+      DatanodeInfo[] newNodes = dfsO.getStreamer().getNodes();
+      o.close();
+      // Trigger block report to NN
+      for (DataNode d: cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(d);
+      }
+      // Read from the replaced datanode to check for corruption, so shut
+      // down all other nodes in the pipeline.
+      List<DatanodeInfo> pipelineList = Arrays.asList(pipeline);
+      DatanodeInfo newNode = null;
+      for (DatanodeInfo node : newNodes) {
+        if (!pipelineList.contains(node)) {
+          newNode = node;
+          break;
+        }
+      }
+      LOG.info("Number of nodes in pipeline: {} newNode {}",
+          newNodes.length, newNode.getName());
+      // Shut down the two old nodes.
+      for (int i = 0; i < newNodes.length; i++) {
+        if (newNodes[i].getName().equals(newNode.getName())) {
+          continue;
+        }
+        LOG.info("shutdown {}", newNodes[i].getName());
+        cluster.stopDataNode(newNodes[i].getName());
+      }
+
+      // The read from only the newNode should be successful. There should
+      // not be any corruption reported.
+      DFSTestUtil.readFile(fs, fileName);
+    } finally {
+      DataNodeFaultInjector.set(old);
+      cluster.shutdown();
+    }
+  }
 }
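For context on the error injection in the new test: the custom DataNodeFaultInjector only throws once bytesAcked has moved past a chunk boundary while bytesOnDisk has already reached the end of the chunk that bytesAcked falls in, which, per the comment in the test, is exactly when transferBlock pads the replacement datanode up to the chunk boundary. The following standalone sketch restates that arithmetic; it is not part of the patch, the class name and the 700/1100 byte counts are made-up example values, and chunkSize simply mirrors the test's constant.

// Illustrative sketch only; the byte counts below are assumed example values,
// not taken from an actual test run.
public class ChunkBoundaryConditionSketch {
  public static void main(String[] args) {
    final int chunkSize = 512;     // bytes per checksum chunk, as in the test
    final long bytesAcked = 700;   // hypothetical: inside chunk [512, 1024)
    final long bytesOnDisk = 1100; // hypothetical: already past that chunk

    // Condition 1: bytesAcked is not at a chunk boundary.
    boolean ackNotOnChunkBoundary = (bytesAcked % chunkSize) != 0;
    // Condition 2: bytesOnDisk is at least one full chunk ahead of bytesAcked.
    boolean diskAtLeastOneChunkAhead =
        (bytesOnDisk / chunkSize) - (bytesAcked / chunkSize) >= 1;

    // 700 % 512 = 188 and 1100/512 - 700/512 = 2 - 1 = 1, so both conditions
    // hold and the injector would fail the pipeline. Recovery then transfers
    // data up to the end of that chunk (offset 1024) rather than stopping at
    // the 700 acked bytes, which is the case HDFS-4660 used to corrupt.
    System.out.println("fail pipeline now? "
        + (ackNotOnChunkBoundary && diskAtLeastOneChunkAhead));
  }
}

To run only the new case, a command along the lines of
  mvn test -Dtest=TestClientProtocolForPipelineRecovery#testPipelineRecoveryWithTransferBlock
from hadoop-hdfs-project/hadoop-hdfs should work with the module's surefire setup; treat the exact invocation as an assumption about the local build environment.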
