Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 75a303b54 -> 829959a8f
HDFS-11187. Optimize disk access for last partial chunk checksum of Finalized replica. Contributed by Gabor Bota, Wei-Chiu Chuang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/829959a8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/829959a8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/829959a8

Branch: refs/heads/branch-2.7
Commit: 829959a8ff1c034dd3116328f9a4a4d2f1d92f00
Parents: 75a303b
Author: Xiao Chen <x...@apache.org>
Authored: Tue Feb 20 23:26:21 2018 -0800
Committer: Xiao Chen <x...@apache.org>
Committed: Tue Feb 20 23:27:20 2018 -0800

----------------------------------------------------------------------
 .../hdfs/server/datanode/BlockSender.java       | 56 +++++++++++----
 .../hdfs/server/datanode/FinalizedReplica.java  | 71 ++++++++++++--------
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 18 ++++-
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  | 23 +++++++
 .../namenode/TestListCorruptFileBlocks.java     |  4 +-
 5 files changed, 127 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
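Some context for the diffs that follow: the DataNode stores one CRC per 512-byte chunk of block data in the block's .meta file. When a block's length is not a multiple of the chunk size, serving or appending the tail of the block needs the checksum of that last, partial chunk. Before this patch, BlockSender read that checksum from the meta file on every open of a finalized replica, aligned or not; after it, the value is loaded only for misaligned replicas and then cached on the replica object. A self-contained sketch of the underlying arithmetic; PartialChunkMath and its members are invented names, not from the patch, and the 7-byte header size is my assumption based on BlockMetadataHeader (2 bytes of version plus a 5-byte checksum header):

// Illustrative sketch only. Constants follow HDFS defaults on this branch:
// 512 bytes of data per CRC and 4-byte CRC32 values in the .meta file.
public class PartialChunkMath {
  static final long CHUNK_SIZE = 512;     // bytes of data covered by one CRC
  static final int CHECKSUM_SIZE = 4;     // bytes of one stored CRC32 value
  static final int META_HEADER_SIZE = 7;  // assumed: version (2) + checksum header (5)

  /** True when the replica ends mid-chunk, i.e. a partial chunk checksum exists. */
  static boolean hasPartialLastChunk(long blockLen) {
    return blockLen % CHUNK_SIZE != 0;
  }

  /** Byte offset of the last (possibly partial) chunk's CRC in the meta file. */
  static long lastChecksumOffset(long blockLen) {
    long chunks = (blockLen + CHUNK_SIZE - 1) / CHUNK_SIZE;  // ceiling division
    return META_HEADER_SIZE + (chunks - 1) * CHECKSUM_SIZE;
  }

  public static void main(String[] args) {
    long blockLen = 1300;  // two full 512-byte chunks plus a 276-byte tail
    System.out.println(hasPartialLastChunk(blockLen));  // true
    System.out.println(lastChecksumOffset(blockLen));   // 7 + 2 * 4 = 15
  }
}

For a 1300-byte block the meta file holds three checksums; the last one covers only the 276-byte tail, and that is the value the patch now keeps in memory.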
http://git-wip-us.apache.org/repos/asf/hadoop/blob/829959a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index c042190..71e4804 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -170,8 +170,13 @@ class BlockSender implements java.io.Closeable {
    * See {{@link BlockSender#isLongRead()}
    */
   private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
-
+  // The number of bytes per checksum here determines the alignment
+  // of reads: we always start reading at a checksum chunk boundary,
+  // even if the checksum type is NULL. So, choosing too big of a value
+  // would risk sending too much unnecessary data. 512 (1 disk sector)
+  // is likely to result in minimal extra IO.
+  private static final long CHUNK_SIZE = 512;
 
   /**
    * Constructor
    *
@@ -241,12 +246,6 @@ class BlockSender implements java.io.Closeable {
       synchronized(datanode.data) {
         replica = getReplica(block, datanode);
         replicaVisibleLength = replica.getVisibleLength();
-        if (replica instanceof FinalizedReplica) {
-          // Load last checksum in case the replica is being written
-          // concurrently
-          final FinalizedReplica frep = (FinalizedReplica) replica;
-          chunkChecksum = frep.getLastChecksumAndDataLen();
-        }
       }
       // if there is a write in progress
       if (replica instanceof ReplicaBeingWritten) {
@@ -254,6 +253,10 @@ class BlockSender implements java.io.Closeable {
         waitForMinLength(rbw, startOffset + length);
         chunkChecksum = rbw.getLastChecksumAndDataLen();
       }
+      if (replica instanceof FinalizedReplica) {
+        chunkChecksum = getPartialChunkChecksumForFinalized(
+            (FinalizedReplica)replica);
+      }
 
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
         throw new IOException("Replica gen stamp < block genstamp, block="
@@ -329,12 +332,8 @@ class BlockSender implements java.io.Closeable {
         }
       }
       if (csum == null) {
-        // The number of bytes per checksum here determines the alignment
-        // of reads: we always start reading at a checksum chunk boundary,
-        // even if the checksum type is NULL. So, choosing too big of a value
-        // would risk sending too much unnecessary data. 512 (1 disk sector)
-        // is likely to result in minimal extra IO.
-        csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
+        csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL,
+            (int)CHUNK_SIZE);
       }
 
       /*
@@ -410,6 +409,37 @@ class BlockSender implements java.io.Closeable {
     }
   }
 
+  private ChunkChecksum getPartialChunkChecksumForFinalized(
+      FinalizedReplica finalized) throws IOException {
+    // There are a number of places in the code base where a finalized replica
+    // object is created. If last partial checksum is loaded whenever a
+    // finalized replica is created, it would increase latency in DataNode
+    // initialization. Therefore, the last partial chunk checksum is loaded
+    // lazily.
+
+    // Load last checksum in case the replica is being written concurrently
+    final long replicaVisibleLength = replica.getVisibleLength();
+    if (replicaVisibleLength % CHUNK_SIZE != 0 &&
+        finalized.getLastPartialChunkChecksum() == null) {
+      // the finalized replica does not have precomputed last partial
+      // chunk checksum. Recompute now.
+      try {
+        finalized.loadLastPartialChunkChecksum();
+        return new ChunkChecksum(finalized.getVisibleLength(),
+            finalized.getLastPartialChunkChecksum());
+      } catch (FileNotFoundException e) {
+        // meta file is lost. Continue anyway to preserve existing behavior.
+        DataNode.LOG.warn(
+            "meta file " + finalized.getMetaFile() + " is missing!");
+        return null;
+      }
+    } else {
+      // If the checksum is null, BlockSender will use on-disk checksum.
+      return new ChunkChecksum(finalized.getVisibleLength(),
+          finalized.getLastPartialChunkChecksum());
+    }
+  }
+
   /**
    * close opened files.
    */
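The interesting part of getPartialChunkChecksumForFinalized() is the pair of conditions deciding whether a disk read happens at all. A standalone sketch of that decision, with the replica reduced to a hypothetical interface so the snippet compiles on its own; the accessor names are invented, and the real method wraps the result in a ChunkChecksum pairing the bytes with the visible length:

import java.io.FileNotFoundException;
import java.io.IOException;

// Hypothetical, minimal stand-in for the replica object.
interface Replica {
  long getVisibleLength();
  byte[] getCachedPartialChunkChecksum();
  byte[] loadPartialChunkChecksumFromDisk() throws IOException;
}

class PartialChecksumLookup {
  static final long CHUNK_SIZE = 512;

  static byte[] partialChunkChecksum(Replica replica) throws IOException {
    // A partial chunk exists only when the length is not chunk-aligned,
    // and the disk is touched only when the cache is empty as well.
    if (replica.getVisibleLength() % CHUNK_SIZE != 0
        && replica.getCachedPartialChunkChecksum() == null) {
      try {
        return replica.loadPartialChunkChecksumFromDisk();
      } catch (FileNotFoundException e) {
        // meta file lost; return null so the caller keeps the old behavior
        return null;
      }
    }
    // Cache hit, or a chunk-aligned replica whose cached value is null;
    // a null result tells the caller to use the on-disk checksums as-is.
    return replica.getCachedPartialChunkChecksum();
  }
}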
http://git-wip-us.apache.org/repos/asf/hadoop/blob/829959a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
index d8fa60a..cf4f9b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -30,6 +29,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
  */
 public class FinalizedReplica extends ReplicaInfo {
   private boolean unlinked;      // copy-on-write done for block
+  private byte[] lastPartialChunkChecksum;
 
   /**
    * Constructor
@@ -41,9 +41,24 @@ public class FinalizedReplica extends ReplicaInfo {
    */
   public FinalizedReplica(long blockId, long len, long genStamp,
       FsVolumeSpi vol, File dir) {
+    this(blockId, len, genStamp, vol, dir, null);
+  }
+
+  /**
+   * Constructor.
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param checksum the last partial chunk checksum
+   */
+  public FinalizedReplica(long blockId, long len, long genStamp,
+      FsVolumeSpi vol, File dir, byte[] checksum) {
     super(blockId, len, genStamp, vol, dir);
+    this.setLastPartialChunkChecksum(checksum);
   }
-
+
   /**
    * Constructor
    * @param block a block
@@ -51,7 +66,20 @@ public class FinalizedReplica extends ReplicaInfo {
    * @param dir directory path where block and meta files are located
    */
   public FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
+    this(block, vol, dir, null);
+  }
+
+  /**
+   * Constructor.
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param checksum the last partial chunk checksum
+   */
+  public FinalizedReplica(Block block, FsVolumeSpi vol, File dir,
+      byte[] checksum) {
     super(block, vol, dir);
+    this.setLastPartialChunkChecksum(checksum);
   }
 
   /**
@@ -61,6 +89,7 @@ public class FinalizedReplica extends ReplicaInfo {
   public FinalizedReplica(FinalizedReplica from) {
     super(from);
     this.unlinked = from.isUnlinked();
+    this.setLastPartialChunkChecksum(from.getLastPartialChunkChecksum());
   }
 
   @Override  // ReplicaInfo
@@ -104,30 +133,18 @@ public class FinalizedReplica extends ReplicaInfo {
         + "\n  unlinked          =" + unlinked;
   }
 
-  /**
-   * gets the last chunk checksum and the length of the block corresponding
-   * to that checksum.
-   * Note, need to be called with the FsDataset lock acquired. May improve to
-   * lock only the FsVolume in the future.
-   * @throws IOException
-   */
-  public ChunkChecksum getLastChecksumAndDataLen() throws IOException {
-    ChunkChecksum chunkChecksum = null;
-    try {
-      byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
-          getBlockFile(), getMetaFile());
-      if (lastChecksum != null) {
-        chunkChecksum =
-            new ChunkChecksum(getVisibleLength(), lastChecksum);
-      }
-    } catch (FileNotFoundException e) {
-      // meta file is lost. Try to continue anyway.
-      DataNode.LOG.warn("meta file " + getMetaFile() +
-          " is missing!");
-    } catch (IOException ioe) {
-      DataNode.LOG.warn("Unable to read checksum from meta file " +
-          getMetaFile(), ioe);
-    }
-    return chunkChecksum;
+  public byte[] getLastPartialChunkChecksum() {
+    return lastPartialChunkChecksum;
+  }
+
+  public void setLastPartialChunkChecksum(byte[] checksum) {
+    lastPartialChunkChecksum = checksum;
+  }
+
+  public void loadLastPartialChunkChecksum()
+      throws IOException {
+    byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
+        getBlockFile(), getMetaFile());
+    setLastPartialChunkChecksum(lastChecksum);
   }
 }
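FinalizedReplica itself only gains plumbing: a cached byte[] field, telescoping constructors that let callers supply a precomputed checksum, and a loadLastPartialChunkChecksum() that defers the meta-file read. Note that the copy constructor is updated too; a copy that dropped the cached value would silently reintroduce the disk read. A stripped-down sketch of that pattern, where ReplicaStub is a hypothetical class standing in for FinalizedReplica:

// ReplicaStub strips the class down to just the caching pattern so the
// interplay of the three constructors is visible.
class ReplicaStub {
  private byte[] lastPartialChunkChecksum;

  ReplicaStub(byte[] checksum) {            // new "full" constructor
    this.lastPartialChunkChecksum = checksum;
  }

  ReplicaStub() {                           // pre-existing constructor
    this((byte[]) null);                    // delegates with an empty cache
  }

  ReplicaStub(ReplicaStub from) {           // copy constructor must carry
    this(from.lastPartialChunkChecksum);    // the cache over, or it is lost
  }

  byte[] getLastPartialChunkChecksum() {
    return lastPartialChunkChecksum;
  }

  void setLastPartialChunkChecksum(byte[] checksum) {
    lastPartialChunkChecksum = checksum;
  }
}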
http://git-wip-us.apache.org/repos/asf/hadoop/blob/829959a8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 757dc64..7211a4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1127,10 +1127,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
 
     // load last checksum and datalen
-    byte[] lastChunkChecksum = v.loadLastPartialChunkChecksum(
-        replicaInfo.getBlockFile(), replicaInfo.getMetaFile());
     newReplicaInfo.setLastChecksumAndDataLen(
-        replicaInfo.getNumBytes(), lastChunkChecksum);
+        replicaInfo.getNumBytes(), replicaInfo.getLastPartialChunkChecksum());
 
     File newmeta = newReplicaInfo.getMetaFile();
 
@@ -1621,6 +1619,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         ReplicaState.FINALIZED) {
       newReplicaInfo = (FinalizedReplica)
           ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
+      newReplicaInfo.loadLastPartialChunkChecksum();
     } else {
       FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
       File f = replicaInfo.getBlockFile();
@@ -1632,6 +1631,19 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       File dest = v.addFinalizedBlock(
           bpid, replicaInfo, f, replicaInfo.getBytesReserved());
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+
+      byte[] checksum = null;
+      // copy the last partial checksum if the replica is originally
+      // in finalized or rbw state.
+      if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+        FinalizedReplica finalized = (FinalizedReplica)replicaInfo;
+        checksum = finalized.getLastPartialChunkChecksum();
+      } else if (replicaInfo.getState() == ReplicaState.RBW) {
+        ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+        checksum = rbw.getLastChecksumAndDataLen().getChecksum();
+      }
+      newReplicaInfo.setLastPartialChunkChecksum(checksum);
+
       if (v.isTransientStorage()) {
         ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
         datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
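finalizeReplica() now has two paths: the recovery path reuses the original FinalizedReplica and reloads its checksum from disk, while the normal path builds a new FinalizedReplica and copies the checksum the source replica already holds in memory (a FINALIZED source caches it; an RBW source tracks it via getLastChecksumAndDataLen()). Any other state starts out empty and falls back to the lazy load. A sketch of that copy logic, with a plain enum standing in for HDFS's ReplicaState and the two in-memory values passed in explicitly (all names here are invented):

enum State { FINALIZED, RBW, TEMPORARY }

class FinalizeSketch {
  static byte[] checksumToCarryOver(State state,
      byte[] finalizedCachedChecksum, byte[] rbwLastChecksum) {
    switch (state) {
      case FINALIZED:
        return finalizedCachedChecksum;  // already cached in memory
      case RBW:
        return rbwLastChecksum;          // tracked by the in-progress write
      default:
        return null;                     // no value yet; loaded lazily later
    }
  }
}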
http://git-wip-us.apache.org/repos/asf/hadoop/blob/829959a8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 15ecf01..44f7a0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2638,6 +2638,29 @@ public class MiniDFSCluster {
   }
 
   /**
+   * Return all block files in given directory (recursive search).
+   */
+  public static List<File> getAllBlockFiles(File storageDir) {
+    List<File> results = new ArrayList<File>();
+    File[] files = storageDir.listFiles();
+    if (files == null) {
+      return null;
+    }
+    for (File f : files) {
+      if (f.getName().startsWith(Block.BLOCK_FILE_PREFIX) &&
+          !f.getName().endsWith(Block.METADATA_EXTENSION)) {
+        results.add(f);
+      } else if (f.isDirectory()) {
+        List<File> subdirResults = getAllBlockFiles(f);
+        if (subdirResults != null) {
+          results.addAll(subdirResults);
+        }
+      }
+    }
+    return results;
+  }
+
+  /**
    * Get the latest metadata file correpsonding to a block
    * @param storageDir storage directory
    * @param blk the block
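getAllBlockFiles() sticks to plain recursion over File.listFiles(), presumably because branch-2.7 still supported Java 7. On Java 8 and later the same search can be written against java.nio.file.Files.walk; this alternative is mine, not part of the patch, with the "blk_" prefix and ".meta" extension inlined from Block.BLOCK_FILE_PREFIX and Block.METADATA_EXTENSION so it compiles standalone:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BlockFileWalker {
  /** Return all block files (not .meta files) under storageDir, recursively. */
  static List<File> getAllBlockFiles(File storageDir) throws IOException {
    try (Stream<Path> paths = Files.walk(storageDir.toPath())) {
      return paths.filter(Files::isRegularFile)
          .filter(p -> p.getFileName().toString().startsWith("blk_"))
          .filter(p -> !p.getFileName().toString().endsWith(".meta"))
          .map(Path::toFile)
          .collect(Collectors.toList());
    }
  }
}

One behavioral difference worth noting: Files.walk throws if the directory does not exist, whereas the helper in the patch returns null in that case.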
http://git-wip-us.apache.org/repos/asf/hadoop/blob/829959a8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
index 92ea111..6a8d39f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
@@ -89,7 +89,7 @@ public class TestListCorruptFileBlocks {
       File storageDir = cluster.getInstanceStorageDir(0, 1);
       File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
       assertTrue("data directory does not exist", data_dir.exists());
-      List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
+      List<File> metaFiles = MiniDFSCluster.getAllBlockFiles(data_dir);
       assertTrue("Data directory does not contain any blocks or there was an "
           + "IO error", metaFiles != null && !metaFiles.isEmpty());
       File metaFile = metaFiles.get(0);
@@ -169,7 +169,7 @@ public class TestListCorruptFileBlocks {
       File data_dir = MiniDFSCluster.getFinalizedDir(storageDir,
           cluster.getNamesystem().getBlockPoolId());
       assertTrue("data directory does not exist", data_dir.exists());
-      List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
+      List<File> metaFiles = MiniDFSCluster.getAllBlockFiles(data_dir);
       assertTrue("Data directory does not contain any blocks or there was an "
           + "IO error", metaFiles != null && !metaFiles.isEmpty());
       File metaFile = metaFiles.get(0);

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org