http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java deleted file mode 100644 index 0048d2a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java +++ /dev/null @@ -1,780 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.hamcrest.CoreMatchers.equalTo; - -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.util.UUID; -import java.util.concurrent.TimeoutException; - -import org.apache.hadoop.fs.ChecksumException; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.client.HdfsDataInputStream; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; -import org.apache.hadoop.fs.FsTracer; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.net.unix.TemporarySocketDirectory; -import org.apache.hadoop.util.Time; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; - -public class TestBlockReaderLocal { - private static TemporarySocketDirectory sockDir; - - @BeforeClass - public static void init() { - sockDir = new TemporarySocketDirectory(); - DomainSocket.disableBindPathValidation(); - } - - @AfterClass - public static void shutdown() throws IOException { - sockDir.close(); - } - - public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2, - int off2, int len) { - for (int i = 0; i < len; i++) { - if (buf1[off1 + i] != buf2[off2 + i]) { - Assert.fail("arrays differ at byte " + i + ". " + - "The first array has " + (int)buf1[off1 + i] + - ", but the second array has " + (int)buf2[off2 + i]); - } - } - } - - /** - * Similar to IOUtils#readFully(). Reads bytes in a loop. - * - * @param reader The BlockReaderLocal to read bytes from - * @param buf The ByteBuffer to read into - * @param off The offset in the buffer to read into - * @param len The number of bytes to read. - * - * @throws IOException If it could not read the requested number of bytes - */ - private static void readFully(BlockReaderLocal reader, - ByteBuffer buf, int off, int len) throws IOException { - int amt = len; - while (amt > 0) { - buf.limit(off + len); - buf.position(off); - long ret = reader.read(buf); - if (ret < 0) { - throw new EOFException( "Premature EOF from BlockReaderLocal " + - "after reading " + (len - amt) + " byte(s)."); - } - amt -= ret; - off += ret; - } - } - - private static class BlockReaderLocalTest { - final static int TEST_LENGTH = 12345; - final static int BYTES_PER_CHECKSUM = 512; - - public void setConfiguration(HdfsConfiguration conf) { - // default: no-op - } - public void setup(File blockFile, boolean usingChecksums) - throws IOException { - // default: no-op - } - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - // default: no-op - } - } - - public void runBlockReaderLocalTest(BlockReaderLocalTest test, - boolean checksum, long readahead) throws IOException { - Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); - MiniDFSCluster cluster = null; - HdfsConfiguration conf = new HdfsConfiguration(); - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, - !checksum); - conf.setLong(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, - BlockReaderLocalTest.BYTES_PER_CHECKSUM); - conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C"); - conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead); - test.setConfiguration(conf); - FileInputStream dataIn = null, metaIn = null; - final Path TEST_PATH = new Path("/a"); - final long RANDOM_SEED = 4567L; - BlockReaderLocal blockReaderLocal = null; - FSDataInputStream fsIn = null; - byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; - - FileSystem fs = null; - ShortCircuitShm shm = null; - RandomAccessFile raf = null; - try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, TEST_PATH, - BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); - try { - DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); - } catch (InterruptedException e) { - Assert.fail("unexpected InterruptedException during " + - "waitReplication: " + e); - } catch (TimeoutException e) { - Assert.fail("unexpected TimeoutException during " + - "waitReplication: " + e); - } - fsIn = fs.open(TEST_PATH); - IOUtils.readFully(fsIn, original, 0, - BlockReaderLocalTest.TEST_LENGTH); - fsIn.close(); - fsIn = null; - ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH); - File dataFile = cluster.getBlockFile(0, block); - File metaFile = cluster.getBlockMetadataFile(0, block); - - ShortCircuitCache shortCircuitCache = - ClientContext.getFromConf(conf).getShortCircuitCache(); - cluster.shutdown(); - cluster = null; - test.setup(dataFile, checksum); - FileInputStream streams[] = { - new FileInputStream(dataFile), - new FileInputStream(metaFile) - }; - dataIn = streams[0]; - metaIn = streams[1]; - ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), - block.getBlockPoolId()); - raf = new RandomAccessFile( - new File(sockDir.getDir().getAbsolutePath(), - UUID.randomUUID().toString()), "rw"); - raf.setLength(8192); - FileInputStream shmStream = new FileInputStream(raf.getFD()); - shm = new ShortCircuitShm(ShmId.createRandom(), shmStream); - ShortCircuitReplica replica = - new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache, - Time.now(), shm.allocAndRegisterSlot( - ExtendedBlockId.fromExtendedBlock(block))); - blockReaderLocal = new BlockReaderLocal.Builder( - new DfsClientConf.ShortCircuitConf(conf)). - setFilename(TEST_PATH.getName()). - setBlock(block). - setShortCircuitReplica(replica). - setCachingStrategy(new CachingStrategy(false, readahead)). - setVerifyChecksum(checksum). - setTracer(FsTracer.get(conf)). - build(); - dataIn = null; - metaIn = null; - test.doTest(blockReaderLocal, original); - // BlockReaderLocal should not alter the file position. - Assert.assertEquals(0, streams[0].getChannel().position()); - Assert.assertEquals(0, streams[1].getChannel().position()); - } finally { - if (fsIn != null) fsIn.close(); - if (fs != null) fs.close(); - if (cluster != null) cluster.shutdown(); - if (dataIn != null) dataIn.close(); - if (metaIn != null) metaIn.close(); - if (blockReaderLocal != null) blockReaderLocal.close(); - if (shm != null) shm.free(); - if (raf != null) raf.close(); - } - } - - private static class TestBlockReaderLocalImmediateClose - extends BlockReaderLocalTest { - } - - @Test - public void testBlockReaderLocalImmediateClose() throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0); - runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0); - } - - private static class TestBlockReaderSimpleReads - extends BlockReaderLocalTest { - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - byte buf[] = new byte[TEST_LENGTH]; - reader.readFully(buf, 0, 512); - assertArrayRegionsEqual(original, 0, buf, 0, 512); - reader.readFully(buf, 512, 512); - assertArrayRegionsEqual(original, 512, buf, 512, 512); - reader.readFully(buf, 1024, 513); - assertArrayRegionsEqual(original, 1024, buf, 1024, 513); - reader.readFully(buf, 1537, 514); - assertArrayRegionsEqual(original, 1537, buf, 1537, 514); - // Readahead is always at least the size of one chunk in this test. - Assert.assertTrue(reader.getMaxReadaheadLength() >= - BlockReaderLocalTest.BYTES_PER_CHECKSUM); - } - } - - @Test - public void testBlockReaderSimpleReads() throws IOException { - runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, - HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderSimpleReadsShortReadahead() throws IOException { - runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, - BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1); - } - - @Test - public void testBlockReaderSimpleReadsNoChecksum() throws IOException { - runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, - HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderSimpleReadsNoReadahead() throws IOException { - runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0); - } - - @Test - public void testBlockReaderSimpleReadsNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0); - } - - private static class TestBlockReaderLocalArrayReads2 - extends BlockReaderLocalTest { - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - byte buf[] = new byte[TEST_LENGTH]; - reader.readFully(buf, 0, 10); - assertArrayRegionsEqual(original, 0, buf, 0, 10); - reader.readFully(buf, 10, 100); - assertArrayRegionsEqual(original, 10, buf, 10, 100); - reader.readFully(buf, 110, 700); - assertArrayRegionsEqual(original, 110, buf, 110, 700); - reader.readFully(buf, 810, 1); // from offset 810 to offset 811 - reader.readFully(buf, 811, 5); - assertArrayRegionsEqual(original, 811, buf, 811, 5); - reader.readFully(buf, 816, 900); // skip from offset 816 to offset 1716 - reader.readFully(buf, 1716, 5); - assertArrayRegionsEqual(original, 1716, buf, 1716, 5); - } - } - - @Test - public void testBlockReaderLocalArrayReads2() throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), - true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalArrayReads2NoChecksum() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), - false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalArrayReads2NoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0); - } - - @Test - public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0); - } - - private static class TestBlockReaderLocalByteBufferReads - extends BlockReaderLocalTest { - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]); - readFully(reader, buf, 0, 10); - assertArrayRegionsEqual(original, 0, buf.array(), 0, 10); - readFully(reader, buf, 10, 100); - assertArrayRegionsEqual(original, 10, buf.array(), 10, 100); - readFully(reader, buf, 110, 700); - assertArrayRegionsEqual(original, 110, buf.array(), 110, 700); - reader.skip(1); // skip from offset 810 to offset 811 - readFully(reader, buf, 811, 5); - assertArrayRegionsEqual(original, 811, buf.array(), 811, 5); - } - } - - @Test - public void testBlockReaderLocalByteBufferReads() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), - true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalByteBufferReadsNoChecksum() - throws IOException { - runBlockReaderLocalTest( - new TestBlockReaderLocalByteBufferReads(), - false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalByteBufferReadsNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), - true, 0); - } - - @Test - public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), - false, 0); - } - - /** - * Test reads that bypass the bounce buffer (because they are aligned - * and bigger than the readahead). - */ - private static class TestBlockReaderLocalByteBufferFastLaneReads - extends BlockReaderLocalTest { - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - ByteBuffer buf = ByteBuffer.allocateDirect(TEST_LENGTH); - readFully(reader, buf, 0, 5120); - buf.flip(); - assertArrayRegionsEqual(original, 0, - DFSTestUtil.asArray(buf), 0, - 5120); - reader.skip(1537); - readFully(reader, buf, 0, 1); - buf.flip(); - assertArrayRegionsEqual(original, 6657, - DFSTestUtil.asArray(buf), 0, - 1); - reader.forceAnchorable(); - readFully(reader, buf, 0, 5120); - buf.flip(); - assertArrayRegionsEqual(original, 6658, - DFSTestUtil.asArray(buf), 0, - 5120); - reader.forceUnanchorable(); - readFully(reader, buf, 0, 513); - buf.flip(); - assertArrayRegionsEqual(original, 11778, - DFSTestUtil.asArray(buf), 0, - 513); - reader.skip(3); - readFully(reader, buf, 0, 50); - buf.flip(); - assertArrayRegionsEqual(original, 12294, - DFSTestUtil.asArray(buf), 0, - 50); - } - } - - @Test - public void testBlockReaderLocalByteBufferFastLaneReads() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(), - true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM); - } - - @Test - public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum() - throws IOException { - runBlockReaderLocalTest( - new TestBlockReaderLocalByteBufferFastLaneReads(), - false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM); - } - - @Test - public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(), - true, 0); - } - - @Test - public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(), - false, 0); - } - - private static class TestBlockReaderLocalReadCorruptStart - extends BlockReaderLocalTest { - boolean usingChecksums = false; - @Override - public void setup(File blockFile, boolean usingChecksums) - throws IOException { - RandomAccessFile bf = null; - this.usingChecksums = usingChecksums; - try { - bf = new RandomAccessFile(blockFile, "rw"); - bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0}); - } finally { - if (bf != null) bf.close(); - } - } - - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - byte buf[] = new byte[TEST_LENGTH]; - if (usingChecksums) { - try { - reader.readFully(buf, 0, 10); - Assert.fail("did not detect corruption"); - } catch (IOException e) { - // expected - } - } else { - reader.readFully(buf, 0, 10); - } - } - } - - @Test - public void testBlockReaderLocalReadCorruptStart() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true, - HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - private static class TestBlockReaderLocalReadCorrupt - extends BlockReaderLocalTest { - boolean usingChecksums = false; - @Override - public void setup(File blockFile, boolean usingChecksums) - throws IOException { - RandomAccessFile bf = null; - this.usingChecksums = usingChecksums; - try { - bf = new RandomAccessFile(blockFile, "rw"); - bf.seek(1539); - bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0}); - } finally { - if (bf != null) bf.close(); - } - } - - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - byte buf[] = new byte[TEST_LENGTH]; - try { - reader.readFully(buf, 0, 10); - assertArrayRegionsEqual(original, 0, buf, 0, 10); - reader.readFully(buf, 10, 100); - assertArrayRegionsEqual(original, 10, buf, 10, 100); - reader.readFully(buf, 110, 700); - assertArrayRegionsEqual(original, 110, buf, 110, 700); - reader.skip(1); // skip from offset 810 to offset 811 - reader.readFully(buf, 811, 5); - assertArrayRegionsEqual(original, 811, buf, 811, 5); - reader.readFully(buf, 816, 900); - if (usingChecksums) { - // We should detect the corruption when using a checksum file. - Assert.fail("did not detect corruption"); - } - } catch (ChecksumException e) { - if (!usingChecksums) { - Assert.fail("didn't expect to get ChecksumException: not " + - "using checksums."); - } - } - } - } - - @Test - public void testBlockReaderLocalReadCorrupt() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, - HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalReadCorruptNoChecksum() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, - HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalReadCorruptNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0); - } - - @Test - public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0); - } - - private static class TestBlockReaderLocalWithMlockChanges - extends BlockReaderLocalTest { - @Override - public void setup(File blockFile, boolean usingChecksums) - throws IOException { - } - - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]); - reader.skip(1); - readFully(reader, buf, 1, 9); - assertArrayRegionsEqual(original, 1, buf.array(), 1, 9); - readFully(reader, buf, 10, 100); - assertArrayRegionsEqual(original, 10, buf.array(), 10, 100); - reader.forceAnchorable(); - readFully(reader, buf, 110, 700); - assertArrayRegionsEqual(original, 110, buf.array(), 110, 700); - reader.forceUnanchorable(); - reader.skip(1); // skip from offset 810 to offset 811 - readFully(reader, buf, 811, 5); - assertArrayRegionsEqual(original, 811, buf.array(), 811, 5); - } - } - - @Test - public void testBlockReaderLocalWithMlockChanges() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), - true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalWithMlockChangesNoChecksum() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), - false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalWithMlockChangesNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), - true, 0); - } - - @Test - public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), - false, 0); - } - - private static class TestBlockReaderLocalOnFileWithoutChecksum - extends BlockReaderLocalTest { - @Override - public void setConfiguration(HdfsConfiguration conf) { - conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL"); - } - - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - Assert.assertTrue(!reader.getVerifyChecksum()); - ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]); - reader.skip(1); - readFully(reader, buf, 1, 9); - assertArrayRegionsEqual(original, 1, buf.array(), 1, 9); - readFully(reader, buf, 10, 100); - assertArrayRegionsEqual(original, 10, buf.array(), 10, 100); - reader.forceAnchorable(); - readFully(reader, buf, 110, 700); - assertArrayRegionsEqual(original, 110, buf.array(), 110, 700); - reader.forceUnanchorable(); - reader.skip(1); // skip from offset 810 to offset 811 - readFully(reader, buf, 811, 5); - assertArrayRegionsEqual(original, 811, buf.array(), 811, 5); - } - } - - private static class TestBlockReaderLocalReadZeroBytes - extends BlockReaderLocalTest { - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - byte emptyArr[] = new byte[0]; - Assert.assertEquals(0, reader.read(emptyArr, 0, 0)); - ByteBuffer emptyBuf = ByteBuffer.wrap(emptyArr); - Assert.assertEquals(0, reader.read(emptyBuf)); - reader.skip(1); - Assert.assertEquals(0, reader.read(emptyArr, 0, 0)); - Assert.assertEquals(0, reader.read(emptyBuf)); - reader.skip(BlockReaderLocalTest.TEST_LENGTH - 1); - Assert.assertEquals(-1, reader.read(emptyArr, 0, 0)); - Assert.assertEquals(-1, reader.read(emptyBuf)); - } - } - - @Test - public void testBlockReaderLocalOnFileWithoutChecksum() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), - true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), - false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), - true, 0); - } - - @Test - public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), - false, 0); - } - - @Test - public void testBlockReaderLocalReadZeroBytes() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), - true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalReadZeroBytesNoChecksum() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), - false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalReadZeroBytesNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), - true, 0); - } - - @Test - public void testBlockReaderLocalReadZeroBytesNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), - false, 0); - } - - - @Test(timeout=60000) - public void TestStatisticsForShortCircuitLocalRead() throws Exception { - testStatistics(true); - } - - @Test(timeout=60000) - public void TestStatisticsForLocalRead() throws Exception { - testStatistics(false); - } - - private void testStatistics(boolean isShortCircuit) throws Exception { - Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null); - HdfsConfiguration conf = new HdfsConfiguration(); - TemporarySocketDirectory sockDir = null; - if (isShortCircuit) { - DFSInputStream.tcpReadsDisabledForTesting = true; - sockDir = new TemporarySocketDirectory(); - conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, - new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock"). - getAbsolutePath()); - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); - DomainSocket.disableBindPathValidation(); - } else { - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false); - } - MiniDFSCluster cluster = null; - final Path TEST_PATH = new Path("/a"); - final long RANDOM_SEED = 4567L; - FSDataInputStream fsIn = null; - byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; - FileSystem fs = null; - try { - cluster = new MiniDFSCluster.Builder(conf). - hosts(new String[] {NetUtils.getLocalHostname()}).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, TEST_PATH, - BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); - try { - DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); - } catch (InterruptedException e) { - Assert.fail("unexpected InterruptedException during " + - "waitReplication: " + e); - } catch (TimeoutException e) { - Assert.fail("unexpected TimeoutException during " + - "waitReplication: " + e); - } - fsIn = fs.open(TEST_PATH); - IOUtils.readFully(fsIn, original, 0, - BlockReaderLocalTest.TEST_LENGTH); - HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; - Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, - dfsIn.getReadStatistics().getTotalBytesRead()); - Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, - dfsIn.getReadStatistics().getTotalLocalBytesRead()); - if (isShortCircuit) { - Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, - dfsIn.getReadStatistics().getTotalShortCircuitBytesRead()); - } else { - Assert.assertEquals(0, - dfsIn.getReadStatistics().getTotalShortCircuitBytesRead()); - } - fsIn.close(); - fsIn = null; - } finally { - DFSInputStream.tcpReadsDisabledForTesting = false; - if (fsIn != null) fsIn.close(); - if (fs != null) fs.close(); - if (cluster != null) cluster.shutdown(); - if (sockDir != null) sockDir.close(); - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java deleted file mode 100644 index af28bd3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; - -import org.apache.hadoop.fs.ChecksumException; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; -import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.net.unix.TemporarySocketDirectory; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; - -public class TestBlockReaderLocalLegacy { - @BeforeClass - public static void setupCluster() throws IOException { - DFSInputStream.tcpReadsDisabledForTesting = true; - DomainSocket.disableBindPathValidation(); - } - - private static HdfsConfiguration getConfiguration( - TemporarySocketDirectory socketDir) throws IOException { - HdfsConfiguration conf = new HdfsConfiguration(); - if (socketDir == null) { - conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, ""); - } else { - conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, - new File(socketDir.getDir(), "TestBlockReaderLocalLegacy.%d.sock"). - getAbsolutePath()); - } - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); - conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true); - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, - false); - conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, - UserGroupInformation.getCurrentUser().getShortUserName()); - conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false); - // Set short retry timeouts so this test runs faster - conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); - return conf; - } - - /** - * Test that, in the case of an error, the position and limit of a ByteBuffer - * are left unchanged. This is not mandated by ByteBufferReadable, but clients - * of this class might immediately issue a retry on failure, so it's polite. - */ - @Test - public void testStablePositionAfterCorruptRead() throws Exception { - final short REPL_FACTOR = 1; - final long FILE_LENGTH = 512L; - - HdfsConfiguration conf = getConfiguration(null); - MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - FileSystem fs = cluster.getFileSystem(); - - Path path = new Path("/corrupted"); - - DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L); - DFSTestUtil.waitReplication(fs, path, REPL_FACTOR); - - ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path); - int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block); - assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted); - - FSDataInputStream dis = cluster.getFileSystem().open(path); - ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH); - boolean sawException = false; - try { - dis.read(buf); - } catch (ChecksumException ex) { - sawException = true; - } - - assertTrue(sawException); - assertEquals(0, buf.position()); - assertEquals(buf.capacity(), buf.limit()); - - dis = cluster.getFileSystem().open(path); - buf.position(3); - buf.limit(25); - sawException = false; - try { - dis.read(buf); - } catch (ChecksumException ex) { - sawException = true; - } - - assertTrue(sawException); - assertEquals(3, buf.position()); - assertEquals(25, buf.limit()); - cluster.shutdown(); - } - - @Test - public void testBothOldAndNewShortCircuitConfigured() throws Exception { - final short REPL_FACTOR = 1; - final int FILE_LENGTH = 512; - Assume.assumeTrue(null == DomainSocket.getLoadingFailureReason()); - TemporarySocketDirectory socketDir = new TemporarySocketDirectory(); - HdfsConfiguration conf = getConfiguration(socketDir); - MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - socketDir.close(); - FileSystem fs = cluster.getFileSystem(); - - Path path = new Path("/foo"); - byte orig[] = new byte[FILE_LENGTH]; - for (int i = 0; i < orig.length; i++) { - orig[i] = (byte)(i%10); - } - FSDataOutputStream fos = fs.create(path, (short)1); - fos.write(orig); - fos.close(); - DFSTestUtil.waitReplication(fs, path, REPL_FACTOR); - FSDataInputStream fis = cluster.getFileSystem().open(path); - byte buf[] = new byte[FILE_LENGTH]; - IOUtils.readFully(fis, buf, 0, FILE_LENGTH); - fis.close(); - Assert.assertArrayEquals(orig, buf); - Arrays.equals(orig, buf); - cluster.shutdown(); - } - - @Test(timeout=20000) - public void testBlockReaderLocalLegacyWithAppend() throws Exception { - final short REPL_FACTOR = 1; - final HdfsConfiguration conf = getConfiguration(null); - conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true); - - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - - final DistributedFileSystem dfs = cluster.getFileSystem(); - final Path path = new Path("/testBlockReaderLocalLegacy"); - DFSTestUtil.createFile(dfs, path, 10, REPL_FACTOR, 0); - DFSTestUtil.waitReplication(dfs, path, REPL_FACTOR); - - final ClientDatanodeProtocol proxy; - final Token<BlockTokenIdentifier> token; - final ExtendedBlock originalBlock; - final long originalGS; - { - final LocatedBlock lb = cluster.getNameNode().getRpcServer() - .getBlockLocations(path.toString(), 0, 1).get(0); - proxy = DFSUtilClient.createClientDatanodeProtocolProxy( - lb.getLocations()[0], conf, 60000, false); - token = lb.getBlockToken(); - - // get block and generation stamp - final ExtendedBlock blk = new ExtendedBlock(lb.getBlock()); - originalBlock = new ExtendedBlock(blk); - originalGS = originalBlock.getGenerationStamp(); - - // test getBlockLocalPathInfo - final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(blk, token); - Assert.assertEquals(originalGS, info.getBlock().getGenerationStamp()); - } - - { // append one byte - FSDataOutputStream out = dfs.append(path); - out.write(1); - out.close(); - } - - { - // get new generation stamp - final LocatedBlock lb = cluster.getNameNode().getRpcServer() - .getBlockLocations(path.toString(), 0, 1).get(0); - final long newGS = lb.getBlock().getGenerationStamp(); - Assert.assertTrue(newGS > originalGS); - - // getBlockLocalPathInfo using the original block. - Assert.assertEquals(originalGS, originalBlock.getGenerationStamp()); - final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo( - originalBlock, token); - Assert.assertEquals(newGS, info.getBlock().getGenerationStamp()); - } - cluster.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java deleted file mode 100644 index 5ff343a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; - -import java.util.List; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.log4j.Level; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -public class TestClientBlockVerification { - - static BlockReaderTestUtil util = null; - static final Path TEST_FILE = new Path("/test.file"); - static final int FILE_SIZE_K = 256; - static LocatedBlock testBlock = null; - - static { - GenericTestUtils.setLogLevel(RemoteBlockReader2.LOG, Level.ALL); - } - @BeforeClass - public static void setupCluster() throws Exception { - final int REPLICATION_FACTOR = 1; - util = new BlockReaderTestUtil(REPLICATION_FACTOR); - util.writeFile(TEST_FILE, FILE_SIZE_K); - List<LocatedBlock> blkList = util.getFileBlocks(TEST_FILE, FILE_SIZE_K); - testBlock = blkList.get(0); // Use the first block to test - } - - /** - * Verify that if we read an entire block, we send CHECKSUM_OK - */ - @Test - public void testBlockVerification() throws Exception { - RemoteBlockReader2 reader = (RemoteBlockReader2)spy( - util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); - util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); - verify(reader).sendReadResult(Status.CHECKSUM_OK); - reader.close(); - } - - /** - * Test that if we do an incomplete read, we don't call CHECKSUM_OK - */ - @Test - public void testIncompleteRead() throws Exception { - RemoteBlockReader2 reader = (RemoteBlockReader2)spy( - util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); - util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false); - - // We asked the blockreader for the whole file, and only read - // half of it, so no CHECKSUM_OK - verify(reader, never()).sendReadResult(Status.CHECKSUM_OK); - reader.close(); - } - - /** - * Test that if we ask for a half block, and read it all, we *do* - * send CHECKSUM_OK. The DN takes care of knowing whether it was - * the whole block or not. - */ - @Test - public void testCompletePartialRead() throws Exception { - // Ask for half the file - RemoteBlockReader2 reader = (RemoteBlockReader2)spy( - util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2)); - // And read half the file - util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); - verify(reader).sendReadResult(Status.CHECKSUM_OK); - reader.close(); - } - - /** - * Test various unaligned reads to make sure that we properly - * account even when we don't start or end on a checksum boundary - */ - @Test - public void testUnalignedReads() throws Exception { - int startOffsets[] = new int[] { 0, 3, 129 }; - int lengths[] = new int[] { 30, 300, 512, 513, 1025 }; - for (int startOffset : startOffsets) { - for (int length : lengths) { - DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " + - " len=" + length); - RemoteBlockReader2 reader = (RemoteBlockReader2)spy( - util.getBlockReader(testBlock, startOffset, length)); - util.readAndCheckEOS(reader, length, true); - verify(reader).sendReadResult(Status.CHECKSUM_OK); - reader.close(); - } - } - } - - - @AfterClass - public static void teardownCluster() throws Exception { - util.shutdown(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java index 8d2398d..3e0ad6d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java index f2043fb..c9d831a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; import org.junit.Test; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java index 05698ec..d54164f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java @@ -28,6 +28,7 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java deleted file mode 100644 index cef1d6d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; - -public class TestRemoteBlockReader extends TestBlockReaderBase { - - HdfsConfiguration createConf() { - HdfsConfiguration conf = new HdfsConfiguration(); - conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true); - return conf; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java deleted file mode 100644 index c23b4b7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRemoteBlockReader2.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -public class TestRemoteBlockReader2 extends TestBlockReaderBase { - HdfsConfiguration createConf() { - HdfsConfiguration conf = new HdfsConfiguration(); - return conf; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java new file mode 100644 index 0000000..c597921 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/BlockReaderTestUtil.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.client.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.List; +import java.util.Random; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsTracer; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.ClientContext; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.RemotePeerFactory; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache; +import org.apache.hadoop.hdfs.server.namenode.CacheManager; +import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; + +/** + * A helper class to setup the cluster, and get to BlockReader and DataNode for a block. + */ +public class BlockReaderTestUtil { + /** + * Returns true if we should run tests that generate large files (> 1GB) + */ + static public boolean shouldTestLargeFiles() { + String property = System.getProperty("hdfs.test.large.files"); + if (property == null) return false; + if (property.isEmpty()) return true; + return Boolean.parseBoolean(property); + } + + private HdfsConfiguration conf = null; + private MiniDFSCluster cluster = null; + + /** + * Setup the cluster + */ + public BlockReaderTestUtil(int replicationFactor) throws Exception { + this(replicationFactor, new HdfsConfiguration()); + } + + public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception { + this.conf = config; + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor); + cluster = new MiniDFSCluster.Builder(conf).format(true).build(); + cluster.waitActive(); + } + + /** + * Shutdown cluster + */ + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + public MiniDFSCluster getCluster() { + return cluster; + } + + public HdfsConfiguration getConf() { + return conf; + } + + /** + * Create a file of the given size filled with random data. + * @return File data. + */ + public byte[] writeFile(Path filepath, int sizeKB) + throws IOException { + FileSystem fs = cluster.getFileSystem(); + + // Write a file with the specified amount of data + DataOutputStream os = fs.create(filepath); + byte data[] = new byte[1024 * sizeKB]; + new Random().nextBytes(data); + os.write(data); + os.close(); + return data; + } + + /** + * Get the list of Blocks for a file. + */ + public List<LocatedBlock> getFileBlocks(Path filepath, int sizeKB) + throws IOException { + // Return the blocks we just wrote + DFSClient dfsclient = getDFSClient(); + return dfsclient.getNamenode().getBlockLocations( + filepath.toString(), 0, sizeKB * 1024).getLocatedBlocks(); + } + + /** + * Get the DFSClient. + */ + public DFSClient getDFSClient() throws IOException { + InetSocketAddress nnAddr = new InetSocketAddress("localhost", cluster.getNameNodePort()); + return new DFSClient(nnAddr, conf); + } + + /** + * Exercise the BlockReader and read length bytes. + * + * It does not verify the bytes read. + */ + public void readAndCheckEOS(BlockReader reader, int length, boolean expectEof) + throws IOException { + byte buf[] = new byte[1024]; + int nRead = 0; + while (nRead < length) { + DFSClient.LOG.info("So far read " + nRead + " - going to read more."); + int n = reader.read(buf, 0, buf.length); + assertTrue(n > 0); + nRead += n; + } + + if (expectEof) { + DFSClient.LOG.info("Done reading, expect EOF for next read."); + assertEquals(-1, reader.read(buf, 0, buf.length)); + } + } + + /** + * Get a BlockReader for the given block. + */ + public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead) + throws IOException { + return getBlockReader(cluster.getFileSystem(), testBlock, offset, lenToRead); + } + + /** + * Get a BlockReader for the given block. + */ + public static BlockReader getBlockReader(final DistributedFileSystem fs, + LocatedBlock testBlock, int offset, long lenToRead) throws IOException { + InetSocketAddress targetAddr = null; + ExtendedBlock block = testBlock.getBlock(); + DatanodeInfo[] nodes = testBlock.getLocations(); + targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr()); + + return new BlockReaderFactory(fs.getClient().getConf()). + setInetSocketAddress(targetAddr). + setBlock(block). + setFileName(targetAddr.toString()+ ":" + block.getBlockId()). + setBlockToken(testBlock.getBlockToken()). + setStartOffset(offset). + setLength(lenToRead). + setVerifyChecksum(true). + setClientName("BlockReaderTestUtil"). + setDatanodeInfo(nodes[0]). + setClientCacheContext(ClientContext.getFromConf(fs.getConf())). + setCachingStrategy(CachingStrategy.newDefaultStrategy()). + setConfiguration(fs.getConf()). + setAllowShortCircuitLocalReads(true). + setTracer(FsTracer.get(fs.getConf())). + setRemotePeerFactory(new RemotePeerFactory() { + @Override + public Peer newConnectedPeer(InetSocketAddress addr, + Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) + throws IOException { + Peer peer = null; + Socket sock = NetUtils. + getDefaultSocketFactory(fs.getConf()).createSocket(); + try { + sock.connect(addr, HdfsConstants.READ_TIMEOUT); + sock.setSoTimeout(HdfsConstants.READ_TIMEOUT); + peer = DFSUtilClient.peerFromSocket(sock); + } finally { + if (peer == null) { + IOUtils.closeQuietly(sock); + } + } + return peer; + } + }). + build(); + } + + /** + * Get a DataNode that serves our testBlock. + */ + public DataNode getDataNode(LocatedBlock testBlock) { + DatanodeInfo[] nodes = testBlock.getLocations(); + int ipcport = nodes[0].getIpcPort(); + return cluster.getDataNode(ipcport); + } + + public static void enableHdfsCachingTracing() { + LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(CacheManager.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(FsDatasetCache.class.getName()).setLevel( + Level.TRACE); + } + + public static void enableBlockReaderFactoryTracing() { + LogManager.getLogger(BlockReaderFactory.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(ShortCircuitCache.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(ShortCircuitReplica.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(BlockReaderLocal.class.getName()).setLevel( + Level.TRACE); + } + + public static void enableShortCircuitShmTracing() { + LogManager.getLogger(DfsClientShmManager.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(ShortCircuitRegistry.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(ShortCircuitShm.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(DataNode.class.getName()).setLevel( + Level.TRACE); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java new file mode 100644 index 0000000..70ed428 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderBase.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +abstract public class TestBlockReaderBase { + private BlockReaderTestUtil util; + private byte[] blockData; + private BlockReader reader; + + /** + * if override this, make sure return array length is less than + * block size. + */ + byte [] getBlockData() { + int length = 1 << 22; + byte[] data = new byte[length]; + for (int i = 0; i < length; i++) { + data[i] = (byte) (i % 133); + } + return data; + } + + private BlockReader getBlockReader(LocatedBlock block) throws Exception { + return util.getBlockReader(block, 0, blockData.length); + } + + abstract HdfsConfiguration createConf(); + + @Before + public void setup() throws Exception { + util = new BlockReaderTestUtil(1, createConf()); + blockData = getBlockData(); + DistributedFileSystem fs = util.getCluster().getFileSystem(); + Path testfile = new Path("/testfile"); + FSDataOutputStream fout = fs.create(testfile); + fout.write(blockData); + fout.close(); + LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0); + reader = getBlockReader(blk); + } + + @After + public void shutdown() throws Exception { + util.shutdown(); + } + + @Test(timeout=60000) + public void testSkip() throws IOException { + Random random = new Random(); + byte [] buf = new byte[1]; + for (int pos = 0; pos < blockData.length;) { + long skip = random.nextInt(100) + 1; + long skipped = reader.skip(skip); + if (pos + skip >= blockData.length) { + assertEquals(blockData.length, pos + skipped); + break; + } else { + assertEquals(skip, skipped); + pos += skipped; + assertEquals(1, reader.read(buf, 0, 1)); + + assertEquals(blockData[pos], buf[0]); + pos += 1; + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f308561f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java new file mode 100644 index 0000000..ca498c7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java @@ -0,0 +1,539 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS; +import static org.hamcrest.CoreMatchers.equalTo; + +import java.io.File; +import java.io.IOException; +import java.nio.channels.ClosedByInterruptException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo; +import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; +import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.util.concurrent.Uninterruptibles; + +public class TestBlockReaderFactory { + static final Log LOG = LogFactory.getLog(TestBlockReaderFactory.class); + + @Before + public void init() { + DomainSocket.disableBindPathValidation(); + Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); + } + + @After + public void cleanup() { + DFSInputStream.tcpReadsDisabledForTesting = false; + BlockReaderFactory.createShortCircuitReplicaInfoCallback = null; + } + + public static Configuration createShortCircuitConf(String testName, + TemporarySocketDirectory sockDir) { + Configuration conf = new Configuration(); + conf.set(DFS_CLIENT_CONTEXT, testName); + conf.setLong(DFS_BLOCK_SIZE_KEY, 4096); + conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(), + testName + "._PORT").getAbsolutePath()); + conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); + conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, + false); + conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false); + return conf; + } + + /** + * If we have a UNIX domain socket configured, + * and we have dfs.client.domain.socket.data.traffic set to true, + * and short-circuit access fails, we should still be able to pass + * data traffic over the UNIX domain socket. Test this. + */ + @Test(timeout=60000) + public void testFallbackFromShortCircuitToUnixDomainTraffic() + throws Exception { + DFSInputStream.tcpReadsDisabledForTesting = true; + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + + // The server is NOT configured with short-circuit local reads; + // the client is. Both support UNIX domain reads. + Configuration clientConf = createShortCircuitConf( + "testFallbackFromShortCircuitToUnixDomainTraffic", sockDir); + clientConf.set(DFS_CLIENT_CONTEXT, + "testFallbackFromShortCircuitToUnixDomainTraffic_clientContext"); + clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true); + Configuration serverConf = new Configuration(clientConf); + serverConf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false); + + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); + cluster.waitActive(); + FileSystem dfs = FileSystem.get(cluster.getURI(0), clientConf); + String TEST_FILE = "/test_file"; + final int TEST_FILE_LEN = 8193; + final int SEED = 0xFADED; + DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN, + (short)1, SEED); + byte contents[] = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE)); + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); + Assert.assertTrue(Arrays.equals(contents, expected)); + cluster.shutdown(); + sockDir.close(); + } + + /** + * Test the case where we have multiple threads waiting on the + * ShortCircuitCache delivering a certain ShortCircuitReplica. + * + * In this case, there should only be one call to + * createShortCircuitReplicaInfo. This one replica should be shared + * by all threads. + */ + @Test(timeout=60000) + public void testMultipleWaitersOnShortCircuitCache() + throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean creationIsBlocked = new AtomicBoolean(true); + final AtomicBoolean testFailed = new AtomicBoolean(false); + DFSInputStream.tcpReadsDisabledForTesting = true; + BlockReaderFactory.createShortCircuitReplicaInfoCallback = + new ShortCircuitCache.ShortCircuitReplicaCreator() { + @Override + public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { + Uninterruptibles.awaitUninterruptibly(latch); + if (!creationIsBlocked.compareAndSet(true, false)) { + Assert.fail("there were multiple calls to " + + "createShortCircuitReplicaInfo. Only one was expected."); + } + return null; + } + }; + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf( + "testMultipleWaitersOnShortCircuitCache", sockDir); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + final String TEST_FILE = "/test_file"; + final int TEST_FILE_LEN = 4000; + final int SEED = 0xFADED; + final int NUM_THREADS = 10; + DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN, + (short)1, SEED); + Runnable readerRunnable = new Runnable() { + @Override + public void run() { + try { + byte contents[] = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE)); + Assert.assertFalse(creationIsBlocked.get()); + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); + Assert.assertTrue(Arrays.equals(contents, expected)); + } catch (Throwable e) { + LOG.error("readerRunnable error", e); + testFailed.set(true); + } + } + }; + Thread threads[] = new Thread[NUM_THREADS]; + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(readerRunnable); + threads[i].start(); + } + Thread.sleep(500); + latch.countDown(); + for (int i = 0; i < NUM_THREADS; i++) { + Uninterruptibles.joinUninterruptibly(threads[i]); + } + cluster.shutdown(); + sockDir.close(); + Assert.assertFalse(testFailed.get()); + } + + /** + * Test the case where we have a failure to complete a short circuit read + * that occurs, and then later on, we have a success. + * Any thread waiting on a cache load should receive the failure (if it + * occurs); however, the failure result should not be cached. We want + * to be able to retry later and succeed. + */ + @Test(timeout=60000) + public void testShortCircuitCacheTemporaryFailure() + throws Exception { + BlockReaderTestUtil.enableBlockReaderFactoryTracing(); + final AtomicBoolean replicaCreationShouldFail = new AtomicBoolean(true); + final AtomicBoolean testFailed = new AtomicBoolean(false); + DFSInputStream.tcpReadsDisabledForTesting = true; + BlockReaderFactory.createShortCircuitReplicaInfoCallback = + new ShortCircuitCache.ShortCircuitReplicaCreator() { + @Override + public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { + if (replicaCreationShouldFail.get()) { + // Insert a short delay to increase the chance that one client + // thread waits for the other client thread's failure via + // a condition variable. + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + return new ShortCircuitReplicaInfo(); + } + return null; + } + }; + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf( + "testShortCircuitCacheTemporaryFailure", sockDir); + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + final String TEST_FILE = "/test_file"; + final int TEST_FILE_LEN = 4000; + final int NUM_THREADS = 2; + final int SEED = 0xFADED; + final CountDownLatch gotFailureLatch = new CountDownLatch(NUM_THREADS); + final CountDownLatch shouldRetryLatch = new CountDownLatch(1); + DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN, + (short)1, SEED); + Runnable readerRunnable = new Runnable() { + @Override + public void run() { + try { + // First time should fail. + List<LocatedBlock> locatedBlocks = + cluster.getNameNode().getRpcServer().getBlockLocations( + TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks(); + LocatedBlock lblock = locatedBlocks.get(0); // first block + BlockReader blockReader = null; + try { + blockReader = BlockReaderTestUtil.getBlockReader( + cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN); + Assert.fail("expected getBlockReader to fail the first time."); + } catch (Throwable t) { + Assert.assertTrue("expected to see 'TCP reads were disabled " + + "for testing' in exception " + t, t.getMessage().contains( + "TCP reads were disabled for testing")); + } finally { + if (blockReader != null) blockReader.close(); // keep findbugs happy + } + gotFailureLatch.countDown(); + shouldRetryLatch.await(); + + // Second time should succeed. + try { + blockReader = BlockReaderTestUtil.getBlockReader( + cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN); + } catch (Throwable t) { + LOG.error("error trying to retrieve a block reader " + + "the second time.", t); + throw t; + } finally { + if (blockReader != null) blockReader.close(); + } + } catch (Throwable t) { + LOG.error("getBlockReader failure", t); + testFailed.set(true); + } + } + }; + Thread threads[] = new Thread[NUM_THREADS]; + for (int i = 0; i < NUM_THREADS; i++) { + threads[i] = new Thread(readerRunnable); + threads[i].start(); + } + gotFailureLatch.await(); + replicaCreationShouldFail.set(false); + shouldRetryLatch.countDown(); + for (int i = 0; i < NUM_THREADS; i++) { + Uninterruptibles.joinUninterruptibly(threads[i]); + } + cluster.shutdown(); + sockDir.close(); + Assert.assertFalse(testFailed.get()); + } + + /** + * Test that a client which supports short-circuit reads using + * shared memory can fall back to not using shared memory when + * the server doesn't support it. + */ + @Test + public void testShortCircuitReadFromServerWithoutShm() throws Exception { + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration clientConf = createShortCircuitConf( + "testShortCircuitReadFromServerWithoutShm", sockDir); + Configuration serverConf = new Configuration(clientConf); + serverConf.setInt( + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0); + DFSInputStream.tcpReadsDisabledForTesting = true; + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); + cluster.waitActive(); + clientConf.set(DFS_CLIENT_CONTEXT, + "testShortCircuitReadFromServerWithoutShm_clientContext"); + final DistributedFileSystem fs = + (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf); + final String TEST_FILE = "/test_file"; + final int TEST_FILE_LEN = 4000; + final int SEED = 0xFADEC; + DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, + (short)1, SEED); + byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); + Assert.assertTrue(Arrays.equals(contents, expected)); + final ShortCircuitCache cache = + fs.getClient().getClientContext().getShortCircuitCache(); + final DatanodeInfo datanode = + new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId()); + cache.getDfsClientShmManager().visit(new Visitor() { + @Override + public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) + throws IOException { + Assert.assertEquals(1, info.size()); + PerDatanodeVisitorInfo vinfo = info.get(datanode); + Assert.assertTrue(vinfo.disabled); + Assert.assertEquals(0, vinfo.full.size()); + Assert.assertEquals(0, vinfo.notFull.size()); + } + }); + cluster.shutdown(); + sockDir.close(); + } + + /** + * Test that a client which does not support short-circuit reads using + * shared memory can talk with a server which supports it. + */ + @Test + public void testShortCircuitReadFromClientWithoutShm() throws Exception { + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration clientConf = createShortCircuitConf( + "testShortCircuitReadWithoutShm", sockDir); + Configuration serverConf = new Configuration(clientConf); + DFSInputStream.tcpReadsDisabledForTesting = true; + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); + cluster.waitActive(); + clientConf.setInt( + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0); + clientConf.set(DFS_CLIENT_CONTEXT, + "testShortCircuitReadFromClientWithoutShm_clientContext"); + final DistributedFileSystem fs = + (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf); + final String TEST_FILE = "/test_file"; + final int TEST_FILE_LEN = 4000; + final int SEED = 0xFADEC; + DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, + (short)1, SEED); + byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); + Assert.assertTrue(Arrays.equals(contents, expected)); + final ShortCircuitCache cache = + fs.getClient().getClientContext().getShortCircuitCache(); + Assert.assertEquals(null, cache.getDfsClientShmManager()); + cluster.shutdown(); + sockDir.close(); + } + + /** + * Test shutting down the ShortCircuitCache while there are things in it. + */ + @Test + public void testShortCircuitCacheShutdown() throws Exception { + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf( + "testShortCircuitCacheShutdown", sockDir); + conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown"); + Configuration serverConf = new Configuration(conf); + DFSInputStream.tcpReadsDisabledForTesting = true; + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); + cluster.waitActive(); + final DistributedFileSystem fs = + (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf); + final String TEST_FILE = "/test_file"; + final int TEST_FILE_LEN = 4000; + final int SEED = 0xFADEC; + DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, + (short)1, SEED); + byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); + Assert.assertTrue(Arrays.equals(contents, expected)); + final ShortCircuitCache cache = + fs.getClient().getClientContext().getShortCircuitCache(); + cache.close(); + Assert.assertTrue(cache.getDfsClientShmManager(). + getDomainSocketWatcher().isClosed()); + cluster.shutdown(); + sockDir.close(); + } + + /** + * When an InterruptedException is sent to a thread calling + * FileChannel#read, the FileChannel is immediately closed and the + * thread gets an exception. This effectively means that we might have + * someone asynchronously calling close() on the file descriptors we use + * in BlockReaderLocal. So when unreferencing a ShortCircuitReplica in + * ShortCircuitCache#unref, we should check if the FileChannel objects + * are still open. If not, we should purge the replica to avoid giving + * it out to any future readers. + * + * This is a regression test for HDFS-6227: Short circuit read failed + * due to ClosedChannelException. + * + * Note that you may still get ClosedChannelException errors if two threads + * are reading from the same replica and an InterruptedException is delivered + * to one of them. + */ + @Test(timeout=120000) + public void testPurgingClosedReplicas() throws Exception { + BlockReaderTestUtil.enableBlockReaderFactoryTracing(); + final AtomicInteger replicasCreated = new AtomicInteger(0); + final AtomicBoolean testFailed = new AtomicBoolean(false); + DFSInputStream.tcpReadsDisabledForTesting = true; + BlockReaderFactory.createShortCircuitReplicaInfoCallback = + new ShortCircuitCache.ShortCircuitReplicaCreator() { + @Override + public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { + replicasCreated.incrementAndGet(); + return null; + } + }; + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf( + "testPurgingClosedReplicas", sockDir); + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + final String TEST_FILE = "/test_file"; + final int TEST_FILE_LEN = 4095; + final int SEED = 0xFADE0; + final DistributedFileSystem fs = + (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf); + DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, + (short)1, SEED); + + final Semaphore sem = new Semaphore(0); + final List<LocatedBlock> locatedBlocks = + cluster.getNameNode().getRpcServer().getBlockLocations( + TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks(); + final LocatedBlock lblock = locatedBlocks.get(0); // first block + final byte[] buf = new byte[TEST_FILE_LEN]; + Runnable readerRunnable = new Runnable() { + @Override + public void run() { + try { + while (true) { + BlockReader blockReader = null; + try { + blockReader = BlockReaderTestUtil.getBlockReader( + cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN); + sem.release(); + try { + blockReader.readAll(buf, 0, TEST_FILE_LEN); + } finally { + sem.acquireUninterruptibly(); + } + } catch (ClosedByInterruptException e) { + LOG.info("got the expected ClosedByInterruptException", e); + sem.release(); + break; + } finally { + if (blockReader != null) blockReader.close(); + } + LOG.info("read another " + TEST_FILE_LEN + " bytes."); + } + } catch (Throwable t) { + LOG.error("getBlockReader failure", t); + testFailed.set(true); + sem.release(); + } + } + }; + Thread thread = new Thread(readerRunnable); + thread.start(); + + // While the thread is reading, send it interrupts. + // These should trigger a ClosedChannelException. + while (thread.isAlive()) { + sem.acquireUninterruptibly(); + thread.interrupt(); + sem.release(); + } + Assert.assertFalse(testFailed.get()); + + // We should be able to read from the file without + // getting a ClosedChannelException. + BlockReader blockReader = null; + try { + blockReader = BlockReaderTestUtil.getBlockReader( + cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN); + blockReader.readFully(buf, 0, TEST_FILE_LEN); + } finally { + if (blockReader != null) blockReader.close(); + } + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); + Assert.assertTrue(Arrays.equals(buf, expected)); + + // Another ShortCircuitReplica object should have been created. + Assert.assertEquals(2, replicasCreated.get()); + + dfs.close(); + cluster.shutdown(); + sockDir.close(); + } +}
