Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h?rev=1515906&r1=1515905&r2=1515906&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h Tue Aug 20 18:07:47 2013 @@ -21,6 +21,7 @@ #include <jni.h> /* for jboolean */ +struct hdfsBuilder; struct NativeMiniDfsCluster; /** @@ -28,17 +29,24 @@ struct NativeMiniDfsCluster; */ struct NativeMiniDfsConf { /** - * Nonzero if the cluster should be formatted prior to startup + * Nonzero if the cluster should be formatted prior to startup. */ jboolean doFormat; + /** * Whether or not to enable webhdfs in MiniDfsCluster */ jboolean webhdfsEnabled; + /** * The http port of the namenode in MiniDfsCluster */ jint namenodeHttpPort; + + /** + * Nonzero if we should configure short circuit. + */ + jboolean configureShortCircuit; }; /** @@ -84,7 +92,7 @@ void nmdFree(struct NativeMiniDfsCluster * * @return the port, or a negative error code */ -int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); +int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); /** * Get the http address that's in use by the given (non-HA) nativeMiniDfs @@ -101,4 +109,14 @@ int nmdGetNameNodePort(const struct Nati int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, int *port, const char **hostName); +/** + * Configure the HDFS builder appropriately to connect to this cluster. + * + * @param bld The hdfs builder + * + * @return 0 on success, or a negative error code + */ +int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl, + struct hdfsBuilder *bld); + #endif
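The new configureShortCircuit flag and nmdConfigureHdfsBuilder() are meant to be used together: the mini cluster is brought up with short-circuit support, and the builder is then filled in with whatever settings are needed to connect to that cluster before hdfsBuilderConnect() is called. The sketch below condenses the call sequence from main() in the test_libhdfs_zerocopy.c added later in this change; it is an illustration only (error handling reduced to early returns), not code from the commit.

#include "hdfs.h"
#include "native_mini_dfs.h"

#include <stdlib.h>

/* Illustrative sketch: start a formatted, short-circuit-enabled mini DFS
 * cluster and connect to it through an hdfsBuilder that has been configured
 * by nmdConfigureHdfsBuilder(). Mirrors main() in test_libhdfs_zerocopy.c. */
int main(void)
{
    struct NativeMiniDfsConf conf = {
        .doFormat = 1,              /* format the cluster before startup */
        .configureShortCircuit = 1, /* set up short-circuit read configuration */
    };
    struct NativeMiniDfsCluster *cl;
    struct hdfsBuilder *bld;
    hdfsFS fs;

    cl = nmdCreate(&conf);
    if (!cl)
        return EXIT_FAILURE;
    if (nmdWaitClusterUp(cl))
        return EXIT_FAILURE;
    bld = hdfsNewBuilder();
    if (!bld)
        return EXIT_FAILURE;
    /* Copy this cluster's connection settings into the builder;
     * returns 0 on success or a negative error code. */
    if (nmdConfigureHdfsBuilder(cl, bld))
        return EXIT_FAILURE;
    fs = hdfsBuilderConnect(bld);   /* the builder is freed by connect */
    if (!fs)
        return EXIT_FAILURE;

    /* ... use fs ... */

    hdfsDisconnect(fs);
    nmdShutdown(cl);
    nmdFree(cl);
    return EXIT_SUCCESS;
}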
Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c?rev=1515906&view=auto ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c (added) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c Tue Aug 20 18:07:47 2013 @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "expect.h" +#include "hdfs.h" +#include "native_mini_dfs.h" + +#include <errno.h> +#include <inttypes.h> +#include <semaphore.h> +#include <pthread.h> +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> + +#define TO_STR_HELPER(X) #X +#define TO_STR(X) TO_STR_HELPER(X) + +#define TEST_FILE_NAME_LENGTH 128 +#define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096 +#define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215 +#define TEST_ZEROCOPY_NUM_BLOCKS 6 +#define SMALL_READ_LEN 16 + +#define ZC_BUF_LEN 32768 + +static uint8_t *getZeroCopyBlockData(int blockIdx) +{ + uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE); + int i; + if (!buf) { + fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE); + exit(1); + } + for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) { + buf[i] = blockIdx + (i % 17); + } + return buf; +} + +static int getZeroCopyBlockLen(int blockIdx) +{ + if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) { + return 0; + } else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) { + return TEST_ZEROCOPY_LAST_BLOCK_SIZE; + } else { + return TEST_ZEROCOPY_FULL_BLOCK_SIZE; + } +} + +static void printBuf(const uint8_t *buf, size_t len) __attribute__((unused)); + +static void printBuf(const uint8_t *buf, size_t len) +{ + size_t i; + + for (i = 0; i < len; i++) { + fprintf(stderr, "%02x", buf[i]); + } + fprintf(stderr, "\n"); +} + +static int doTestZeroCopyReads(hdfsFS fs, const char *fileName) +{ + hdfsFile file = NULL; + struct hadoopZeroCopyCursor *zcursor = NULL; + uint8_t *backingBuffer = NULL, *block; + const void *zcPtr; + + file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0); + EXPECT_NONNULL(file); + zcursor = hadoopZeroCopyCursorAlloc(file); + EXPECT_NONNULL(zcursor); + /* haven't read anything yet */ + EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL)); + block = getZeroCopyBlockData(0); + EXPECT_NONNULL(block); + /* first read is half of a block. 
*/ + EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, + hadoopZeroCopyRead(zcursor, + TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, &zcPtr)); + EXPECT_ZERO(memcmp(zcPtr, block, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2)); + /* read the next half of the block */ + EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, + hadoopZeroCopyRead(zcursor, + TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, &zcPtr)); + EXPECT_ZERO(memcmp(zcPtr, block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2), + TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2)); + free(block); + EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE, + TEST_ZEROCOPY_FULL_BLOCK_SIZE, + TEST_ZEROCOPY_FULL_BLOCK_SIZE, + TEST_ZEROCOPY_FULL_BLOCK_SIZE)); + /* Now let's read just a few bytes. */ + EXPECT_INT_EQ(SMALL_READ_LEN, + hadoopZeroCopyRead(zcursor, SMALL_READ_LEN, &zcPtr)); + block = getZeroCopyBlockData(1); + EXPECT_NONNULL(block); + EXPECT_ZERO(memcmp(block, zcPtr, SMALL_READ_LEN)); + EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN, + hdfsTell(fs, file)); + EXPECT_ZERO(expectFileStats(file, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN)); + + /* Try to read a full block's worth of data. This will cross the block + * boundary, which means we have to fall back to non-zero-copy reads. + * However, because we don't have a backing buffer, the fallback will fail + * with EPROTONOSUPPORT. */ + EXPECT_INT_EQ(-1, + hadoopZeroCopyRead(zcursor, TEST_ZEROCOPY_FULL_BLOCK_SIZE, &zcPtr)); + EXPECT_INT_EQ(EPROTONOSUPPORT, errno); + + /* Now set a backing buffer and try again. It should succeed this time. */ + backingBuffer = malloc(ZC_BUF_LEN); + EXPECT_NONNULL(backingBuffer); + EXPECT_ZERO(hadoopZeroCopyCursorSetFallbackBuffer(zcursor, + backingBuffer, ZC_BUF_LEN)); + EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, + hadoopZeroCopyRead(zcursor, TEST_ZEROCOPY_FULL_BLOCK_SIZE, &zcPtr)); + EXPECT_ZERO(expectFileStats(file, + (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN, + (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN, + (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN)); + EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, zcPtr, + TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN)); + free(block); + block = getZeroCopyBlockData(2); + EXPECT_NONNULL(block); + EXPECT_ZERO(memcmp(block, zcPtr + + (TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN)); + free(block); + hadoopZeroCopyCursorFree(zcursor); + EXPECT_ZERO(hdfsCloseFile(fs, file)); + free(backingBuffer); + return 0; +} + +static int createZeroCopyTestFile(hdfsFS fs, char *testFileName, + size_t testFileNameLen) +{ + int blockIdx, blockLen; + hdfsFile file; + uint8_t *data; + + snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d", + getpid(), rand()); + file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1, + TEST_ZEROCOPY_FULL_BLOCK_SIZE); + EXPECT_NONNULL(file); + for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) { + blockLen = getZeroCopyBlockLen(blockIdx); + data = getZeroCopyBlockData(blockIdx); + EXPECT_NONNULL(data); + EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen)); + } + EXPECT_ZERO(hdfsCloseFile(fs, file)); + return 0; +} + +/** + * Test that we can write a file with libhdfs and then read it back + */ +int main(void) +{ + int port; + struct NativeMiniDfsConf conf = { + .doFormat = 1, + .configureShortCircuit = 1, + }; + char 
testFileName[TEST_FILE_NAME_LENGTH]; + hdfsFS fs; + struct NativeMiniDfsCluster* cl; + struct hdfsBuilder *bld; + + cl = nmdCreate(&conf); + EXPECT_NONNULL(cl); + EXPECT_ZERO(nmdWaitClusterUp(cl)); + port = nmdGetNameNodePort(cl); + if (port < 0) { + fprintf(stderr, "TEST_ERROR: test_zerocopy: " + "nmdGetNameNodePort returned error %d\n", port); + return EXIT_FAILURE; + } + bld = hdfsNewBuilder(); + EXPECT_NONNULL(bld); + EXPECT_ZERO(nmdConfigureHdfsBuilder(cl, bld)); + hdfsBuilderSetForceNewInstance(bld); + hdfsBuilderConfSetStr(bld, "dfs.block.size", + TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE)); + /* ensure that we'll always get our mmaps */ + hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum", + "true"); + fs = hdfsBuilderConnect(bld); + EXPECT_NONNULL(fs); + EXPECT_ZERO(createZeroCopyTestFile(fs, testFileName, + TEST_FILE_NAME_LENGTH)); + EXPECT_ZERO(doTestZeroCopyReads(fs, testFileName)); + EXPECT_ZERO(hdfsDisconnect(fs)); + EXPECT_ZERO(nmdShutdown(cl)); + nmdFree(cl); + fprintf(stderr, "TEST_SUCCESS\n"); + return EXIT_SUCCESS; +} Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c ------------------------------------------------------------------------------ svn:eol-style = native Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1515906&r1=1515905&r2=1515906&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Tue Aug 20 18:07:47 2013 @@ -1391,4 +1391,32 @@ linearly increases. </description> </property> + +<property> + <name>dfs.client.mmap.cache.size</name> + <value>1024</value> + <description> + When zero-copy reads are used, the DFSClient keeps a cache of recently used + memory mapped regions. This parameter controls the maximum number of + entries that we will keep in that cache. + + If this is set to 0, we will not allow mmap. + + The larger this number is, the more file descriptors we will potentially + use for memory-mapped files. mmaped files also use virtual address space. + You may need to increase your ulimit virtual address space limits before + increasing the client mmap cache size. + </description> +</property> + +<property> + <name>dfs.client.mmap.cache.timeout.ms</name> + <value>900000</value> + <description> + The minimum length of time that we will keep an mmap entry in the cache + between uses. If an entry is in the cache longer than this, and nobody + uses it, it will be removed by a background thread. 
+ </description> +</property> + </configuration> Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1515906&r1=1515905&r2=1515906&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original) +++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Tue Aug 20 18:07:47 2013 @@ -23,24 +23,44 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.concurrent.TimeoutException; -import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics; +import org.apache.commons.lang.SystemUtils; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.ZeroCopyCursor; +import org.apache.hadoop.hdfs.client.ClientMmap; +import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; +import org.junit.BeforeClass; import org.junit.Test; public class TestBlockReaderLocal { + private static TemporarySocketDirectory sockDir; + + @BeforeClass + public static void init() { + sockDir = new TemporarySocketDirectory(); + DomainSocket.disableBindPathValidation(); + } + + @AfterClass + public static void shutdown() throws IOException { + sockDir.close(); + } + public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2, int off2, int len) { for (int i = 0; i < len; i++) { @@ -100,10 +120,11 @@ public class TestBlockReaderLocal { FSDataInputStream fsIn = null; byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; + FileSystem fs = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); - FileSystem fs = cluster.getFileSystem(); + fs = cluster.getFileSystem(); DFSTestUtil.createFile(fs, TEST_PATH, BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); try { @@ -138,6 +159,7 @@ public class TestBlockReaderLocal { test.doTest(blockReaderLocal, original); } finally { if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); if (cluster != null) cluster.shutdown(); if (dataIn != null) dataIn.close(); if (checkIn != null) checkIn.close(); @@ -382,10 +404,11 @@ public class TestBlockReaderLocal { final long RANDOM_SEED = 4567L; FSDataInputStream fsIn = null; byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; + FileSystem fs = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); - FileSystem fs = cluster.getFileSystem(); + fs = cluster.getFileSystem(); DFSTestUtil.createFile(fs, TEST_PATH, BlockReaderLocalTest.TEST_LENGTH, (short)1, 
RANDOM_SEED); try { @@ -417,8 +440,327 @@ public class TestBlockReaderLocal { } finally { DFSInputStream.tcpReadsDisabledForTesting = false; if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); if (cluster != null) cluster.shutdown(); if (sockDir != null) sockDir.close(); } } + + private static byte[] byteBufferToArray(ByteBuffer buf) { + byte resultArray[] = new byte[buf.remaining()]; + buf.get(resultArray); + return resultArray; + } + + public static HdfsConfiguration initZeroCopyTest() { + Assume.assumeTrue(NativeIO.isAvailable()); + Assume.assumeTrue(SystemUtils.IS_OS_UNIX); + HdfsConfiguration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); + sockDir = new TemporarySocketDirectory(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); + conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3); + conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100); + conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, + new File(sockDir.getDir(), + "TestRequestMmapAccess._PORT.sock").getAbsolutePath()); + conf.setBoolean(DFSConfigKeys. + DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true); + return conf; + } + + @Test + public void testZeroCopyReads() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + FSDataInputStream fsIn = null; + ZeroCopyCursor zcursor = null; + + FileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; + IOUtils.readFully(fsIn, original, 0, + BlockReaderLocalTest.TEST_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + zcursor = fsIn.createZeroCopyCursor(); + zcursor.setFallbackBuffer(ByteBuffer. 
+ allocateDirect(1024 * 1024 * 4)); + HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; + zcursor.read(4096); + ByteBuffer result = zcursor.getData(); + Assert.assertEquals(4096, result.remaining()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalBytesRead()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalZeroCopyBytesRead()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096), + byteBufferToArray(result)); + } finally { + if (zcursor != null) zcursor.close(); + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + @Test + public void testShortZeroCopyReads() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + FSDataInputStream fsIn = null; + ZeroCopyCursor zcursor = null; + + FileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; + IOUtils.readFully(fsIn, original, 0, + BlockReaderLocalTest.TEST_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + zcursor = fsIn.createZeroCopyCursor(); + zcursor.setFallbackBuffer(ByteBuffer. + allocateDirect(1024 * 1024 * 4)); + zcursor.setAllowShortReads(true); + HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; + zcursor.read(8192); + ByteBuffer result = zcursor.getData(); + Assert.assertEquals(4096, result.remaining()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalBytesRead()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalZeroCopyBytesRead()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096), + byteBufferToArray(result)); + zcursor.read(4097); + result = zcursor.getData(); + Assert.assertEquals(4096, result.remaining()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192), + byteBufferToArray(result)); + zcursor.setAllowShortReads(false); + zcursor.read(4100); + result = zcursor.getData(); + Assert.assertEquals(4100, result.remaining()); + + Assert.assertArrayEquals(Arrays.copyOfRange(original, 8192, 12292), + byteBufferToArray(result)); + } finally { + if (zcursor != null) zcursor.close(); + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + @Test + public void testZeroCopyReadsNoBackingBuffer() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + FSDataInputStream fsIn = null; + ZeroCopyCursor zcursor = null; + + FileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " 
+ e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; + IOUtils.readFully(fsIn, original, 0, + BlockReaderLocalTest.TEST_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + zcursor = fsIn.createZeroCopyCursor(); + zcursor.setAllowShortReads(false); + HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; + // This read is longer than the file, and we do not have short reads enabled. + try { + zcursor.read(8192); + Assert.fail("expected UnsupportedOperationException"); + } catch (UnsupportedOperationException e) { + // expected + } + // This read is longer than the block, and we do not have short reads enabled. + try { + zcursor.read(4097); + Assert.fail("expected UnsupportedOperationException"); + } catch (UnsupportedOperationException e) { + // expected + } + // This read should succeed. + zcursor.read(4096); + ByteBuffer result = zcursor.getData(); + Assert.assertEquals(4096, result.remaining()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalBytesRead()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalZeroCopyBytesRead()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096), + byteBufferToArray(result)); + } finally { + if (zcursor != null) zcursor.close(); + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + private static class CountingVisitor + implements ClientMmapManager.ClientMmapVisitor { + int count = 0; + + @Override + public void accept(ClientMmap mmap) { + count++; + } + + public void reset() { + count = 0; + } + } + + @Test + public void testZeroCopyMmapCache() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + final int TEST_FILE_LENGTH = 16385; + final int RANDOM_SEED = 23453; + FSDataInputStream fsIn = null; + ZeroCopyCursor zcursor[] = { null, null, null, null, null }; + + DistributedFileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, RANDOM_SEED); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + for (int i = 0; i < zcursor.length; i++) { + zcursor[i] = fsIn.createZeroCopyCursor(); + zcursor[i].setAllowShortReads(false); + } + ClientMmapManager mmapManager = fs.getClient().getMmapManager(); + CountingVisitor countingVisitor = new CountingVisitor(); + mmapManager.visitMmaps(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + mmapManager.visitEvictable(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + zcursor[0].read(4096); + fsIn.seek(0); + zcursor[1].read(4096); + mmapManager.visitMmaps(countingVisitor); + Assert.assertEquals(1, countingVisitor.count); + countingVisitor.reset(); + mmapManager.visitEvictable(countingVisitor); + Assert.assertEquals(0, 
countingVisitor.count); + countingVisitor.reset(); + + // The mmaps should be of the first block of the file. + final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); + mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() { + @Override + public void accept(ClientMmap mmap) { + Assert.assertEquals(firstBlock, mmap.getBlock()); + } + }); + + // Read more blocks. + zcursor[2].read(4096); + zcursor[3].read(4096); + try { + zcursor[4].read(4096); + Assert.fail("expected UnsupportedOperationException"); + } catch (UnsupportedOperationException e) { + // expected + } + + // we should have 3 mmaps, 0 evictable + mmapManager.visitMmaps(countingVisitor); + Assert.assertEquals(3, countingVisitor.count); + countingVisitor.reset(); + mmapManager.visitEvictable(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + + // After we close the cursors, the mmaps should be evictable for + // a brief period of time. Then, they should be closed (we're + // using a very quick timeout) + for (int i = 0; i < zcursor.length; i++) { + IOUtils.closeStream(zcursor[i]); + } + while (true) { + countingVisitor.reset(); + mmapManager.visitEvictable(countingVisitor); + if (0 == countingVisitor.count) { + break; + } + } + countingVisitor.reset(); + mmapManager.visitMmaps(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + + } }
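The Java tests above drive the zero-copy path through ZeroCopyCursor; the same path is reachable from C through the cursor calls used in test_libhdfs_zerocopy.c earlier in this change. The helper below condenses that flow into one function; it is an illustration only, the helper name readOnceWithZeroCopy is invented here, and the hadoopZeroCopy* signatures are taken as they appear in the test (the interface was still evolving on the HDFS-4949 branch, so treat them as indicative rather than final).

#include "hdfs.h"

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>

/* Illustrative sketch: perform one read of `len` bytes through the zero-copy
 * cursor API. If the request cannot be served from an mmap of the block
 * (for example, it crosses a block boundary), hadoopZeroCopyRead() copies
 * into the caller-supplied fallback buffer; without a fallback buffer such
 * a read fails with EPROTONOSUPPORT, as doTestZeroCopyReads() demonstrates. */
static int readOnceWithZeroCopy(hdfsFS fs, const char *path, size_t len)
{
    int nRead = -1;
    const void *data = NULL;
    void *fallback = NULL;
    struct hadoopZeroCopyCursor *zcursor = NULL;
    hdfsFile file = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);

    if (!file)
        return -1;
    zcursor = hadoopZeroCopyCursorAlloc(file);
    if (!zcursor)
        goto done;
    /* Optional: allow fallback to a normal (copying) read for requests
     * that cannot be memory-mapped. */
    fallback = malloc(len);
    if (!fallback)
        goto done;
    if (hadoopZeroCopyCursorSetFallbackBuffer(zcursor, fallback, len))
        goto done;
    nRead = hadoopZeroCopyRead(zcursor, len, &data);
    if (nRead < 0)
        fprintf(stderr, "hadoopZeroCopyRead failed: errno %d\n", errno);
    /* On success, `data` points at the bytes just read, either inside an
     * mmapped region of the block or inside `fallback`. */
done:
    if (zcursor)
        hadoopZeroCopyCursorFree(zcursor);
    hdfsCloseFile(fs, file);
    free(fallback);
    return nRead;
}

As in the tests, a strictly zero-copy caller can skip the fallback-buffer step entirely and treat EPROTONOSUPPORT as "this range cannot be mmapped"; the dfs.client.mmap.cache.size and dfs.client.mmap.cache.timeout.ms properties added to hdfs-default.xml above bound how many of the resulting memory mappings the client keeps cached and for how long.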