Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1526020&r1=1526019&r2=1526020&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Tue Sep 24 21:40:53 2013 @@ -23,24 +23,18 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.concurrent.TimeoutException; -import org.apache.commons.lang.SystemUtils; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.ZeroCopyCursor; -import org.apache.hadoop.hdfs.client.ClientMmap; -import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; -import org.apache.hadoop.io.nativeio.NativeIO; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; @@ -445,322 +439,4 @@ public class TestBlockReaderLocal { if (sockDir != null) sockDir.close(); } } - - private static byte[] byteBufferToArray(ByteBuffer buf) { - byte resultArray[] = new byte[buf.remaining()]; - buf.get(resultArray); - return resultArray; - } - - public static HdfsConfiguration initZeroCopyTest() { - Assume.assumeTrue(NativeIO.isAvailable()); - Assume.assumeTrue(SystemUtils.IS_OS_UNIX); - HdfsConfiguration conf = new HdfsConfiguration(); - conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); - sockDir = new TemporarySocketDirectory(); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); - conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3); - conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100); - conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, - new File(sockDir.getDir(), - "TestRequestMmapAccess._PORT.sock").getAbsolutePath()); - conf.setBoolean(DFSConfigKeys. - DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true); - return conf; - } - - @Test - public void testZeroCopyReads() throws Exception { - HdfsConfiguration conf = initZeroCopyTest(); - MiniDFSCluster cluster = null; - final Path TEST_PATH = new Path("/a"); - FSDataInputStream fsIn = null; - ZeroCopyCursor zcursor = null; - - FileSystem fs = null; - try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, TEST_PATH, - BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L); - try { - DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); - } catch (InterruptedException e) { - Assert.fail("unexpected InterruptedException during " + - "waitReplication: " + e); - } catch (TimeoutException e) { - Assert.fail("unexpected TimeoutException during " + - "waitReplication: " + e); - } - fsIn = fs.open(TEST_PATH); - byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; - IOUtils.readFully(fsIn, original, 0, - BlockReaderLocalTest.TEST_LENGTH); - fsIn.close(); - fsIn = fs.open(TEST_PATH); - zcursor = fsIn.createZeroCopyCursor(); - zcursor.setFallbackBuffer(ByteBuffer. - allocateDirect(1024 * 1024 * 4)); - HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; - zcursor.read(4096); - ByteBuffer result = zcursor.getData(); - Assert.assertEquals(4096, result.remaining()); - Assert.assertEquals(4096, - dfsIn.getReadStatistics().getTotalBytesRead()); - Assert.assertEquals(4096, - dfsIn.getReadStatistics().getTotalZeroCopyBytesRead()); - Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096), - byteBufferToArray(result)); - } finally { - if (zcursor != null) zcursor.close(); - if (fsIn != null) fsIn.close(); - if (fs != null) fs.close(); - if (cluster != null) cluster.shutdown(); - } - } - - @Test - public void testShortZeroCopyReads() throws Exception { - HdfsConfiguration conf = initZeroCopyTest(); - MiniDFSCluster cluster = null; - final Path TEST_PATH = new Path("/a"); - FSDataInputStream fsIn = null; - ZeroCopyCursor zcursor = null; - - FileSystem fs = null; - try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, TEST_PATH, - BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L); - try { - DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); - } catch (InterruptedException e) { - Assert.fail("unexpected InterruptedException during " + - "waitReplication: " + e); - } catch (TimeoutException e) { - Assert.fail("unexpected TimeoutException during " + - "waitReplication: " + e); - } - fsIn = fs.open(TEST_PATH); - byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; - IOUtils.readFully(fsIn, original, 0, - BlockReaderLocalTest.TEST_LENGTH); - fsIn.close(); - fsIn = fs.open(TEST_PATH); - zcursor = fsIn.createZeroCopyCursor(); - zcursor.setFallbackBuffer(ByteBuffer. - allocateDirect(1024 * 1024 * 4)); - zcursor.setAllowShortReads(true); - HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; - zcursor.read(8192); - ByteBuffer result = zcursor.getData(); - Assert.assertEquals(4096, result.remaining()); - Assert.assertEquals(4096, - dfsIn.getReadStatistics().getTotalBytesRead()); - Assert.assertEquals(4096, - dfsIn.getReadStatistics().getTotalZeroCopyBytesRead()); - Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096), - byteBufferToArray(result)); - zcursor.read(4097); - result = zcursor.getData(); - Assert.assertEquals(4096, result.remaining()); - Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192), - byteBufferToArray(result)); - zcursor.setAllowShortReads(false); - zcursor.read(4100); - result = zcursor.getData(); - Assert.assertEquals(4100, result.remaining()); - - Assert.assertArrayEquals(Arrays.copyOfRange(original, 8192, 12292), - byteBufferToArray(result)); - } finally { - if (zcursor != null) zcursor.close(); - if (fsIn != null) fsIn.close(); - if (fs != null) fs.close(); - if (cluster != null) cluster.shutdown(); - } - } - - @Test - public void testZeroCopyReadsNoBackingBuffer() throws Exception { - HdfsConfiguration conf = initZeroCopyTest(); - MiniDFSCluster cluster = null; - final Path TEST_PATH = new Path("/a"); - FSDataInputStream fsIn = null; - ZeroCopyCursor zcursor = null; - - FileSystem fs = null; - try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, TEST_PATH, - BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L); - try { - DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); - } catch (InterruptedException e) { - Assert.fail("unexpected InterruptedException during " + - "waitReplication: " + e); - } catch (TimeoutException e) { - Assert.fail("unexpected TimeoutException during " + - "waitReplication: " + e); - } - fsIn = fs.open(TEST_PATH); - byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; - IOUtils.readFully(fsIn, original, 0, - BlockReaderLocalTest.TEST_LENGTH); - fsIn.close(); - fsIn = fs.open(TEST_PATH); - zcursor = fsIn.createZeroCopyCursor(); - zcursor.setAllowShortReads(false); - HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; - // This read is longer than the file, and we do not have short reads enabled. - try { - zcursor.read(8192); - Assert.fail("expected UnsupportedOperationException"); - } catch (UnsupportedOperationException e) { - // expected - } - // This read is longer than the block, and we do not have short reads enabled. - try { - zcursor.read(4097); - Assert.fail("expected UnsupportedOperationException"); - } catch (UnsupportedOperationException e) { - // expected - } - // This read should succeed. - zcursor.read(4096); - ByteBuffer result = zcursor.getData(); - Assert.assertEquals(4096, result.remaining()); - Assert.assertEquals(4096, - dfsIn.getReadStatistics().getTotalBytesRead()); - Assert.assertEquals(4096, - dfsIn.getReadStatistics().getTotalZeroCopyBytesRead()); - Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096), - byteBufferToArray(result)); - } finally { - if (zcursor != null) zcursor.close(); - if (fsIn != null) fsIn.close(); - if (fs != null) fs.close(); - if (cluster != null) cluster.shutdown(); - } - } - - private static class CountingVisitor - implements ClientMmapManager.ClientMmapVisitor { - int count = 0; - - @Override - public void accept(ClientMmap mmap) { - count++; - } - - public void reset() { - count = 0; - } - } - - @Test - public void testZeroCopyMmapCache() throws Exception { - HdfsConfiguration conf = initZeroCopyTest(); - MiniDFSCluster cluster = null; - final Path TEST_PATH = new Path("/a"); - final int TEST_FILE_LENGTH = 16385; - final int RANDOM_SEED = 23453; - FSDataInputStream fsIn = null; - ZeroCopyCursor zcursor[] = { null, null, null, null, null }; - - DistributedFileSystem fs = null; - try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, TEST_PATH, - TEST_FILE_LENGTH, (short)1, RANDOM_SEED); - try { - DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); - } catch (InterruptedException e) { - Assert.fail("unexpected InterruptedException during " + - "waitReplication: " + e); - } catch (TimeoutException e) { - Assert.fail("unexpected TimeoutException during " + - "waitReplication: " + e); - } - fsIn = fs.open(TEST_PATH); - byte original[] = new byte[TEST_FILE_LENGTH]; - IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); - fsIn.close(); - fsIn = fs.open(TEST_PATH); - for (int i = 0; i < zcursor.length; i++) { - zcursor[i] = fsIn.createZeroCopyCursor(); - zcursor[i].setAllowShortReads(false); - } - ClientMmapManager mmapManager = fs.getClient().getMmapManager(); - CountingVisitor countingVisitor = new CountingVisitor(); - mmapManager.visitMmaps(countingVisitor); - Assert.assertEquals(0, countingVisitor.count); - mmapManager.visitEvictable(countingVisitor); - Assert.assertEquals(0, countingVisitor.count); - zcursor[0].read(4096); - fsIn.seek(0); - zcursor[1].read(4096); - mmapManager.visitMmaps(countingVisitor); - Assert.assertEquals(1, countingVisitor.count); - countingVisitor.reset(); - mmapManager.visitEvictable(countingVisitor); - Assert.assertEquals(0, countingVisitor.count); - countingVisitor.reset(); - - // The mmaps should be of the first block of the file. - final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); - mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() { - @Override - public void accept(ClientMmap mmap) { - Assert.assertEquals(firstBlock, mmap.getBlock()); - } - }); - - // Read more blocks. - zcursor[2].read(4096); - zcursor[3].read(4096); - try { - zcursor[4].read(4096); - Assert.fail("expected UnsupportedOperationException"); - } catch (UnsupportedOperationException e) { - // expected - } - - // we should have 3 mmaps, 0 evictable - mmapManager.visitMmaps(countingVisitor); - Assert.assertEquals(3, countingVisitor.count); - countingVisitor.reset(); - mmapManager.visitEvictable(countingVisitor); - Assert.assertEquals(0, countingVisitor.count); - - // After we close the cursors, the mmaps should be evictable for - // a brief period of time. Then, they should be closed (we're - // using a very quick timeout) - for (int i = 0; i < zcursor.length; i++) { - IOUtils.closeStream(zcursor[i]); - } - while (true) { - countingVisitor.reset(); - mmapManager.visitEvictable(countingVisitor); - if (0 == countingVisitor.count) { - break; - } - } - countingVisitor.reset(); - mmapManager.visitMmaps(countingVisitor); - Assert.assertEquals(0, countingVisitor.count); - } finally { - if (fsIn != null) fsIn.close(); - if (fs != null) fs.close(); - if (cluster != null) cluster.shutdown(); - } - - } }