Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h Fri Sep 27 22:51:12 2013
@@ -114,6 +114,47 @@ jthrowable classNameOfObject(jobject job
  *
  */
 JNIEnv* getJNIEnv(void);
 
+/**
+ * Figure out if a Java object is an instance of a particular class.
+ *
+ * @param env The Java environment.
+ * @param obj The object to check.
+ * @param name The class name to check.
+ *
+ * @return -1 if we failed to find the referenced class name.
+ *         0 if the object is not of the given class.
+ *         1 if the object is of the given class.
+ */
+int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name);
+
+/**
+ * Set a value in a configuration object.
+ *
+ * @param env The JNI environment
+ * @param jConfiguration The configuration object to modify
+ * @param key The key to modify
+ * @param value The value to set the key to
+ *
+ * @return NULL on success; exception otherwise
+ */
+jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
+        const char *key, const char *value);
+
+/**
+ * Fetch an instance of an Enum.
+ *
+ * @param env The JNI environment.
+ * @param className The enum class name.
+ * @param valueName The name of the enum value
+ * @param out (out param) on success, a local reference to an
+ *            instance of the enum object. (Since Java enums are
+ *            singletons, this is also the only instance.)
+ *
+ * @return NULL on success; exception otherwise
+ */
+jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
+        const char *valueName, jobject *out);
+
 #endif /*LIBHDFS_JNI_HELPER_H*/
 
 /**
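The three helpers declared above follow the existing libhdfs conventions: javaObjectIsOfClass() returns a tri-state int, while hadoopConfSetStr() and fetchEnumInstance() return a jthrowable that is NULL on success. As a rough illustration of how they are meant to compose with printExceptionAndFree() from exception.h, consider the sketch below; setConfIfConfiguration() is a hypothetical name used only for this example, and the snippet assumes it is compiled inside the libhdfs source tree so that jni_helper.h and exception.h are on the include path.

    #include "exception.h"
    #include "jni_helper.h"

    #include <errno.h>
    #include <jni.h>
    #include <stdio.h>

    /* Hypothetical helper: verify that 'obj' really is an
     * org.apache.hadoop.conf.Configuration before setting a key on it. */
    static int setConfIfConfiguration(JNIEnv *env, jobject obj,
                                      const char *key, const char *value)
    {
        jthrowable jthr;
        int ret;

        /* -1: class lookup failed, 0: not a Configuration, 1: it is one. */
        ret = javaObjectIsOfClass(env, obj,
                "org/apache/hadoop/conf/Configuration");
        if (ret != 1) {
            fprintf(stderr, "expected a Configuration object (ret=%d)\n", ret);
            return EINVAL;
        }
        jthr = hadoopConfSetStr(env, obj, key, value);
        if (jthr) {
            /* Prints and clears the pending exception; returns an errno. */
            return printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
                    "setConfIfConfiguration(%s)", key);
        }
        return 0;
    }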
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c Fri Sep 27 22:51:12 2013
@@ -17,14 +17,19 @@
  */
 
 #include "exception.h"
+#include "hdfs.h"
+#include "hdfs_test.h"
 #include "jni_helper.h"
 #include "native_mini_dfs.h"
 
 #include <errno.h>
 #include <jni.h>
+#include <limits.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
 
 #define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
 #define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
@@ -39,8 +44,44 @@ struct NativeMiniDfsCluster {
      * The NativeMiniDfsCluster object
      */
     jobject obj;
+
+    /**
+     * Path to the domain socket, or the empty string if there is none.
+     */
+    char domainSocketPath[PATH_MAX];
 };
 
+static jthrowable nmdConfigureShortCircuit(JNIEnv *env,
+        struct NativeMiniDfsCluster *cl, jobject cobj)
+{
+    jthrowable jthr;
+    char *tmpDir;
+
+    int ret = hdfsDisableDomainSocketSecurity();
+    if (ret) {
+        return newRuntimeError(env, "failed to disable hdfs domain "
+                "socket security: error %d", ret);
+    }
+    jthr = hadoopConfSetStr(env, cobj, "dfs.client.read.shortcircuit", "true");
+    if (jthr) {
+        return jthr;
+    }
+    tmpDir = getenv("TMPDIR");
+    if (!tmpDir) {
+        tmpDir = "/tmp";
+    }
+    snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
+             tmpDir, getpid(), rand());
+    jthr = hadoopConfSetStr(env, cobj, "dfs.domain.socket.path",
+            cl->domainSocketPath);
+    if (jthr) {
+        return jthr;
+    }
+    return NULL;
+}
+
 struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
 {
     struct NativeMiniDfsCluster* cl = NULL;
@@ -81,6 +122,28 @@ struct NativeMiniDfsCluster* nmdCreate(s
             goto error;
         }
     }
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "nmdCreate: Configuration::setBoolean");
+        goto error;
+    }
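From a caller's point of view, the short-circuit plumbing being wired into nmdCreate() here reduces to two steps: set configureShortCircuit in the conf struct, then let nmdConfigureHdfsBuilder() (added further down in this file) copy the namenode port, dfs.client.read.shortcircuit, and dfs.domain.socket.path into an hdfsBuilder. The sketch below condenses what the new test_libhdfs_zerocopy.c later in this commit does, with most error reporting omitted; it is an outline under those assumptions, not a drop-in test.

    #include "hdfs.h"
    #include "native_mini_dfs.h"

    #include <stdio.h>
    #include <stdlib.h>

    int main(void)
    {
        struct NativeMiniDfsConf conf = {
            .doFormat = 1,
            .configureShortCircuit = 1,
        };
        struct NativeMiniDfsCluster *cl;
        struct hdfsBuilder *bld;
        hdfsFS fs;

        cl = nmdCreate(&conf);
        if (!cl || nmdWaitClusterUp(cl)) {
            fprintf(stderr, "failed to start the mini-cluster\n");
            return EXIT_FAILURE;
        }
        bld = hdfsNewBuilder();
        /* Copies the namenode port and, because short-circuit was configured,
         * dfs.client.read.shortcircuit plus dfs.domain.socket.path. */
        if (!bld || nmdConfigureHdfsBuilder(cl, bld)) {
            fprintf(stderr, "failed to configure the hdfs builder\n");
            return EXIT_FAILURE;
        }
        fs = hdfsBuilderConnect(bld);
        if (!fs) {
            fprintf(stderr, "hdfsBuilderConnect failed\n");
            return EXIT_FAILURE;
        }
        /* ... run short-circuit reads against fs ... */
        hdfsDisconnect(fs);
        nmdShutdown(cl);
        nmdFree(cl);
        return EXIT_SUCCESS;
    }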
+    // Disable 'minimum block size' -- it's annoying in tests.
+    (*env)->DeleteLocalRef(env, jconfStr);
+    jconfStr = NULL;
+    jthr = newJavaStr(env, "dfs.namenode.fs-limits.min-block-size", &jconfStr);
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "nmdCreate: new String");
+        goto error;
+    }
+    jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
+            "setLong", "(Ljava/lang/String;J)V", jconfStr, 0LL);
+    if (jthr) {
+        printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "nmdCreate: Configuration::setLong");
+        goto error;
+    }
+    // Create MiniDFSCluster object
     jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
             "(L"HADOOP_CONF";)V", cobj);
     if (jthr) {
@@ -88,6 +151,14 @@ struct NativeMiniDfsCluster* nmdCreate(s
             "nmdCreate: NativeMiniDfsCluster#Builder#Builder");
         goto error;
     }
+    if (conf->configureShortCircuit) {
+        jthr = nmdConfigureShortCircuit(env, cl, cobj);
+        if (jthr) {
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "nmdCreate: nmdConfigureShortCircuit error");
+            goto error;
+        }
+    }
     jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
         "format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat);
     if (jthr) {
@@ -272,3 +343,29 @@ error_dlr_nn:
 
     return ret;
 }
+
+int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
+                            struct hdfsBuilder *bld)
+{
+    int port, ret;
+
+    hdfsBuilderSetNameNode(bld, "localhost");
+    port = nmdGetNameNodePort(cl);
+    if (port < 0) {
+        fprintf(stderr, "nmdGetNameNodePort failed with error %d\n", -port);
+        return EIO;
+    }
+    hdfsBuilderSetNameNodePort(bld, port);
+    if (cl->domainSocketPath[0]) {
+        ret = hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit", "true");
+        if (ret) {
+            return ret;
+        }
+        ret = hdfsBuilderConfSetStr(bld, "dfs.domain.socket.path",
+                cl->domainSocketPath);
+        if (ret) {
+            return ret;
+        }
+    }
+    return 0;
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h?rev=1527113&r1=1527112&r2=1527113&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h Fri Sep 27 22:51:12 2013
@@ -21,6 +21,7 @@
 
 #include <jni.h> /* for jboolean */
 
+struct hdfsBuilder;
 struct NativeMiniDfsCluster;
 
 /**
@@ -28,17 +29,24 @@ struct NativeMiniDfsCluster;
  */
 struct NativeMiniDfsConf {
     /**
-     * Nonzero if the cluster should be formatted prior to startup
+     * Nonzero if the cluster should be formatted prior to startup.
      */
     jboolean doFormat;
+
     /**
      * Whether or not to enable webhdfs in MiniDfsCluster
      */
     jboolean webhdfsEnabled;
+
     /**
      * The http port of the namenode in MiniDfsCluster
      */
     jint namenodeHttpPort;
+
+    /**
+     * Nonzero if we should configure short circuit.
+     */
+    jboolean configureShortCircuit;
 };
 
 /**
@@ -84,7 +92,7 @@ void nmdFree(struct NativeMiniDfsCluster
  *
  * @return the port, or a negative error code
  */
-int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); 
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
 
 /**
  * Get the http address that's in use by the given (non-HA) nativeMiniDfs
@@ -101,4 +109,14 @@ int nmdGetNameNodePort(const struct Nati
 int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
                               int *port, const char **hostName);
 
+/**
+ * Configure the HDFS builder appropriately to connect to this cluster.
+ * + * @param bld The hdfs builder + * + * @return the port, or a negative error code + */ +int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl, + struct hdfsBuilder *bld); + #endif Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c?rev=1527113&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c Fri Sep 27 22:51:12 2013 @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "expect.h" +#include "hdfs.h" +#include "native_mini_dfs.h" + +#include <errno.h> +#include <inttypes.h> +#include <semaphore.h> +#include <pthread.h> +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> + +#define TO_STR_HELPER(X) #X +#define TO_STR(X) TO_STR_HELPER(X) + +#define TEST_FILE_NAME_LENGTH 128 +#define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096 +#define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215 +#define TEST_ZEROCOPY_NUM_BLOCKS 6 +#define SMALL_READ_LEN 16 + +#define ZC_BUF_LEN 32768 + +static uint8_t *getZeroCopyBlockData(int blockIdx) +{ + uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE); + int i; + if (!buf) { + fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE); + exit(1); + } + for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) { + buf[i] = blockIdx + (i % 17); + } + return buf; +} + +static int getZeroCopyBlockLen(int blockIdx) +{ + if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) { + return 0; + } else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) { + return TEST_ZEROCOPY_LAST_BLOCK_SIZE; + } else { + return TEST_ZEROCOPY_FULL_BLOCK_SIZE; + } +} + +static void printBuf(const uint8_t *buf, size_t len) __attribute__((unused)); + +static void printBuf(const uint8_t *buf, size_t len) +{ + size_t i; + + for (i = 0; i < len; i++) { + fprintf(stderr, "%02x", buf[i]); + } + fprintf(stderr, "\n"); +} + +static int doTestZeroCopyReads(hdfsFS fs, const char *fileName) +{ + hdfsFile file = NULL; + struct hadoopRzOptions *opts = NULL; + struct hadoopRzBuffer *buffer = NULL; + uint8_t *block; + + file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0); + EXPECT_NONNULL(file); + opts = hadoopRzOptionsAlloc(); + EXPECT_NONNULL(opts); + EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 1)); + /* haven't read anything yet */ + EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL)); + block = getZeroCopyBlockData(0); + 
EXPECT_NONNULL(block); + /* first read is half of a block. */ + buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2); + EXPECT_NONNULL(buffer); + EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, + hadoopRzBufferLength(buffer)); + EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer), block, + TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2)); + hadoopRzBufferFree(file, buffer); + /* read the next half of the block */ + buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2); + EXPECT_NONNULL(buffer); + EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, + hadoopRzBufferLength(buffer)); + EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer), + block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2), + TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2)); + hadoopRzBufferFree(file, buffer); + free(block); + EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE, + TEST_ZEROCOPY_FULL_BLOCK_SIZE, + TEST_ZEROCOPY_FULL_BLOCK_SIZE, + TEST_ZEROCOPY_FULL_BLOCK_SIZE)); + /* Now let's read just a few bytes. */ + buffer = hadoopReadZero(file, opts, SMALL_READ_LEN); + EXPECT_NONNULL(buffer); + EXPECT_INT_EQ(SMALL_READ_LEN, hadoopRzBufferLength(buffer)); + block = getZeroCopyBlockData(1); + EXPECT_NONNULL(block); + EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer), SMALL_READ_LEN)); + hadoopRzBufferFree(file, buffer); + EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN, + hdfsTell(fs, file)); + EXPECT_ZERO(expectFileStats(file, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN)); + + /* Clear 'skip checksums' and test that we can't do zero-copy reads any + * more. Since there is no ByteBufferPool set, we should fail with + * EPROTONOSUPPORT. + */ + EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0)); + EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE)); + EXPECT_INT_EQ(EPROTONOSUPPORT, errno); + + /* Now set a ByteBufferPool and try again. It should succeed this time. 
*/ + EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts, + ELASTIC_BYTE_BUFFER_POOL_CLASS)); + buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE); + EXPECT_NONNULL(buffer); + EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, hadoopRzBufferLength(buffer)); + EXPECT_ZERO(expectFileStats(file, + (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN, + (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN, + (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN)); + EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, hadoopRzBufferGet(buffer), + TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN)); + free(block); + block = getZeroCopyBlockData(2); + EXPECT_NONNULL(block); + EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer) + + (TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN)); + hadoopRzBufferFree(file, buffer); + free(block); + hadoopRzOptionsFree(opts); + EXPECT_ZERO(hdfsCloseFile(fs, file)); + return 0; +} + +static int createZeroCopyTestFile(hdfsFS fs, char *testFileName, + size_t testFileNameLen) +{ + int blockIdx, blockLen; + hdfsFile file; + uint8_t *data; + + snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d", + getpid(), rand()); + file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1, + TEST_ZEROCOPY_FULL_BLOCK_SIZE); + EXPECT_NONNULL(file); + for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) { + blockLen = getZeroCopyBlockLen(blockIdx); + data = getZeroCopyBlockData(blockIdx); + EXPECT_NONNULL(data); + EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen)); + } + EXPECT_ZERO(hdfsCloseFile(fs, file)); + return 0; +} + +/** + * Test that we can write a file with libhdfs and then read it back + */ +int main(void) +{ + int port; + struct NativeMiniDfsConf conf = { + .doFormat = 1, + .configureShortCircuit = 1, + }; + char testFileName[TEST_FILE_NAME_LENGTH]; + hdfsFS fs; + struct NativeMiniDfsCluster* cl; + struct hdfsBuilder *bld; + + cl = nmdCreate(&conf); + EXPECT_NONNULL(cl); + EXPECT_ZERO(nmdWaitClusterUp(cl)); + port = nmdGetNameNodePort(cl); + if (port < 0) { + fprintf(stderr, "TEST_ERROR: test_zerocopy: " + "nmdGetNameNodePort returned error %d\n", port); + return EXIT_FAILURE; + } + bld = hdfsNewBuilder(); + EXPECT_NONNULL(bld); + EXPECT_ZERO(nmdConfigureHdfsBuilder(cl, bld)); + hdfsBuilderSetForceNewInstance(bld); + hdfsBuilderConfSetStr(bld, "dfs.block.size", + TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE)); + /* ensure that we'll always get our mmaps */ + hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum", + "true"); + fs = hdfsBuilderConnect(bld); + EXPECT_NONNULL(fs); + EXPECT_ZERO(createZeroCopyTestFile(fs, testFileName, + TEST_FILE_NAME_LENGTH)); + EXPECT_ZERO(doTestZeroCopyReads(fs, testFileName)); + EXPECT_ZERO(hdfsDisconnect(fs)); + EXPECT_ZERO(nmdShutdown(cl)); + nmdFree(cl); + fprintf(stderr, "TEST_SUCCESS\n"); + return EXIT_SUCCESS; +} Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1527113&r1=1527112&r2=1527113&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Sep 27 22:51:12 2013 @@ -1415,4 +1415,32 @@ linearly increases. 
</description> </property> + +<property> + <name>dfs.client.mmap.cache.size</name> + <value>1024</value> + <description> + When zero-copy reads are used, the DFSClient keeps a cache of recently used + memory mapped regions. This parameter controls the maximum number of + entries that we will keep in that cache. + + If this is set to 0, we will not allow mmap. + + The larger this number is, the more file descriptors we will potentially + use for memory-mapped files. mmaped files also use virtual address space. + You may need to increase your ulimit virtual address space limits before + increasing the client mmap cache size. + </description> +</property> + +<property> + <name>dfs.client.mmap.cache.timeout.ms</name> + <value>900000</value> + <description> + The minimum length of time that we will keep an mmap entry in the cache + between uses. If an entry is in the cache longer than this, and nobody + uses it, it will be removed by a background thread. + </description> +</property> + </configuration> Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java?rev=1527113&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java Fri Sep 27 22:51:12 2013 @@ -0,0 +1,530 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeoutException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Random; + +import org.apache.commons.lang.SystemUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.ClientMmap; +import org.apache.hadoop.hdfs.client.ClientMmapManager; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.io.ByteBufferPool; +import org.apache.hadoop.io.ElasticByteBufferPool; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; + +/** + * This class tests if EnhancedByteBufferAccess works correctly. + */ +public class TestEnhancedByteBufferAccess { + private static final Log LOG = + LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName()); + + static TemporarySocketDirectory sockDir; + + @BeforeClass + public static void init() { + sockDir = new TemporarySocketDirectory(); + DomainSocket.disableBindPathValidation(); + } + + private static byte[] byteBufferToArray(ByteBuffer buf) { + byte resultArray[] = new byte[buf.remaining()]; + buf.get(resultArray); + buf.flip(); + return resultArray; + } + + public static HdfsConfiguration initZeroCopyTest() { + Assume.assumeTrue(NativeIO.isAvailable()); + Assume.assumeTrue(SystemUtils.IS_OS_UNIX); + HdfsConfiguration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); + conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3); + conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100); + conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, + new File(sockDir.getDir(), + "TestRequestMmapAccess._PORT.sock").getAbsolutePath()); + conf.setBoolean(DFSConfigKeys. 
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true); + return conf; + } + + @Test + public void testZeroCopyReads() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + FSDataInputStream fsIn = null; + final int TEST_FILE_LENGTH = 12345; + + FileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, 7567L); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + ByteBuffer result = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + Assert.assertEquals(4096, result.remaining()); + HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalBytesRead()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalZeroCopyBytesRead()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096), + byteBufferToArray(result)); + fsIn.releaseBuffer(result); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + @Test + public void testShortZeroCopyReads() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + FSDataInputStream fsIn = null; + final int TEST_FILE_LENGTH = 12345; + + FileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 7567L); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + + // Try to read 8192, but only get 4096 because of the block size. + HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; + ByteBuffer result = + dfsIn.read(null, 8192, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + Assert.assertEquals(4096, result.remaining()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalBytesRead()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalZeroCopyBytesRead()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096), + byteBufferToArray(result)); + dfsIn.releaseBuffer(result); + + // Try to read 4097, but only get 4096 because of the block size. 
+ result = + dfsIn.read(null, 4097, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + Assert.assertEquals(4096, result.remaining()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192), + byteBufferToArray(result)); + dfsIn.releaseBuffer(result); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + @Test + public void testZeroCopyReadsNoFallback() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + FSDataInputStream fsIn = null; + final int TEST_FILE_LENGTH = 12345; + + FileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, 7567L); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; + ByteBuffer result; + try { + result = dfsIn.read(null, 4097, EnumSet.noneOf(ReadOption.class)); + Assert.fail("expected UnsupportedOperationException"); + } catch (UnsupportedOperationException e) { + // expected + } + result = dfsIn.read(null, 4096, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + Assert.assertEquals(4096, result.remaining()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalBytesRead()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalZeroCopyBytesRead()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096), + byteBufferToArray(result)); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + private static class CountingVisitor + implements ClientMmapManager.ClientMmapVisitor { + int count = 0; + + @Override + public void accept(ClientMmap mmap) { + count++; + } + + public void reset() { + count = 0; + } + } + + @Test + public void testZeroCopyMmapCache() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + final int TEST_FILE_LENGTH = 16385; + final int RANDOM_SEED = 23453; + FSDataInputStream fsIn = null; + ByteBuffer results[] = { null, null, null, null, null }; + + DistributedFileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, RANDOM_SEED); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + final ClientMmapManager mmapManager = fs.getClient().getMmapManager(); + 
final CountingVisitor countingVisitor = new CountingVisitor(); + mmapManager.visitMmaps(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + mmapManager.visitEvictable(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + results[0] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + fsIn.seek(0); + results[1] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + mmapManager.visitMmaps(countingVisitor); + Assert.assertEquals(1, countingVisitor.count); + countingVisitor.reset(); + mmapManager.visitEvictable(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + countingVisitor.reset(); + + // The mmaps should be of the first block of the file. + final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); + mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() { + @Override + public void accept(ClientMmap mmap) { + Assert.assertEquals(firstBlock, mmap.getBlock()); + } + }); + + // Read more blocks. + results[2] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + results[3] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + try { + results[4] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + Assert.fail("expected UnsupportedOperationException"); + } catch (UnsupportedOperationException e) { + // expected + } + + // we should have 3 mmaps, 0 evictable + mmapManager.visitMmaps(countingVisitor); + Assert.assertEquals(3, countingVisitor.count); + countingVisitor.reset(); + mmapManager.visitEvictable(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + + // After we close the cursors, the mmaps should be evictable for + // a brief period of time. Then, they should be closed (we're + // using a very quick timeout) + for (ByteBuffer buffer : results) { + if (buffer != null) { + fsIn.releaseBuffer(buffer); + } + } + GenericTestUtils.waitFor(new Supplier<Boolean>() { + public Boolean get() { + countingVisitor.reset(); + try { + mmapManager.visitEvictable(countingVisitor); + } catch (InterruptedException e) { + e.printStackTrace(); + return false; + } + return (0 == countingVisitor.count); + } + }, 10, 10000); + countingVisitor.reset(); + mmapManager.visitMmaps(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + /** + * Test HDFS fallback reads. HDFS streams support the ByteBufferReadable + * interface. 
+ */ + @Test + public void testHdfsFallbackReads() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + final int TEST_FILE_LENGTH = 16385; + final int RANDOM_SEED = 23453; + FSDataInputStream fsIn = null; + + DistributedFileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, RANDOM_SEED); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + testFallbackImpl(fsIn, original); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + private static class RestrictedAllocatingByteBufferPool + implements ByteBufferPool { + private final boolean direct; + + RestrictedAllocatingByteBufferPool(boolean direct) { + this.direct = direct; + } + @Override + public ByteBuffer getBuffer(boolean direct, int length) { + Preconditions.checkArgument(this.direct == direct); + return direct ? ByteBuffer.allocateDirect(length) : + ByteBuffer.allocate(length); + } + @Override + public void putBuffer(ByteBuffer buffer) { + } + } + + private static void testFallbackImpl(InputStream stream, + byte original[]) throws Exception { + RestrictedAllocatingByteBufferPool bufferPool = + new RestrictedAllocatingByteBufferPool( + stream instanceof ByteBufferReadable); + + ByteBuffer result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10); + Assert.assertEquals(10, result.remaining()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 10), + byteBufferToArray(result)); + + result = ByteBufferUtil.fallbackRead(stream, bufferPool, 5000); + Assert.assertEquals(5000, result.remaining()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 10, 5010), + byteBufferToArray(result)); + + result = ByteBufferUtil.fallbackRead(stream, bufferPool, 9999999); + Assert.assertEquals(11375, result.remaining()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 5010, 16385), + byteBufferToArray(result)); + + result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10); + Assert.assertNull(result); + } + + /** + * Test the {@link ByteBufferUtil#fallbackRead} function directly. 
+ */ + @Test + public void testFallbackRead() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + final int TEST_FILE_LENGTH = 16385; + final int RANDOM_SEED = 23453; + FSDataInputStream fsIn = null; + + DistributedFileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, RANDOM_SEED); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + testFallbackImpl(fsIn, original); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + /** + * Test fallback reads on a stream which does not support the + * ByteBufferReadable * interface. + */ + @Test + public void testIndirectFallbackReads() throws Exception { + final File TEST_DIR = new File( + System.getProperty("test.build.data","build/test/data")); + final String TEST_PATH = TEST_DIR + File.separator + + "indirectFallbackTestFile"; + final int TEST_FILE_LENGTH = 16385; + final int RANDOM_SEED = 23453; + FileOutputStream fos = null; + FileInputStream fis = null; + try { + fos = new FileOutputStream(TEST_PATH); + Random random = new Random(RANDOM_SEED); + byte original[] = new byte[TEST_FILE_LENGTH]; + random.nextBytes(original); + fos.write(original); + fos.close(); + fos = null; + fis = new FileInputStream(TEST_PATH); + testFallbackImpl(fis, original); + } finally { + IOUtils.cleanup(LOG, fos, fis); + new File(TEST_PATH).delete(); + } + } +} Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1527113&r1=1527112&r2=1527113&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Fri Sep 27 22:51:12 2013 @@ -25,7 +25,6 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.util.concurrent.TimeoutException; -import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -36,11 +35,26 @@ import org.apache.hadoop.hdfs.protocol.E import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; +import org.junit.BeforeClass; import org.junit.Test; public class TestBlockReaderLocal { + private static TemporarySocketDirectory sockDir; + + @BeforeClass + public static void init() { + sockDir = 
new TemporarySocketDirectory(); + DomainSocket.disableBindPathValidation(); + } + + @AfterClass + public static void shutdown() throws IOException { + sockDir.close(); + } + public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2, int off2, int len) { for (int i = 0; i < len; i++) { @@ -100,10 +114,11 @@ public class TestBlockReaderLocal { FSDataInputStream fsIn = null; byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; + FileSystem fs = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); - FileSystem fs = cluster.getFileSystem(); + fs = cluster.getFileSystem(); DFSTestUtil.createFile(fs, TEST_PATH, BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); try { @@ -138,6 +153,7 @@ public class TestBlockReaderLocal { test.doTest(blockReaderLocal, original); } finally { if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); if (cluster != null) cluster.shutdown(); if (dataIn != null) dataIn.close(); if (checkIn != null) checkIn.close(); @@ -382,10 +398,11 @@ public class TestBlockReaderLocal { final long RANDOM_SEED = 4567L; FSDataInputStream fsIn = null; byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; + FileSystem fs = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); - FileSystem fs = cluster.getFileSystem(); + fs = cluster.getFileSystem(); DFSTestUtil.createFile(fs, TEST_PATH, BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); try { @@ -417,6 +434,7 @@ public class TestBlockReaderLocal { } finally { DFSInputStream.tcpReadsDisabledForTesting = false; if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); if (cluster != null) cluster.shutdown(); if (sockDir != null) sockDir.close(); }
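Taken together, the native and Java tests above exercise one client-facing pattern: open a file, allocate hadoopRzOptions, attach a ByteBufferPool so reads that cannot be memory-mapped fall back to ordinary buffers instead of failing with EPROTONOSUPPORT, then loop on hadoopReadZero() and hadoopRzBufferFree(). The sketch below condenses that read loop; readFileZeroCopy() and ZC_READ_CHUNK are illustrative names only, a zero-length buffer is treated as end of file, and the code assumes an already-connected hdfsFS such as the one test_libhdfs_zerocopy.c builds via nmdConfigureHdfsBuilder() and hdfsBuilderConnect().

    #include "hdfs.h"

    #include <errno.h>
    #include <fcntl.h>
    #include <stdio.h>

    #define ZC_READ_CHUNK 65536   /* illustrative request size */

    static int readFileZeroCopy(hdfsFS fs, const char *path)
    {
        hdfsFile file;
        struct hadoopRzOptions *opts;
        struct hadoopRzBuffer *buf;
        long long total = 0;

        file = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
        if (!file) {
            return errno;
        }
        opts = hadoopRzOptionsAlloc();
        if (!opts) {
            hdfsCloseFile(fs, file);
            return ENOMEM;
        }
        /* With a pool attached, requests that cannot be served by an mmap
         * fall back to ordinary buffers instead of failing. */
        if (hadoopRzOptionsSetByteBufferPool(opts,
                ELASTIC_BYTE_BUFFER_POOL_CLASS)) {
            hadoopRzOptionsFree(opts);
            hdfsCloseFile(fs, file);
            return errno;
        }
        for (;;) {
            buf = hadoopReadZero(file, opts, ZC_READ_CHUNK);
            if (!buf) {
                fprintf(stderr, "hadoopReadZero(%s) failed: error %d\n",
                        path, errno);
                break;
            }
            if (hadoopRzBufferLength(buf) == 0) {
                hadoopRzBufferFree(file, buf);   /* end of file */
                break;
            }
            /* hadoopRzBufferGet(buf) points at the bytes just read. */
            total += hadoopRzBufferLength(buf);
            hadoopRzBufferFree(file, buf);
        }
        hadoopRzOptionsFree(opts);
        hdfsCloseFile(fs, file);
        fprintf(stderr, "read %lld bytes from %s\n", total, path);
        return 0;
    }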