Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 c997fc629 -> 81bf6f283


HDFS-11056. Concurrent append and read operations lead to checksum error. 
Contributed by Wei-Chiu Chuang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81bf6f28
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81bf6f28
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81bf6f28

Branch: refs/heads/branch-2.7
Commit: 81bf6f283449a53c64fb1fd64c85be32bc58607e
Parents: c997fc6
Author: Wei-Chiu Chuang <[email protected]>
Authored: Mon Nov 14 11:57:51 2016 -0800
Committer: Wei-Chiu Chuang <[email protected]>
Committed: Mon Nov 14 11:57:51 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 43 ++++++++++++-
 .../datanode/fsdataset/impl/FsDatasetUtil.java  | 13 ++++
 .../org/apache/hadoop/hdfs/TestFileAppend.java  | 68 ++++++++++++++++++++
 .../fsdataset/impl/TestWriteToReplica.java      | 17 ++++-
 5 files changed, 141 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/81bf6f28/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5363aa8..fa1e872 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -188,6 +188,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-9500. Fix software version counts for DataNodes during rolling 
upgrade.
     (Erik Krogen via shv)
 
+    HDFS-11056 Concurrent append and read operations lead to checksum error.
+    (Wei-Chiu Chuang)
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81bf6f28/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 400a778..4ad863e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -942,7 +942,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param blockFile block file for which the checksum will be computed
    * @throws IOException
    */
-  private static void computeChecksum(File srcMeta, File dstMeta, File 
blockFile)
+  static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
       throws IOException {
     final DataChecksum checksum = 
BlockMetadataHeader.readDataChecksum(srcMeta);
     final byte[] data = new byte[1 << 16];
@@ -1079,7 +1079,30 @@ class FsDatasetImpl implements 
FsDatasetSpi<FsVolumeImpl> {
     }
     return new ReplicaHandler(replica, ref);
   }
-  
+
+
+  private byte[] loadLastPartialChunkChecksum(
+      File blockFile, File metaFile) throws IOException {
+    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+    final int checksumSize = dcs.getChecksumSize();
+    final long onDiskLen = blockFile.length();
+    final int bytesPerChecksum = dcs.getBytesPerChecksum();
+
+    if (onDiskLen % bytesPerChecksum == 0) {
+      // the last chunk is a complete one. No need to preserve its checksum
+      // because it will not be modified.
+      return null;
+    }
+
+    int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+        (int)(onDiskLen / bytesPerChecksum * checksumSize);
+    byte[] lastChecksum = new byte[checksumSize];
+    RandomAccessFile raf = new RandomAccessFile(metaFile, "r");
+    raf.seek(offsetInChecksum);
+    raf.read(lastChecksum, 0, checksumSize);
+    return lastChecksum;
+  }
+
   /** Append to a finalized replica
    * Change a finalized replica to be a RBW replica and 
    * bump its generation stamp to be the newGS
@@ -1113,6 +1136,13 @@ class FsDatasetImpl implements 
FsDatasetSpi<FsVolumeImpl> {
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
         replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
         v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
+
+    // load last checksum and datalen
+    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
+        replicaInfo.getBlockFile(), replicaInfo.getMetaFile());
+    newReplicaInfo.setLastChecksumAndDataLen(
+        replicaInfo.getNumBytes(), lastChunkChecksum);
+
     File newmeta = newReplicaInfo.getMetaFile();
 
     // rename meta file to rbw directory
@@ -1435,6 +1465,12 @@ class FsDatasetImpl implements 
FsDatasetSpi<FsVolumeImpl> {
         blockId, numBytes, expectedGs,
         v, dest.getParentFile(), Thread.currentThread(), 0);
     rbw.setBytesAcked(visible);
+
+    // load last checksum and datalen
+    final File destMeta = FsDatasetUtil.getMetaFile(dest,
+        b.getGenerationStamp());
+    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta);
+    rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum);
     // overwrite the RBW in the volume map
     volumeMap.add(b.getBlockPoolId(), rbw);
     return rbw;
@@ -2466,6 +2502,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> 
{
             newBlockId, recoveryId, volume, blockFile.getParentFile(),
             newlength);
         newReplicaInfo.setNumBytes(newlength);
+        // In theory, this rbw replica needs to reload last chunk checksum,
+        // but it is immediately converted to finalized state within the same
+        // lock, so no need to update it.
         volumeMap.add(bpid, newReplicaInfo);
         finalizeReplica(bpid, newReplicaInfo);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81bf6f28/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index adefbdb..6db034e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -22,6 +22,7 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Arrays;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
@@ -104,4 +105,16 @@ public class FsDatasetUtil {
           + blockFile + ", metaFile=" + metaFile, nfe);
     }
   }
+
+  /**
+   * Compute the checksum for a block file that does not already have
+   * its checksum computed, and save it to dstMeta file.
+   */
+  public static void computeChecksum(File srcMeta, File dstMeta, File 
blockFile)
+      throws IOException {
+    Preconditions.checkNotNull(srcMeta);
+    Preconditions.checkNotNull(dstMeta);
+    Preconditions.checkNotNull(blockFile);
+    FsDatasetImpl.computeChecksum(srcMeta, dstMeta, blockFile);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81bf6f28/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index ff0b9d7..c196aae 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -24,9 +24,11 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -42,9 +44,15 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.DataChecksum;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -57,6 +65,8 @@ public class TestFileAppend{
 
   private static byte[] fileContents = null;
 
+  static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
   //
   // writes to file but does not close it
   //
@@ -603,4 +613,62 @@ public class TestFileAppend{
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 10000)
+  public void testConcurrentAppendRead()
+      throws IOException, TimeoutException, InterruptedException {
+    // Create a finalized replica and append to it
+    // Read block data and checksum. Verify checksum.
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt("dfs.min.replication", 1);
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
+
+      // create a file with 1 byte of data.
+      long initialFileLength = 1;
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/appendCorruptBlock");
+      DFSTestUtil.createFile(fs, fileName, initialFileLength, (short) 1, 0);
+      DFSTestUtil.waitReplication(fs, fileName, (short) 1);
+      Assert.assertTrue("File not created", fs.exists(fileName));
+
+      // Call FsDatasetImpl#append to append the block file,
+      // which converts it to a rbw replica.
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
+      long newGS = block.getGenerationStamp() + 1;
+      ReplicaHandler replicaHandler =
+          dataSet.append(block, newGS, initialFileLength);
+
+      // write data to block file
+      ReplicaBeingWritten rbw =
+          (ReplicaBeingWritten) replicaHandler.getReplica();
+      ReplicaOutputStreams outputStreams =
+          rbw.createStreams(false, DEFAULT_CHECKSUM);
+      OutputStream dataOutput = outputStreams.getDataOut();
+
+      byte[] appendBytes = new byte[1];
+      dataOutput.write(appendBytes, 0, 1);
+      dataOutput.flush();
+      dataOutput.close();
+
+      // update checksum file
+      FsDatasetUtil.computeChecksum(rbw.getMetaFile(), rbw.getMetaFile(),
+          rbw.getBlockFile());
+
+      // read the block
+      // the DataNode BlockSender should read from the rbw replica's in-memory
+      // checksum, rather than on-disk checksum. Otherwise it will see a
+      // checksum mismatch error.
+      final byte[] readBlock = DFSTestUtil.readFileBuffer(fs, fileName);
+      assertEquals("should have read only one byte!", 1, readBlock.length);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81bf6f28/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 9325cdc..17558f1 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
@@ -34,6 +38,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -47,7 +52,9 @@ public class TestWriteToReplica {
   final private static int RWR = 3;
   final private static int RUR = 4;
   final private static int NON_EXISTENT = 5;
-  
+
+  private static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
   // test close
   @Test
   public void testClose() throws Exception {
@@ -129,6 +136,13 @@ public class TestWriteToReplica {
       cluster.shutdown();
     }
   }
+
+  private void saveMetaFileHeader(File metaFile) throws IOException {
+    DataOutputStream metaOut = new DataOutputStream(
+        new FileOutputStream(metaFile));
+    BlockMetadataHeader.writeHeader(metaOut, DEFAULT_CHECKSUM);
+    metaOut.close();
+  }
   
   /**
    * Generate testing environment and return a collection of blocks
@@ -156,6 +170,7 @@ public class TestWriteToReplica {
     replicasMap.add(bpid, replicaInfo);
     replicaInfo.getBlockFile().createNewFile();
     replicaInfo.getMetaFile().createNewFile();
+    saveMetaFileHeader(replicaInfo.getMetaFile());
     
     replicasMap.add(bpid, new ReplicaInPipeline(
         blocks[TEMPORARY].getBlockId(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to