Repository: hadoop Updated Branches: refs/heads/trunk 10f0f7851 -> f308561f1
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java new file mode 100644 index 0000000..bd0c7c3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java @@ -0,0 +1,786 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import static org.hamcrest.CoreMatchers.equalTo; + +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.util.UUID; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.ClientContext; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; +import org.apache.hadoop.fs.FsTracer; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.apache.hadoop.util.Time; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestBlockReaderLocal { + private static TemporarySocketDirectory sockDir; + + @BeforeClass + public static void init() { + sockDir = new TemporarySocketDirectory(); + DomainSocket.disableBindPathValidation(); + } + + @AfterClass + public static void shutdown() throws IOException { + sockDir.close(); + } + + public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2, + int off2, int len) { + for (int i = 0; i < len; i++) { + if (buf1[off1 + i] != buf2[off2 + i]) { + Assert.fail("arrays differ at byte " + i + ". " + + "The first array has " + (int)buf1[off1 + i] + + ", but the second array has " + (int)buf2[off2 + i]); + } + } + } + + /** + * Similar to IOUtils#readFully(). Reads bytes in a loop. + * + * @param reader The BlockReaderLocal to read bytes from + * @param buf The ByteBuffer to read into + * @param off The offset in the buffer to read into + * @param len The number of bytes to read. + * + * @throws IOException If it could not read the requested number of bytes + */ + private static void readFully(BlockReaderLocal reader, + ByteBuffer buf, int off, int len) throws IOException { + int amt = len; + while (amt > 0) { + buf.limit(off + len); + buf.position(off); + long ret = reader.read(buf); + if (ret < 0) { + throw new EOFException( "Premature EOF from BlockReaderLocal " + + "after reading " + (len - amt) + " byte(s)."); + } + amt -= ret; + off += ret; + } + } + + private static class BlockReaderLocalTest { + final static int TEST_LENGTH = 12345; + final static int BYTES_PER_CHECKSUM = 512; + + public void setConfiguration(HdfsConfiguration conf) { + // default: no-op + } + public void setup(File blockFile, boolean usingChecksums) + throws IOException { + // default: no-op + } + public void doTest(BlockReaderLocal reader, byte original[]) + throws IOException { + // default: no-op + } + } + + public void runBlockReaderLocalTest(BlockReaderLocalTest test, + boolean checksum, long readahead) throws IOException { + Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); + MiniDFSCluster cluster = null; + HdfsConfiguration conf = new HdfsConfiguration(); + conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, + !checksum); + conf.setLong(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, + BlockReaderLocalTest.BYTES_PER_CHECKSUM); + conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C"); + conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead); + test.setConfiguration(conf); + FileInputStream dataIn = null, metaIn = null; + final Path TEST_PATH = new Path("/a"); + final long RANDOM_SEED = 4567L; + BlockReaderLocal blockReaderLocal = null; + FSDataInputStream fsIn = null; + byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; + + FileSystem fs = null; + ShortCircuitShm shm = null; + RandomAccessFile raf = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + IOUtils.readFully(fsIn, original, 0, + BlockReaderLocalTest.TEST_LENGTH); + fsIn.close(); + fsIn = null; + ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH); + File dataFile = cluster.getBlockFile(0, block); + File metaFile = cluster.getBlockMetadataFile(0, block); + + ShortCircuitCache shortCircuitCache = + ClientContext.getFromConf(conf).getShortCircuitCache(); + cluster.shutdown(); + cluster = null; + test.setup(dataFile, checksum); + FileInputStream streams[] = { + new FileInputStream(dataFile), + new FileInputStream(metaFile) + }; + dataIn = streams[0]; + metaIn = streams[1]; + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), + block.getBlockPoolId()); + raf = new RandomAccessFile( + new File(sockDir.getDir().getAbsolutePath(), + UUID.randomUUID().toString()), "rw"); + raf.setLength(8192); + FileInputStream shmStream = new FileInputStream(raf.getFD()); + shm = new ShortCircuitShm(ShmId.createRandom(), shmStream); + ShortCircuitReplica replica = + new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache, + Time.now(), shm.allocAndRegisterSlot( + ExtendedBlockId.fromExtendedBlock(block))); + blockReaderLocal = new BlockReaderLocal.Builder( + new DfsClientConf.ShortCircuitConf(conf)). + setFilename(TEST_PATH.getName()). + setBlock(block). + setShortCircuitReplica(replica). + setCachingStrategy(new CachingStrategy(false, readahead)). + setVerifyChecksum(checksum). + setTracer(FsTracer.get(conf)). + build(); + dataIn = null; + metaIn = null; + test.doTest(blockReaderLocal, original); + // BlockReaderLocal should not alter the file position. + Assert.assertEquals(0, streams[0].getChannel().position()); + Assert.assertEquals(0, streams[1].getChannel().position()); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + if (dataIn != null) dataIn.close(); + if (metaIn != null) metaIn.close(); + if (blockReaderLocal != null) blockReaderLocal.close(); + if (shm != null) shm.free(); + if (raf != null) raf.close(); + } + } + + private static class TestBlockReaderLocalImmediateClose + extends BlockReaderLocalTest { + } + + @Test + public void testBlockReaderLocalImmediateClose() throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0); + runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0); + } + + private static class TestBlockReaderSimpleReads + extends BlockReaderLocalTest { + @Override + public void doTest(BlockReaderLocal reader, byte original[]) + throws IOException { + byte buf[] = new byte[TEST_LENGTH]; + reader.readFully(buf, 0, 512); + assertArrayRegionsEqual(original, 0, buf, 0, 512); + reader.readFully(buf, 512, 512); + assertArrayRegionsEqual(original, 512, buf, 512, 512); + reader.readFully(buf, 1024, 513); + assertArrayRegionsEqual(original, 1024, buf, 1024, 513); + reader.readFully(buf, 1537, 514); + assertArrayRegionsEqual(original, 1537, buf, 1537, 514); + // Readahead is always at least the size of one chunk in this test. + Assert.assertTrue(reader.getMaxReadaheadLength() >= + BlockReaderLocalTest.BYTES_PER_CHECKSUM); + } + } + + @Test + public void testBlockReaderSimpleReads() throws IOException { + runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, + HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderSimpleReadsShortReadahead() throws IOException { + runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, + BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1); + } + + @Test + public void testBlockReaderSimpleReadsNoChecksum() throws IOException { + runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, + HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderSimpleReadsNoReadahead() throws IOException { + runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0); + } + + @Test + public void testBlockReaderSimpleReadsNoChecksumNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0); + } + + private static class TestBlockReaderLocalArrayReads2 + extends BlockReaderLocalTest { + @Override + public void doTest(BlockReaderLocal reader, byte original[]) + throws IOException { + byte buf[] = new byte[TEST_LENGTH]; + reader.readFully(buf, 0, 10); + assertArrayRegionsEqual(original, 0, buf, 0, 10); + reader.readFully(buf, 10, 100); + assertArrayRegionsEqual(original, 10, buf, 10, 100); + reader.readFully(buf, 110, 700); + assertArrayRegionsEqual(original, 110, buf, 110, 700); + reader.readFully(buf, 810, 1); // from offset 810 to offset 811 + reader.readFully(buf, 811, 5); + assertArrayRegionsEqual(original, 811, buf, 811, 5); + reader.readFully(buf, 816, 900); // skip from offset 816 to offset 1716 + reader.readFully(buf, 1716, 5); + assertArrayRegionsEqual(original, 1716, buf, 1716, 5); + } + } + + @Test + public void testBlockReaderLocalArrayReads2() throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), + true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderLocalArrayReads2NoChecksum() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), + false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderLocalArrayReads2NoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0); + } + + @Test + public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0); + } + + private static class TestBlockReaderLocalByteBufferReads + extends BlockReaderLocalTest { + @Override + public void doTest(BlockReaderLocal reader, byte original[]) + throws IOException { + ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]); + readFully(reader, buf, 0, 10); + assertArrayRegionsEqual(original, 0, buf.array(), 0, 10); + readFully(reader, buf, 10, 100); + assertArrayRegionsEqual(original, 10, buf.array(), 10, 100); + readFully(reader, buf, 110, 700); + assertArrayRegionsEqual(original, 110, buf.array(), 110, 700); + reader.skip(1); // skip from offset 810 to offset 811 + readFully(reader, buf, 811, 5); + assertArrayRegionsEqual(original, 811, buf.array(), 811, 5); + } + } + + @Test + public void testBlockReaderLocalByteBufferReads() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), + true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderLocalByteBufferReadsNoChecksum() + throws IOException { + runBlockReaderLocalTest( + new TestBlockReaderLocalByteBufferReads(), + false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderLocalByteBufferReadsNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), + true, 0); + } + + @Test + public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), + false, 0); + } + + /** + * Test reads that bypass the bounce buffer (because they are aligned + * and bigger than the readahead). + */ + private static class TestBlockReaderLocalByteBufferFastLaneReads + extends BlockReaderLocalTest { + @Override + public void doTest(BlockReaderLocal reader, byte original[]) + throws IOException { + ByteBuffer buf = ByteBuffer.allocateDirect(TEST_LENGTH); + readFully(reader, buf, 0, 5120); + buf.flip(); + assertArrayRegionsEqual(original, 0, + DFSTestUtil.asArray(buf), 0, + 5120); + reader.skip(1537); + readFully(reader, buf, 0, 1); + buf.flip(); + assertArrayRegionsEqual(original, 6657, + DFSTestUtil.asArray(buf), 0, + 1); + reader.forceAnchorable(); + readFully(reader, buf, 0, 5120); + buf.flip(); + assertArrayRegionsEqual(original, 6658, + DFSTestUtil.asArray(buf), 0, + 5120); + reader.forceUnanchorable(); + readFully(reader, buf, 0, 513); + buf.flip(); + assertArrayRegionsEqual(original, 11778, + DFSTestUtil.asArray(buf), 0, + 513); + reader.skip(3); + readFully(reader, buf, 0, 50); + buf.flip(); + assertArrayRegionsEqual(original, 12294, + DFSTestUtil.asArray(buf), 0, + 50); + } + } + + @Test + public void testBlockReaderLocalByteBufferFastLaneReads() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(), + true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM); + } + + @Test + public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum() + throws IOException { + runBlockReaderLocalTest( + new TestBlockReaderLocalByteBufferFastLaneReads(), + false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM); + } + + @Test + public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(), + true, 0); + } + + @Test + public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(), + false, 0); + } + + private static class TestBlockReaderLocalReadCorruptStart + extends BlockReaderLocalTest { + boolean usingChecksums = false; + @Override + public void setup(File blockFile, boolean usingChecksums) + throws IOException { + RandomAccessFile bf = null; + this.usingChecksums = usingChecksums; + try { + bf = new RandomAccessFile(blockFile, "rw"); + bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0}); + } finally { + if (bf != null) bf.close(); + } + } + + public void doTest(BlockReaderLocal reader, byte original[]) + throws IOException { + byte buf[] = new byte[TEST_LENGTH]; + if (usingChecksums) { + try { + reader.readFully(buf, 0, 10); + Assert.fail("did not detect corruption"); + } catch (IOException e) { + // expected + } + } else { + reader.readFully(buf, 0, 10); + } + } + } + + @Test + public void testBlockReaderLocalReadCorruptStart() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true, + HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + private static class TestBlockReaderLocalReadCorrupt + extends BlockReaderLocalTest { + boolean usingChecksums = false; + @Override + public void setup(File blockFile, boolean usingChecksums) + throws IOException { + RandomAccessFile bf = null; + this.usingChecksums = usingChecksums; + try { + bf = new RandomAccessFile(blockFile, "rw"); + bf.seek(1539); + bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0}); + } finally { + if (bf != null) bf.close(); + } + } + + public void doTest(BlockReaderLocal reader, byte original[]) + throws IOException { + byte buf[] = new byte[TEST_LENGTH]; + try { + reader.readFully(buf, 0, 10); + assertArrayRegionsEqual(original, 0, buf, 0, 10); + reader.readFully(buf, 10, 100); + assertArrayRegionsEqual(original, 10, buf, 10, 100); + reader.readFully(buf, 110, 700); + assertArrayRegionsEqual(original, 110, buf, 110, 700); + reader.skip(1); // skip from offset 810 to offset 811 + reader.readFully(buf, 811, 5); + assertArrayRegionsEqual(original, 811, buf, 811, 5); + reader.readFully(buf, 816, 900); + if (usingChecksums) { + // We should detect the corruption when using a checksum file. + Assert.fail("did not detect corruption"); + } + } catch (ChecksumException e) { + if (!usingChecksums) { + Assert.fail("didn't expect to get ChecksumException: not " + + "using checksums."); + } + } + } + } + + @Test + public void testBlockReaderLocalReadCorrupt() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, + HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderLocalReadCorruptNoChecksum() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, + HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderLocalReadCorruptNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0); + } + + @Test + public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0); + } + + private static class TestBlockReaderLocalWithMlockChanges + extends BlockReaderLocalTest { + @Override + public void setup(File blockFile, boolean usingChecksums) + throws IOException { + } + + @Override + public void doTest(BlockReaderLocal reader, byte original[]) + throws IOException { + ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]); + reader.skip(1); + readFully(reader, buf, 1, 9); + assertArrayRegionsEqual(original, 1, buf.array(), 1, 9); + readFully(reader, buf, 10, 100); + assertArrayRegionsEqual(original, 10, buf.array(), 10, 100); + reader.forceAnchorable(); + readFully(reader, buf, 110, 700); + assertArrayRegionsEqual(original, 110, buf.array(), 110, 700); + reader.forceUnanchorable(); + reader.skip(1); // skip from offset 810 to offset 811 + readFully(reader, buf, 811, 5); + assertArrayRegionsEqual(original, 811, buf.array(), 811, 5); + } + } + + @Test + public void testBlockReaderLocalWithMlockChanges() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), + true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderLocalWithMlockChangesNoChecksum() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), + false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderLocalWithMlockChangesNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), + true, 0); + } + + @Test + public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), + false, 0); + } + + private static class TestBlockReaderLocalOnFileWithoutChecksum + extends BlockReaderLocalTest { + @Override + public void setConfiguration(HdfsConfiguration conf) { + conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL"); + } + + @Override + public void doTest(BlockReaderLocal reader, byte original[]) + throws IOException { + Assert.assertTrue(!reader.getVerifyChecksum()); + ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]); + reader.skip(1); + readFully(reader, buf, 1, 9); + assertArrayRegionsEqual(original, 1, buf.array(), 1, 9); + readFully(reader, buf, 10, 100); + assertArrayRegionsEqual(original, 10, buf.array(), 10, 100); + reader.forceAnchorable(); + readFully(reader, buf, 110, 700); + assertArrayRegionsEqual(original, 110, buf.array(), 110, 700); + reader.forceUnanchorable(); + reader.skip(1); // skip from offset 810 to offset 811 + readFully(reader, buf, 811, 5); + assertArrayRegionsEqual(original, 811, buf.array(), 811, 5); + } + } + + private static class TestBlockReaderLocalReadZeroBytes + extends BlockReaderLocalTest { + @Override + public void doTest(BlockReaderLocal reader, byte original[]) + throws IOException { + byte emptyArr[] = new byte[0]; + Assert.assertEquals(0, reader.read(emptyArr, 0, 0)); + ByteBuffer emptyBuf = ByteBuffer.wrap(emptyArr); + Assert.assertEquals(0, reader.read(emptyBuf)); + reader.skip(1); + Assert.assertEquals(0, reader.read(emptyArr, 0, 0)); + Assert.assertEquals(0, reader.read(emptyBuf)); + reader.skip(BlockReaderLocalTest.TEST_LENGTH - 1); + Assert.assertEquals(-1, reader.read(emptyArr, 0, 0)); + Assert.assertEquals(-1, reader.read(emptyBuf)); + } + } + + @Test + public void testBlockReaderLocalOnFileWithoutChecksum() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), + true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), + false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), + true, 0); + } + + @Test + public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), + false, 0); + } + + @Test + public void testBlockReaderLocalReadZeroBytes() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), + true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderLocalReadZeroBytesNoChecksum() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), + false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + } + + @Test + public void testBlockReaderLocalReadZeroBytesNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), + true, 0); + } + + @Test + public void testBlockReaderLocalReadZeroBytesNoChecksumNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), + false, 0); + } + + + @Test(timeout=60000) + public void TestStatisticsForShortCircuitLocalRead() throws Exception { + testStatistics(true); + } + + @Test(timeout=60000) + public void TestStatisticsForLocalRead() throws Exception { + testStatistics(false); + } + + private void testStatistics(boolean isShortCircuit) throws Exception { + Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null); + HdfsConfiguration conf = new HdfsConfiguration(); + TemporarySocketDirectory sockDir = null; + if (isShortCircuit) { + DFSInputStream.tcpReadsDisabledForTesting = true; + sockDir = new TemporarySocketDirectory(); + conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, + new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock"). + getAbsolutePath()); + conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); + DomainSocket.disableBindPathValidation(); + } else { + conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false); + } + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + final long RANDOM_SEED = 4567L; + FSDataInputStream fsIn = null; + byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; + FileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf). + hosts(new String[] {NetUtils.getLocalHostname()}).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + IOUtils.readFully(fsIn, original, 0, + BlockReaderLocalTest.TEST_LENGTH); + HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; + Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, + dfsIn.getReadStatistics().getTotalBytesRead()); + Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, + dfsIn.getReadStatistics().getTotalLocalBytesRead()); + if (isShortCircuit) { + Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, + dfsIn.getReadStatistics().getTotalShortCircuitBytesRead()); + } else { + Assert.assertEquals(0, + dfsIn.getReadStatistics().getTotalShortCircuitBytesRead()); + } + fsIn.close(); + fsIn = null; + } finally { + DFSInputStream.tcpReadsDisabledForTesting = false; + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + if (sockDir != null) sockDir.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalLegacy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalLegacy.java new file mode 100644 index 0000000..273619c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalLegacy.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestBlockReaderLocalLegacy { + @BeforeClass + public static void setupCluster() throws IOException { + DFSInputStream.tcpReadsDisabledForTesting = true; + DomainSocket.disableBindPathValidation(); + } + + private static HdfsConfiguration getConfiguration( + TemporarySocketDirectory socketDir) throws IOException { + HdfsConfiguration conf = new HdfsConfiguration(); + if (socketDir == null) { + conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, ""); + } else { + conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, + new File(socketDir.getDir(), "TestBlockReaderLocalLegacy.%d.sock"). + getAbsolutePath()); + } + conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); + conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true); + conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, + false); + conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, + UserGroupInformation.getCurrentUser().getShortUserName()); + conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false); + // Set short retry timeouts so this test runs faster + conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); + return conf; + } + + /** + * Test that, in the case of an error, the position and limit of a ByteBuffer + * are left unchanged. This is not mandated by ByteBufferReadable, but clients + * of this class might immediately issue a retry on failure, so it's polite. + */ + @Test + public void testStablePositionAfterCorruptRead() throws Exception { + final short REPL_FACTOR = 1; + final long FILE_LENGTH = 512L; + + HdfsConfiguration conf = getConfiguration(null); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + + Path path = new Path("/corrupted"); + + DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L); + DFSTestUtil.waitReplication(fs, path, REPL_FACTOR); + + ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path); + int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block); + assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted); + + FSDataInputStream dis = cluster.getFileSystem().open(path); + ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH); + boolean sawException = false; + try { + dis.read(buf); + } catch (ChecksumException ex) { + sawException = true; + } + + assertTrue(sawException); + assertEquals(0, buf.position()); + assertEquals(buf.capacity(), buf.limit()); + + dis = cluster.getFileSystem().open(path); + buf.position(3); + buf.limit(25); + sawException = false; + try { + dis.read(buf); + } catch (ChecksumException ex) { + sawException = true; + } + + assertTrue(sawException); + assertEquals(3, buf.position()); + assertEquals(25, buf.limit()); + cluster.shutdown(); + } + + @Test + public void testBothOldAndNewShortCircuitConfigured() throws Exception { + final short REPL_FACTOR = 1; + final int FILE_LENGTH = 512; + Assume.assumeTrue(null == DomainSocket.getLoadingFailureReason()); + TemporarySocketDirectory socketDir = new TemporarySocketDirectory(); + HdfsConfiguration conf = getConfiguration(socketDir); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + socketDir.close(); + FileSystem fs = cluster.getFileSystem(); + + Path path = new Path("/foo"); + byte orig[] = new byte[FILE_LENGTH]; + for (int i = 0; i < orig.length; i++) { + orig[i] = (byte)(i%10); + } + FSDataOutputStream fos = fs.create(path, (short)1); + fos.write(orig); + fos.close(); + DFSTestUtil.waitReplication(fs, path, REPL_FACTOR); + FSDataInputStream fis = cluster.getFileSystem().open(path); + byte buf[] = new byte[FILE_LENGTH]; + IOUtils.readFully(fis, buf, 0, FILE_LENGTH); + fis.close(); + Assert.assertArrayEquals(orig, buf); + Arrays.equals(orig, buf); + cluster.shutdown(); + } + + @Test(timeout=20000) + public void testBlockReaderLocalLegacyWithAppend() throws Exception { + final short REPL_FACTOR = 1; + final HdfsConfiguration conf = getConfiguration(null); + conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true); + + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + + final DistributedFileSystem dfs = cluster.getFileSystem(); + final Path path = new Path("/testBlockReaderLocalLegacy"); + DFSTestUtil.createFile(dfs, path, 10, REPL_FACTOR, 0); + DFSTestUtil.waitReplication(dfs, path, REPL_FACTOR); + + final ClientDatanodeProtocol proxy; + final Token<BlockTokenIdentifier> token; + final ExtendedBlock originalBlock; + final long originalGS; + { + final LocatedBlock lb = cluster.getNameNode().getRpcServer() + .getBlockLocations(path.toString(), 0, 1).get(0); + proxy = DFSUtilClient.createClientDatanodeProtocolProxy( + lb.getLocations()[0], conf, 60000, false); + token = lb.getBlockToken(); + + // get block and generation stamp + final ExtendedBlock blk = new ExtendedBlock(lb.getBlock()); + originalBlock = new ExtendedBlock(blk); + originalGS = originalBlock.getGenerationStamp(); + + // test getBlockLocalPathInfo + final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(blk, token); + Assert.assertEquals(originalGS, info.getBlock().getGenerationStamp()); + } + + { // append one byte + FSDataOutputStream out = dfs.append(path); + out.write(1); + out.close(); + } + + { + // get new generation stamp + final LocatedBlock lb = cluster.getNameNode().getRpcServer() + .getBlockLocations(path.toString(), 0, 1).get(0); + final long newGS = lb.getBlock().getGenerationStamp(); + Assert.assertTrue(newGS > originalGS); + + // getBlockLocalPathInfo using the original block. + Assert.assertEquals(originalGS, originalBlock.getGenerationStamp()); + final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo( + originalBlock, token); + Assert.assertEquals(newGS, info.getBlock().getGenerationStamp()); + } + cluster.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java new file mode 100644 index 0000000..c30aac8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; + +public class TestBlockReaderRemote extends TestBlockReaderBase { + + HdfsConfiguration createConf() { + HdfsConfiguration conf = new HdfsConfiguration(); + conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true); + return conf; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java new file mode 100644 index 0000000..34a1539 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderRemote2.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import org.apache.hadoop.hdfs.HdfsConfiguration; + +public class TestBlockReaderRemote2 extends TestBlockReaderBase { + HdfsConfiguration createConf() { + HdfsConfiguration conf = new HdfsConfiguration(); + return conf; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java new file mode 100644 index 0000000..b6ec1bf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestClientBlockVerification.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.client.impl; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestClientBlockVerification { + + static BlockReaderTestUtil util = null; + static final Path TEST_FILE = new Path("/test.file"); + static final int FILE_SIZE_K = 256; + static LocatedBlock testBlock = null; + + static { + GenericTestUtils.setLogLevel(BlockReaderRemote2.LOG, Level.ALL); + } + @BeforeClass + public static void setupCluster() throws Exception { + final int REPLICATION_FACTOR = 1; + util = new BlockReaderTestUtil(REPLICATION_FACTOR); + util.writeFile(TEST_FILE, FILE_SIZE_K); + List<LocatedBlock> blkList = util.getFileBlocks(TEST_FILE, FILE_SIZE_K); + testBlock = blkList.get(0); // Use the first block to test + } + + /** + * Verify that if we read an entire block, we send CHECKSUM_OK + */ + @Test + public void testBlockVerification() throws Exception { + BlockReaderRemote2 reader = (BlockReaderRemote2)spy( + util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); + util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); + verify(reader).sendReadResult(Status.CHECKSUM_OK); + reader.close(); + } + + /** + * Test that if we do an incomplete read, we don't call CHECKSUM_OK + */ + @Test + public void testIncompleteRead() throws Exception { + BlockReaderRemote2 reader = (BlockReaderRemote2)spy( + util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); + util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false); + + // We asked the blockreader for the whole file, and only read + // half of it, so no CHECKSUM_OK + verify(reader, never()).sendReadResult(Status.CHECKSUM_OK); + reader.close(); + } + + /** + * Test that if we ask for a half block, and read it all, we *do* + * send CHECKSUM_OK. The DN takes care of knowing whether it was + * the whole block or not. + */ + @Test + public void testCompletePartialRead() throws Exception { + // Ask for half the file + BlockReaderRemote2 reader = (BlockReaderRemote2)spy( + util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2)); + // And read half the file + util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); + verify(reader).sendReadResult(Status.CHECKSUM_OK); + reader.close(); + } + + /** + * Test various unaligned reads to make sure that we properly + * account even when we don't start or end on a checksum boundary + */ + @Test + public void testUnalignedReads() throws Exception { + int startOffsets[] = new int[] { 0, 3, 129 }; + int lengths[] = new int[] { 30, 300, 512, 513, 1025 }; + for (int startOffset : startOffsets) { + for (int length : lengths) { + DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " + + " len=" + length); + BlockReaderRemote2 reader = (BlockReaderRemote2)spy( + util.getBlockReader(testBlock, startOffset, length)); + util.readAndCheckEOS(reader, length, true); + verify(reader).sendReadResult(Status.CHECKSUM_OK); + reader.close(); + } + } + } + + + @AfterClass + public static void teardownCluster() throws Exception { + util.shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index aa46de2..e7e7739 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -35,7 +35,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsTracer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockReader; -import org.apache.hadoop.hdfs.BlockReaderFactory; +import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; import org.apache.hadoop.hdfs.ClientContext; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 73f02bd..00c2f62 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -42,7 +42,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FsTracer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockReader; -import org.apache.hadoop.hdfs.BlockReaderFactory; +import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; import org.apache.hadoop.hdfs.ClientContext; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index d82e383..10d974c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -43,7 +43,7 @@ import org.apache.hadoop.fs.HdfsBlockLocation; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; -import org.apache.hadoop.hdfs.BlockReaderTestUtil; +import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -59,7 +59,6 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder; -import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java index a6d972a..ce37abd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCacheRevocation.java @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.hdfs.BlockReaderTestUtil; +import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java index 3793cae..efb5cf8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java @@ -55,7 +55,7 @@ import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.BlockReaderTestUtil; +import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index e395fca..f788613 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -39,8 +39,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.BlockReaderFactory; -import org.apache.hadoop.hdfs.BlockReaderTestUtil; +import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; +import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java index 116dc88..299865b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.TestBlockReaderLocal; +import org.apache.hadoop.hdfs.client.impl.TestBlockReaderLocal; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; @@ -54,7 +54,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; @@ -593,7 +592,7 @@ public class TestShortCircuitLocalRead { /** * Test that file data can be read by reading the block - * through RemoteBlockReader + * through BlockReaderRemote * @throws IOException */ public void doTestShortCircuitReadWithRemoteBlockReader( @@ -623,9 +622,9 @@ public class TestShortCircuitLocalRead { try { checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, conf, shortCircuitFails); - //RemoteBlockReader have unsupported method read(ByteBuffer bf) + //BlockReaderRemote have unsupported method read(ByteBuffer bf) assertTrue( - "RemoteBlockReader unsupported method read(ByteBuffer bf) error", + "BlockReaderRemote unsupported method read(ByteBuffer bf) error", checkUnsupportedMethod(fs, file1, fileData, readOffset)); } catch(IOException e) { throw new IOException(
