Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h Tue Aug 20 18:07:47 2013
@@ -21,6 +21,7 @@
 
 #include <jni.h> /* for jboolean */
 
+struct hdfsBuilder;
 struct NativeMiniDfsCluster; 
 
 /**
@@ -28,17 +29,24 @@ struct NativeMiniDfsCluster; 
  */
 struct NativeMiniDfsConf {
     /**
-     * Nonzero if the cluster should be formatted prior to startup
+     * Nonzero if the cluster should be formatted prior to startup.
      */
     jboolean doFormat;
+
     /**
      * Whether or not to enable webhdfs in MiniDfsCluster
      */
     jboolean webhdfsEnabled;
+
     /**
      * The http port of the namenode in MiniDfsCluster
      */
     jint namenodeHttpPort;
+
+    /**
+     * Nonzero if we should configure short circuit.
+     */
+    jboolean configureShortCircuit;
 };
 
 /**
@@ -84,7 +92,7 @@ void nmdFree(struct NativeMiniDfsCluster
  *
  * @return          the port, or a negative error code
  */
-int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); 
 
 /**
  * Get the http address that's in use by the given (non-HA) nativeMiniDfs
@@ -101,4 +109,14 @@ int nmdGetNameNodePort(const struct Nati
 int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
                                int *port, const char **hostName);
 
+/**
+ * Configure the HDFS builder appropriately to connect to this cluster.
+ *
+ * @param bld       The hdfs builder
+ *
+ * @return          0 on success, or a negative error code
+ */
+int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
+                            struct hdfsBuilder *bld);
+
 #endif

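For context (not part of the diff): a minimal sketch of how a libhdfs test might use the new configureShortCircuit field and nmdConfigureHdfsBuilder() declared above. It only uses calls that appear elsewhere in this commit; the helper name is made up for illustration, and error checking is omitted. The new test added below performs the same steps with full EXPECT_* checking.

    /* Sketch only -- not part of the patch. */
    #include "hdfs.h"
    #include "native_mini_dfs.h"

    static int runAgainstShortCircuitCluster(void)
    {
        struct NativeMiniDfsConf conf = {
            .doFormat = 1,
            .configureShortCircuit = 1,   /* new field added by this patch */
        };
        struct NativeMiniDfsCluster *cl;
        struct hdfsBuilder *bld;
        hdfsFS fs;

        cl = nmdCreate(&conf);
        nmdWaitClusterUp(cl);
        bld = hdfsNewBuilder();
        /* Point the builder at this cluster (namenode address, plus the
         * short-circuit settings when configureShortCircuit is set). */
        nmdConfigureHdfsBuilder(cl, bld);
        fs = hdfsBuilderConnect(bld);
        /* ... exercise fs ... */
        hdfsDisconnect(fs);
        nmdShutdown(cl);
        nmdFree(cl);
        return 0;
    }
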
Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c?rev=1515906&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c Tue Aug 20 18:07:47 2013
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "expect.h"
+#include "hdfs.h"
+#include "native_mini_dfs.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <semaphore.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+
+#define TO_STR_HELPER(X) #X
+#define TO_STR(X) TO_STR_HELPER(X)
+
+#define TEST_FILE_NAME_LENGTH 128
+#define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096
+#define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215
+#define TEST_ZEROCOPY_NUM_BLOCKS 6
+#define SMALL_READ_LEN 16
+
+#define ZC_BUF_LEN 32768
+
+static uint8_t *getZeroCopyBlockData(int blockIdx)
+{
+    uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+    int i;
+    if (!buf) {
+        fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+        exit(1);
+    }
+    for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) {
+      buf[i] = blockIdx + (i % 17);
+    }
+    return buf;
+}
+
+static int getZeroCopyBlockLen(int blockIdx)
+{
+    if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) {
+        return 0;
+    } else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) {
+        return TEST_ZEROCOPY_LAST_BLOCK_SIZE;
+    } else {
+        return TEST_ZEROCOPY_FULL_BLOCK_SIZE;
+    }
+}
+
+static void printBuf(const uint8_t *buf, size_t len) __attribute__((unused));
+
+static void printBuf(const uint8_t *buf, size_t len)
+{
+  size_t i;
+
+  for (i = 0; i < len; i++) {
+    fprintf(stderr, "%02x", buf[i]);
+  }
+  fprintf(stderr, "\n");
+}
+
+static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
+{
+    hdfsFile file = NULL;
+    struct hadoopZeroCopyCursor *zcursor = NULL;
+    uint8_t *backingBuffer = NULL, *block;
+    const void *zcPtr;
+
+    file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
+    EXPECT_NONNULL(file);
+    zcursor = hadoopZeroCopyCursorAlloc(file);
+    EXPECT_NONNULL(zcursor);
+    /* haven't read anything yet */
+    EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL));
+    block = getZeroCopyBlockData(0);
+    EXPECT_NONNULL(block);
+    /* first read is half of a block. */
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
+          hadoopZeroCopyRead(zcursor,
+                  TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, &zcPtr));
+    EXPECT_ZERO(memcmp(zcPtr, block, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+    /* read the next half of the block */
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
+          hadoopZeroCopyRead(zcursor,
+                  TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, &zcPtr));
+    EXPECT_ZERO(memcmp(zcPtr, block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
+                       TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+    free(block);
+    EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE, 
+              TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+              TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+              TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+    /* Now let's read just a few bytes. */
+    EXPECT_INT_EQ(SMALL_READ_LEN,
+                  hadoopZeroCopyRead(zcursor, SMALL_READ_LEN, &zcPtr));
+    block = getZeroCopyBlockData(1);
+    EXPECT_NONNULL(block);
+    EXPECT_ZERO(memcmp(block, zcPtr, SMALL_READ_LEN));
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+                  hdfsTell(fs, file));
+    EXPECT_ZERO(expectFileStats(file,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
+
+    /* Try to read a full block's worth of data.  This will cross the block
+     * boundary, which means we have to fall back to non-zero-copy reads.
+     * However, because we don't have a backing buffer, the fallback will fail
+     * with EPROTONOSUPPORT. */
+    EXPECT_INT_EQ(-1, 
+          hadoopZeroCopyRead(zcursor, TEST_ZEROCOPY_FULL_BLOCK_SIZE, &zcPtr));
+    EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
+
+    /* Now set a backing buffer and try again.  It should succeed this time. */
+    backingBuffer = malloc(ZC_BUF_LEN);
+    EXPECT_NONNULL(backingBuffer);
+    EXPECT_ZERO(hadoopZeroCopyCursorSetFallbackBuffer(zcursor,
+                                          backingBuffer, ZC_BUF_LEN));
+    EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, 
+          hadoopZeroCopyRead(zcursor, TEST_ZEROCOPY_FULL_BLOCK_SIZE, &zcPtr));
+    EXPECT_ZERO(expectFileStats(file,
+          (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+          (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+          (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+          TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
+    EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, zcPtr,
+        TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN));
+    free(block);
+    block = getZeroCopyBlockData(2);
+    EXPECT_NONNULL(block);
+    EXPECT_ZERO(memcmp(block, zcPtr +
+        (TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
+    free(block);
+    hadoopZeroCopyCursorFree(zcursor);
+    EXPECT_ZERO(hdfsCloseFile(fs, file));
+    free(backingBuffer);
+    return 0;
+}
+
+static int createZeroCopyTestFile(hdfsFS fs, char *testFileName,
+                                  size_t testFileNameLen)
+{
+    int blockIdx, blockLen;
+    hdfsFile file;
+    uint8_t *data;
+
+    snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d",
+             getpid(), rand());
+    file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1,
+                        TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+    EXPECT_NONNULL(file);
+    for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) {
+        blockLen = getZeroCopyBlockLen(blockIdx);
+        data = getZeroCopyBlockData(blockIdx);
+        EXPECT_NONNULL(data);
+        EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen));
+    }
+    EXPECT_ZERO(hdfsCloseFile(fs, file));
+    return 0;
+}
+
+/**
+ * Test that we can write a file with libhdfs and then read it back
+ */
+int main(void)
+{
+    int port;
+    struct NativeMiniDfsConf conf = {
+        .doFormat = 1,
+        .configureShortCircuit = 1,
+    };
+    char testFileName[TEST_FILE_NAME_LENGTH];
+    hdfsFS fs;
+    struct NativeMiniDfsCluster* cl;
+    struct hdfsBuilder *bld;
+
+    cl = nmdCreate(&conf);
+    EXPECT_NONNULL(cl);
+    EXPECT_ZERO(nmdWaitClusterUp(cl));
+    port = nmdGetNameNodePort(cl);
+    if (port < 0) {
+        fprintf(stderr, "TEST_ERROR: test_zerocopy: "
+                "nmdGetNameNodePort returned error %d\n", port);
+        return EXIT_FAILURE;
+    }
+    bld = hdfsNewBuilder();
+    EXPECT_NONNULL(bld);
+    EXPECT_ZERO(nmdConfigureHdfsBuilder(cl, bld));
+    hdfsBuilderSetForceNewInstance(bld);
+    hdfsBuilderConfSetStr(bld, "dfs.block.size",
+                          TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+    /* ensure that we'll always get our mmaps */
+    hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum",
+                          "true");
+    fs = hdfsBuilderConnect(bld);
+    EXPECT_NONNULL(fs);
+    EXPECT_ZERO(createZeroCopyTestFile(fs, testFileName,
+          TEST_FILE_NAME_LENGTH));
+    EXPECT_ZERO(doTestZeroCopyReads(fs, testFileName));
+    EXPECT_ZERO(hdfsDisconnect(fs));
+    EXPECT_ZERO(nmdShutdown(cl));
+    nmdFree(cl);
+    fprintf(stderr, "TEST_SUCCESS\n"); 
+    return EXIT_SUCCESS;
+}

Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Tue Aug 20 18:07:47 2013
@@ -1391,4 +1391,32 @@
          linearly increases.
        </description>
 </property>
+
+<property>
+  <name>dfs.client.mmap.cache.size</name>
+  <value>1024</value>
+  <description>
+    When zero-copy reads are used, the DFSClient keeps a cache of recently used
+    memory mapped regions.  This parameter controls the maximum number of
+    entries that we will keep in that cache.
+
+    If this is set to 0, we will not allow mmap.
+
+    The larger this number is, the more file descriptors we will potentially
+    use for memory-mapped files.  Memory-mapped files also use virtual address space.
+    You may need to increase your ulimit virtual address space limits before
+    increasing the client mmap cache size.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.mmap.cache.timeout.ms</name>
+  <value>900000</value>
+  <description>
+    The minimum length of time that we will keep an mmap entry in the cache
+    between uses.  If an entry is in the cache longer than this, and nobody
+    uses it, it will be removed by a background thread.
+  </description>
+</property>
+
 </configuration>

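As a usage note (not part of the diff): both new properties can also be overridden per-client. A libhdfs caller, for example, could set them through hdfsBuilderConfSetStr(), the same call the new zero-copy test uses for dfs.block.size; the values below are arbitrary illustrations, not recommendations.

    /* Sketch, assuming a builder obtained from hdfsNewBuilder(). */
    struct hdfsBuilder *bld = hdfsNewBuilder();
    /* Keep at most 256 mmap'd regions cached per client. */
    hdfsBuilderConfSetStr(bld, "dfs.client.mmap.cache.size", "256");
    /* Evict unused cached mmaps after one minute instead of the 15-minute default. */
    hdfsBuilderConfSetStr(bld, "dfs.client.mmap.cache.timeout.ms", "60000");
    /* A cache size of 0 disables mmap entirely, per the description above. */
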
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1515906&r1=1515905&r2=1515906&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Tue Aug 20 18:07:47 2013
@@ -23,24 +23,44 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
+import org.apache.commons.lang.SystemUtils;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ZeroCopyCursor;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Assume;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestBlockReaderLocal {
+  private static TemporarySocketDirectory sockDir;
+  
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+  
+  @AfterClass
+  public static void shutdown() throws IOException {
+    sockDir.close();
+  }
+  
   public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
       int off2, int len) {
     for (int i = 0; i < len; i++) {
@@ -100,10 +120,11 @@ public class TestBlockReaderLocal {
     FSDataInputStream fsIn = null;
     byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
     
+    FileSystem fs = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
-      FileSystem fs = cluster.getFileSystem();
+      fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, TEST_PATH,
           BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
       try {
@@ -138,6 +159,7 @@ public class TestBlockReaderLocal {
       test.doTest(blockReaderLocal, original);
     } finally {
       if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
       if (cluster != null) cluster.shutdown();
       if (dataIn != null) dataIn.close();
       if (checkIn != null) checkIn.close();
@@ -382,10 +404,11 @@ public class TestBlockReaderLocal {
     final long RANDOM_SEED = 4567L;
     FSDataInputStream fsIn = null;
     byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+    FileSystem fs = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive();
-      FileSystem fs = cluster.getFileSystem();
+      fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, TEST_PATH,
           BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
       try {
@@ -417,8 +440,327 @@ public class TestBlockReaderLocal {
     } finally {
       DFSInputStream.tcpReadsDisabledForTesting = false;
       if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
       if (cluster != null) cluster.shutdown();
       if (sockDir != null) sockDir.close();
     }
   }
+
+  private static byte[] byteBufferToArray(ByteBuffer buf) {
+    byte resultArray[] = new byte[buf.remaining()];
+    buf.get(resultArray);
+    return resultArray;
+  }
+  
+  public static HdfsConfiguration initZeroCopyTest() {
+    Assume.assumeTrue(NativeIO.isAvailable());
+    Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    sockDir = new TemporarySocketDirectory();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
+    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(),
+          "TestRequestMmapAccess._PORT.sock").getAbsolutePath());
+    conf.setBoolean(DFSConfigKeys.
+        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+    return conf;
+  }
+
+  @Test
+  public void testZeroCopyReads() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    FSDataInputStream fsIn = null;
+    ZeroCopyCursor zcursor = null;
+    
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+      IOUtils.readFully(fsIn, original, 0,
+          BlockReaderLocalTest.TEST_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      zcursor = fsIn.createZeroCopyCursor();
+      zcursor.setFallbackBuffer(ByteBuffer.
+          allocateDirect(1024 * 1024 * 4));
+      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+      zcursor.read(4096);
+      ByteBuffer result = zcursor.getData();
+      Assert.assertEquals(4096, result.remaining());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+          byteBufferToArray(result));
+    } finally {
+      if (zcursor != null) zcursor.close();
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+  
+  @Test
+  public void testShortZeroCopyReads() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    FSDataInputStream fsIn = null;
+    ZeroCopyCursor zcursor = null;
+    
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+      IOUtils.readFully(fsIn, original, 0,
+          BlockReaderLocalTest.TEST_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      zcursor = fsIn.createZeroCopyCursor();
+      zcursor.setFallbackBuffer(ByteBuffer.
+          allocateDirect(1024 * 1024 * 4));
+      zcursor.setAllowShortReads(true);
+      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+      zcursor.read(8192);
+      ByteBuffer result = zcursor.getData();
+      Assert.assertEquals(4096, result.remaining());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+          byteBufferToArray(result));
+      zcursor.read(4097);
+      result = zcursor.getData();
+      Assert.assertEquals(4096, result.remaining());
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192),
+          byteBufferToArray(result));
+      zcursor.setAllowShortReads(false);
+      zcursor.read(4100);
+      result = zcursor.getData();
+      Assert.assertEquals(4100, result.remaining());
+
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 8192, 12292),
+          byteBufferToArray(result));
+    } finally {
+      if (zcursor != null) zcursor.close();
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testZeroCopyReadsNoBackingBuffer() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    FSDataInputStream fsIn = null;
+    ZeroCopyCursor zcursor = null;
+    
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+      IOUtils.readFully(fsIn, original, 0,
+          BlockReaderLocalTest.TEST_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      zcursor = fsIn.createZeroCopyCursor();
+      zcursor.setAllowShortReads(false);
+      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+      // This read is longer than the file, and we do not have short reads enabled.
+      try {
+        zcursor.read(8192);
+        Assert.fail("expected UnsupportedOperationException");
+      } catch (UnsupportedOperationException e) {
+        // expected
+      }
+      // This read is longer than the block, and we do not have short reads enabled.
+      try {
+        zcursor.read(4097);
+        Assert.fail("expected UnsupportedOperationException");
+      } catch (UnsupportedOperationException e) {
+        // expected
+      }
+      // This read should succeed.
+      zcursor.read(4096);
+      ByteBuffer result = zcursor.getData();
+      Assert.assertEquals(4096, result.remaining());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(4096,
+          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+          byteBufferToArray(result));
+    } finally {
+      if (zcursor != null) zcursor.close();
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
+  private static class CountingVisitor
+      implements ClientMmapManager.ClientMmapVisitor {
+    int count = 0;
+
+    @Override
+    public void accept(ClientMmap mmap) {
+      count++;
+    }
+
+    public void reset() {
+      count = 0;
+    }
+  }
+
+  @Test
+  public void testZeroCopyMmapCache() throws Exception {
+    HdfsConfiguration conf = initZeroCopyTest();
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    final int TEST_FILE_LENGTH = 16385;
+    final int RANDOM_SEED = 23453;
+    FSDataInputStream fsIn = null;
+    ZeroCopyCursor zcursor[] = { null, null, null, null, null };
+    
+    DistributedFileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH,
+          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+      try {
+        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+      } catch (InterruptedException e) {
+        Assert.fail("unexpected InterruptedException during " +
+            "waitReplication: " + e);
+      } catch (TimeoutException e) {
+        Assert.fail("unexpected TimeoutException during " +
+            "waitReplication: " + e);
+      }
+      fsIn = fs.open(TEST_PATH);
+      byte original[] = new byte[TEST_FILE_LENGTH];
+      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+      fsIn.close();
+      fsIn = fs.open(TEST_PATH);
+      for (int i = 0; i < zcursor.length; i++) {
+        zcursor[i] = fsIn.createZeroCopyCursor();
+        zcursor[i].setAllowShortReads(false);
+      }
+      ClientMmapManager mmapManager = fs.getClient().getMmapManager();
+      CountingVisitor countingVisitor = new CountingVisitor();
+      mmapManager.visitMmaps(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+      mmapManager.visitEvictable(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+      zcursor[0].read(4096);
+      fsIn.seek(0);
+      zcursor[1].read(4096);
+      mmapManager.visitMmaps(countingVisitor);
+      Assert.assertEquals(1, countingVisitor.count);
+      countingVisitor.reset();
+      mmapManager.visitEvictable(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+      countingVisitor.reset();
+
+      // The mmaps should be of the first block of the file.
+      final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+      mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() {
+        @Override
+        public void accept(ClientMmap mmap) {
+          Assert.assertEquals(firstBlock, mmap.getBlock());
+        }
+      });
+
+      // Read more blocks.
+      zcursor[2].read(4096);
+      zcursor[3].read(4096);
+      try {
+        zcursor[4].read(4096);
+        Assert.fail("expected UnsupportedOperationException");
+      } catch (UnsupportedOperationException e) {
+        // expected
+      }
+
+      // we should have 3 mmaps, 0 evictable
+      mmapManager.visitMmaps(countingVisitor);
+      Assert.assertEquals(3, countingVisitor.count);
+      countingVisitor.reset();
+      mmapManager.visitEvictable(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+
+      // After we close the cursors, the mmaps should be evictable for 
+      // a brief period of time.  Then, they should be closed (we're 
+      // using a very quick timeout)
+      for (int i = 0; i < zcursor.length; i++) {
+        IOUtils.closeStream(zcursor[i]);
+      }
+      while (true) {
+        countingVisitor.reset();
+        mmapManager.visitEvictable(countingVisitor);
+        if (0 == countingVisitor.count) {
+          break;
+        }
+      }
+      countingVisitor.reset();
+      mmapManager.visitMmaps(countingVisitor);
+      Assert.assertEquals(0, countingVisitor.count);
+    } finally {
+      if (fsIn != null) fsIn.close();
+      if (fs != null) fs.close();
+      if (cluster != null) cluster.shutdown();
+    }
+
+  }
 }

