Author: jing9
Date: Sat Jan 18 00:48:08 2014
New Revision: 1559298

URL: http://svn.apache.org/r1559298
Log:
HDFS-5783. Compute the digest before loading FSImage. Contributed by Haohui Mai.
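For context on the change itself: the loader now computes the MD5 digest of the whole fsimage file up front (via MD5FileUtils.computeMd5ForFile) and only then parses the sections, instead of threading a DigestInputStream through the section-by-section reads. Below is a minimal, self-contained sketch of that "hash the file first, then load it" idea; the class and method names are illustrative only and are not part of this patch.

    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.security.DigestInputStream;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class ImageDigestSketch {
      /**
       * Read the whole file once and return its MD5 digest. This approximates
       * what MD5FileUtils.computeMd5ForFile does in this patch: the digest
       * covers the raw bytes of the image file, independent of how the loader
       * later seeks through individual sections.
       */
      static byte[] md5Of(String path) throws IOException, NoSuchAlgorithmException {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        try (DigestInputStream in = new DigestInputStream(
            new BufferedInputStream(new FileInputStream(path)), digester)) {
          byte[] buf = new byte[64 * 1024];
          while (in.read(buf) != -1) {
            // every byte read through the DigestInputStream updates the digester
          }
        }
        return digester.digest();
      }
    }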
Modified:
    hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt
    hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
    hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
    hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt?rev=1559298&r1=1559297&r2=1559298&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt Sat Jan 18 00:48:08 2014
@@ -5,3 +5,5 @@ HDFS-5698 subtasks
     HDFS-5738. Serialize INode information in protobuf. (Haohui Mai via jing9)
 
     HDFS-5772. Serialize under-construction file information in FSImage. (jing9)
+
+    HDFS-5783. Compute the digest before loading FSImage. (Haohui Mai via jing9)

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java?rev=1559298&r1=1559297&r2=1559298&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java Sat Jan 18 00:48:08 2014
@@ -209,19 +209,16 @@ final class FSImageFormatPBINode {
   final static class Saver {
     private final FSNamesystem fsn;
-    private final FileSummary.Builder headers;
-    private final OutputStream out;
+    private final FileSummary.Builder summary;
     private final FSImageFormatProtobuf.Saver parent;
 
-    Saver(FSImageFormatProtobuf.Saver parent, OutputStream out,
-        FileSummary.Builder headers) {
+    Saver(FSImageFormatProtobuf.Saver parent, FileSummary.Builder summary) {
       this.parent = parent;
-      this.out = out;
-      this.headers = headers;
+      this.summary = summary;
       this.fsn = parent.context.getSourceNamesystem();
     }
 
-    void serializeINodeDirectorySection() throws IOException {
+    void serializeINodeDirectorySection(OutputStream out) throws IOException {
       for (INodeWithAdditionalFields n : fsn.dir.getINodeMap().getMap()) {
         if (!n.isDirectory())
           continue;
@@ -238,24 +235,25 @@ final class FSImageFormatPBINode {
           e.writeDelimitedTo(out);
         }
       }
-      parent.commitSection(headers,
+      parent.commitSection(summary,
           FSImageFormatProtobuf.SectionName.INODE_DIR);
     }
 
-    void serializeINodeSection() throws IOException {
+    void serializeINodeSection(OutputStream out) throws IOException {
       INodeMap inodesMap = fsn.dir.getINodeMap();
+
       INodeSection.Builder b = INodeSection.newBuilder()
           .setLastInodeId(fsn.getLastInodeId()).setNumInodes(inodesMap.size());
       INodeSection s = b.build();
       s.writeDelimitedTo(out);
 
       for (INodeWithAdditionalFields n : inodesMap.getMap()) {
-        save(n);
+        save(out, n);
       }
-      parent.commitSection(headers, FSImageFormatProtobuf.SectionName.INODE);
+      parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
     }
 
-    void serializeFilesUCSection() throws IOException {
+    void serializeFilesUCSection(OutputStream out) throws IOException {
       Map<String, INodeFile> ucMap = fsn.getFilesUnderConstruction();
       for (Map.Entry<String, INodeFile> entry : ucMap.entrySet()) {
         String path = entry.getKey();
@@ -265,7 +263,7 @@ final class FSImageFormatPBINode {
         FileUnderConstructionEntry e = b.build();
         e.writeDelimitedTo(out);
       }
-      parent.commitSection(headers,
+      parent.commitSection(summary,
           FSImageFormatProtobuf.SectionName.FILES_UNDERCONSTRUCTION);
     }
 
@@ -274,15 +272,15 @@ final class FSImageFormatPBINode {
           .setGroup(n.getGroupName()).setPermission(n.getFsPermissionShort());
     }
 
-    private void save(INode n) throws IOException {
+    private void save(OutputStream out, INode n) throws IOException {
       if (n.isDirectory()) {
-        save(n.asDirectory());
+        save(out, n.asDirectory());
       } else if (n.isFile()) {
-        save(n.asFile());
+        save(out, n.asFile());
       }
     }
 
-    private void save(INodeDirectory n) throws IOException {
+    private void save(OutputStream out, INodeDirectory n) throws IOException {
       Quota.Counts quota = n.getQuotaCounts();
       INodeSection.INodeDirectory.Builder b = INodeSection.INodeDirectory
           .newBuilder().setModificationTime(n.getModificationTime())
@@ -296,7 +294,7 @@ final class FSImageFormatPBINode {
       r.writeDelimitedTo(out);
     }
 
-    private void save(INodeFile n) throws IOException {
+    private void save(OutputStream out, INodeFile n) throws IOException {
       INodeSection.INodeFile.Builder b = INodeSection.INodeFile.newBuilder()
           .setAccessTime(n.getAccessTime())
           .setModificationTime(n.getModificationTime())

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java?rev=1559298&r1=1559297&r2=1559298&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java Sat Jan 18 00:48:08 2014
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -31,7 +30,6 @@ import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.security.DigestInputStream;
 import java.security.DigestOutputStream;
 import java.security.MessageDigest;
 import java.util.Arrays;
@@ -43,8 +41,10 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressorStream;
 
 import com.google.common.io.LimitInputStream;
 import com.google.protobuf.CodedOutputStream;
@@ -84,6 +84,7 @@ final class FSImageFormatProtobuf {
     }
 
     void load(File file) throws IOException {
+      imgDigest = MD5FileUtils.computeMd5ForFile(file);
       RandomAccessFile raFile = new RandomAccessFile(file, "r");
       FileInputStream fin = new FileInputStream(file);
       try {
@@ -139,7 +140,6 @@ final class FSImageFormatProtobuf {
       }
 
       FileSummary summary = loadSummary(raFile);
-      MessageDigest digester = MD5Hash.getDigester();
 
       FileChannel channel = fin.getChannel();
 
       FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
@@ -147,8 +147,8 @@ final class FSImageFormatProtobuf {
 
       for (FileSummary.Section s : summary.getSectionsList()) {
         channel.position(s.getOffset());
-        InputStream in = new DigestInputStream(new BufferedInputStream(
-            new LimitInputStream(fin, s.getLength())), digester);
+        InputStream in = new BufferedInputStream(new LimitInputStream(fin,
+            s.getLength()));
 
         if (summary.hasCodec()) {
           // read compression related info
@@ -179,10 +179,6 @@ final class FSImageFormatProtobuf {
           break;
         }
       }
-
-      updateDigestForFileSummary(summary, digester);
-
-      imgDigest = new MD5Hash(digester.digest());
     }
 
     private void loadNameSystemSection(InputStream in, FileSummary.Section sections)
@@ -204,6 +200,8 @@ final class FSImageFormatProtobuf {
     private FileChannel fileChannel;
     // OutputStream for the section data
     private OutputStream sectionOutputStream;
+    private CompressionCodec codec;
+    private OutputStream underlyingOutputStream;
 
     Saver(SaveNamespaceContext context) {
       this.context = context;
@@ -216,15 +214,29 @@ final class FSImageFormatProtobuf {
     void commitSection(FileSummary.Builder summary, SectionName name)
         throws IOException {
       long oldOffset = currentOffset;
-      sectionOutputStream.flush();
+      flushSectionOutputStream();
+
+      if (codec != null) {
+        sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
+      } else {
+        sectionOutputStream = underlyingOutputStream;
+      }
       long length = fileChannel.position() - oldOffset;
       summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
           .setLength(length).setOffset(currentOffset));
       currentOffset += length;
     }
 
+    private void flushSectionOutputStream() throws IOException {
+      if (codec != null) {
+        ((CompressorStream) sectionOutputStream).finish();
+      }
+      sectionOutputStream.flush();
+    }
+
     void save(File file, FSImageCompression compression) throws IOException {
       FileOutputStream fout = new FileOutputStream(file);
+      fileChannel = fout.getChannel();
       try {
         saveInternal(fout, compression);
       } finally {
@@ -232,60 +244,60 @@ final class FSImageFormatProtobuf {
       }
     }
 
-    private void saveFileSummary(FileOutputStream fout, FileSummary summary)
+    private static void saveFileSummary(OutputStream out, FileSummary summary)
         throws IOException {
-      summary.writeDelimitedTo(fout);
+      summary.writeDelimitedTo(out);
       int length = getOndiskTrunkSize(summary);
       byte[] lengthBytes = new byte[4];
       ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
-      fout.write(lengthBytes);
+      out.write(lengthBytes);
     }
 
-    private void saveInodes(OutputStream out, FileSummary.Builder summary)
-        throws IOException {
+    private void saveInodes(FileSummary.Builder summary) throws IOException {
       FSImageFormatPBINode.Saver saver = new FSImageFormatPBINode.Saver(this,
-          out, summary);
-      saver.serializeINodeSection();
-      saver.serializeINodeDirectorySection();
-      saver.serializeFilesUCSection();
+          summary);
+      saver.serializeINodeSection(sectionOutputStream);
+      saver.serializeINodeDirectorySection(sectionOutputStream);
+      saver.serializeFilesUCSection(sectionOutputStream);
     }
 
     private void saveInternal(FileOutputStream fout,
         FSImageCompression compression) throws IOException {
-      fout.write(MAGIC_HEADER);
-      fileChannel = fout.getChannel();
-      MessageDigest digester = MD5Hash.getDigester();
-      OutputStream out = new DigestOutputStream(new BufferedOutputStream(fout),
-          digester);
+      underlyingOutputStream = new DigestOutputStream(new BufferedOutputStream(
+          fout), digester);
+      underlyingOutputStream.write(MAGIC_HEADER);
+
+      fileChannel = fout.getChannel();
 
       FileSummary.Builder b = FileSummary.newBuilder()
           .setOndiskVersion(FILE_VERSION)
          .setLayoutVersion(LayoutVersion.getCurrentLayoutVersion());
 
-      CompressionCodec codec = compression.getImageCodec();
+      codec = compression.getImageCodec();
       if (codec != null) {
         b.setCodec(codec.getClass().getCanonicalName());
-        sectionOutputStream = codec.createOutputStream(out);
+        sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
       } else {
-        sectionOutputStream = out;
+        sectionOutputStream = underlyingOutputStream;
       }
 
-      saveNameSystemSection(sectionOutputStream, b);
-      saveInodes(sectionOutputStream, b);
+      saveNameSystemSection(b);
+      saveInodes(b);
 
       // Flush the buffered data into the file before appending the header
-      out.flush();
+      flushSectionOutputStream();
 
       FileSummary summary = b.build();
-      saveFileSummary(fout, summary);
-      updateDigestForFileSummary(summary, digester);
+      saveFileSummary(underlyingOutputStream, summary);
+      underlyingOutputStream.close();
       savedDigest = new MD5Hash(digester.digest());
     }
 
-    private void saveNameSystemSection(OutputStream out,
+    private void saveNameSystemSection(
         FileSummary.Builder summary) throws IOException {
       final FSNamesystem fsn = context.getSourceNamesystem();
+      OutputStream out = sectionOutputStream;
       NameSystemSection.Builder b = NameSystemSection.newBuilder()
          .setGenstampV1(fsn.getGenerationStampV1())
          .setGenstampV1Limit(fsn.getGenerationStampV1Limit())
@@ -335,19 +347,6 @@ final class FSImageFormatProtobuf {
           + s.getSerializedSize();
   }
 
-  /**
-   * Include the FileSummary when calculating the digest. This is required as the
-   * code does not access the FSImage strictly in sequential order.
-   */
-  private static void updateDigestForFileSummary(FileSummary summary,
-      MessageDigest digester) throws IOException {
-    ByteArrayOutputStream o = new ByteArrayOutputStream();
-    o.write(MAGIC_HEADER);
-    summary.writeDelimitedTo(o);
-    digester.update(o.toByteArray());
-  }
-
   private FSImageFormatProtobuf() {
   }
-
 }

Modified: hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java?rev=1559298&r1=1559297&r2=1559298&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java (original)
+++ hadoop/common/branches/HDFS-5698/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java Sat Jan 18 00:48:08 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.EnumSet;
 
@@ -28,6 +29,7 @@ import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -36,70 +38,101 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.junit.After;
-import org.junit.Before;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.junit.Test;
 
 public class TestFSImage {
-  private MiniDFSCluster cluster;
-  private Configuration conf;
-  private DistributedFileSystem fs;
-  private FSNamesystem fsn;
-
-  @Before
-  public void setUp() throws IOException {
-    conf = new Configuration();
-    cluster = new MiniDFSCluster.Builder(conf).build();
-    cluster.waitActive();
-    fsn = cluster.getNamesystem();
-    fs = cluster.getFileSystem();
+  @Test
+  public void testPersist() throws IOException {
+    Configuration conf = new Configuration();
+    testPersistHelper(conf);
   }
 
-  @After
-  public void tearDown() throws IOException {
-    if (cluster != null) {
-      cluster.shutdown();
+  @Test
+  public void testCompression() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
+    conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
+        "org.apache.hadoop.io.compress.GzipCodec");
+    testPersistHelper(conf);
+  }
+
+  private void testPersistHelper(Configuration conf) throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      FSNamesystem fsn = cluster.getNamesystem();
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      final Path dir = new Path("/abc/def");
+      final Path file1 = new Path(dir, "f1");
+      final Path file2 = new Path(dir, "f2");
+
+      // create an empty file f1
+      fs.create(file1).close();
+
+      // create an under-construction file f2
+      FSDataOutputStream out = fs.create(file2);
+      out.writeBytes("hello");
+      ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet
+          .of(SyncFlag.UPDATE_LENGTH));
+
+      // checkpoint
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      fs.saveNamespace();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+      cluster.restartNameNode();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+
+      assertTrue(fs.isDirectory(dir));
+      assertTrue(fs.exists(file1));
+      assertTrue(fs.exists(file2));
+
+      // check internals of file2
+      INodeFile file2Node = fsn.dir.getINode4Write(file2.toString()).asFile();
+      assertEquals("hello".length(), file2Node.computeFileSize());
+      assertTrue(file2Node.isUnderConstruction());
+      BlockInfo[] blks = file2Node.getBlocks();
+      assertEquals(1, blks.length);
+      assertEquals(BlockUCState.UNDER_CONSTRUCTION, blks[0].getBlockUCState());
+      // check lease manager
+      Lease lease = fsn.leaseManager.getLeaseByPath(file2.toString());
+      Assert.assertNotNull(lease);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 
+  /**
+   * Ensure that the digest written by the saver equals to the digest of the
+   * file.
+   */
   @Test
-  public void testINode() throws IOException {
-    final Path dir = new Path("/abc/def");
-    final Path file1 = new Path(dir, "f1");
-    final Path file2 = new Path(dir, "f2");
-
-    // create an empty file f1
-    fs.create(file1).close();
-
-    // create an under-construction file f2
-    FSDataOutputStream out = fs.create(file2);
-    out.writeBytes("hello");
-    ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet
-        .of(SyncFlag.UPDATE_LENGTH));
-
-    // checkpoint
-    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-    fs.saveNamespace();
-    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
-
-    cluster.restartNameNode();
-    cluster.waitActive();
-    fs = cluster.getFileSystem();
-
-    assertTrue(fs.isDirectory(dir));
-    assertTrue(fs.exists(file1));
-    assertTrue(fs.exists(file2));
-
-    // check internals of file2
-    INodeFile file2Node = fsn.dir.getINode4Write(file2.toString()).asFile();
-    assertEquals("hello".length(), file2Node.computeFileSize());
-    assertTrue(file2Node.isUnderConstruction());
-    BlockInfo[] blks = file2Node.getBlocks();
-    assertEquals(1, blks.length);
-    assertEquals(BlockUCState.UNDER_CONSTRUCTION, blks[0].getBlockUCState());
-    // check lease manager
-    Lease lease = fsn.leaseManager.getLeaseByPath(file2.toString());
-    Assert.assertNotNull(lease);
+  public void testDigest() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      fs.saveNamespace();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+      File currentDir = FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0).get(
+          0);
+      File fsimage = FSImageTestUtil.findNewestImageFile(currentDir
          .getAbsolutePath());
+      assertEquals(MD5FileUtils.readStoredMd5ForFile(fsimage),
+          MD5FileUtils.computeMd5ForFile(fsimage));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
    }
   }
 }
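The save path now follows a "finish, measure, reopen" pattern when compression is enabled: each section is written through its own CompressorStream layered over one shared underlying stream, and commitSection records the section boundary only after flushSectionOutputStream() has finished the compressor, so the file channel position accounts for every compressed byte of that section. The following standalone sketch shows the same pattern using java.util.zip.GZIPOutputStream as a stand-in for the Hadoop CompressorStream; the file name and section names are illustrative only.

    import java.io.BufferedOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.GZIPOutputStream;

    public class SectionedWriterSketch {
      public static void main(String[] args) throws IOException {
        FileOutputStream fout = new FileOutputStream("sections.bin");
        OutputStream underlying = new BufferedOutputStream(fout);
        long offset = 0;

        for (String section : new String[] { "INODE", "INODE_DIR" }) {
          // One compressor per section, layered over the shared underlying stream.
          GZIPOutputStream sectionOut = new GZIPOutputStream(underlying);
          sectionOut.write(("data for " + section).getBytes(StandardCharsets.UTF_8));

          // Finish the compressor without closing the underlying stream, then
          // flush, so the channel position includes every byte of this section.
          sectionOut.finish();
          sectionOut.flush();
          underlying.flush();

          long end = fout.getChannel().position();
          System.out.println(section + ": offset=" + offset + " length=" + (end - offset));
          offset = end;
        }
        underlying.close();
      }
    }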