Repository: hadoop Updated Branches: refs/heads/branch-2.6 8b9241f93 -> 8ef73cd4c
HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe) (cherry picked from commit 6b39ad0865cb2a7960dd59d68178f0bf28865ce2) (cherry picked from commit e35788aa5a41940651af0b73dfeaeca011556904) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8ef73cd4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8ef73cd4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8ef73cd4 Branch: refs/heads/branch-2.6 Commit: 8ef73cd4ce640e5aebcdd35f33f7e27a7c83d7a9 Parents: 8b9241f Author: Colin Patrick Mccabe <[email protected]> Authored: Thu Feb 12 10:40:46 2015 -0800 Committer: Junping Du <[email protected]> Committed: Wed Jan 27 08:46:38 2016 -0800 ---------------------------------------------------------------------- .../java/org/apache/hadoop/fs/CanUnbuffer.java | 36 ++++++ .../org/apache/hadoop/fs/FSDataInputStream.java | 12 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../org/apache/hadoop/hdfs/DFSInputStream.java | 8 +- .../java/org/apache/hadoop/hdfs/PeerCache.java | 9 +- .../hadoop-hdfs/src/main/native/libhdfs/hdfs.c | 28 ++++ .../hadoop-hdfs/src/main/native/libhdfs/hdfs.h | 9 ++ .../java/org/apache/hadoop/fs/TestUnbuffer.java | 127 +++++++++++++++++++ 8 files changed, 227 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef73cd4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java new file mode 100644 index 0000000..07e65f5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanUnbuffer.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * FSDataInputStreams implement this interface to indicate that they can clear + * their buffers on request. + */ [email protected] [email protected] +public interface CanUnbuffer { + /** + * Reduce the buffering. This will also free sockets and file descriptors + * held by the stream, if possible. + */ + public void unbuffer(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef73cd4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index c8609d4..7dae991 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -35,7 +35,7 @@ import org.apache.hadoop.util.IdentityHashStore; public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, - HasEnhancedByteBufferAccess { + HasEnhancedByteBufferAccess, CanUnbuffer { /** * Map ByteBuffers that we have handed out to readers to ByteBufferPool * objects @@ -220,4 +220,14 @@ public class FSDataInputStream extends DataInputStream bufferPool.putBuffer(buffer); } } + + @Override + public void unbuffer() { + try { + ((CanUnbuffer)in).unbuffer(); + } catch (ClassCastException e) { + throw new UnsupportedOperationException("this stream does not " + + "support unbuffering."); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef73cd4/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a5e04c4..260ab38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -115,6 +115,8 @@ Release 2.6.1 - 2015-09-23 HDFS-8384. Allow NN to startup if there are files having a lease but are not under construction. (jing9) + HDFS-7694. FSDataInputStream should support "unbuffer" (cmccabe) + OPTIMIZATIONS HDFS-8480. Fix performance and timeout issues in HDFS-7929 by using http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef73cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index a9bbb77..781432e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -50,6 +50,7 @@ import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.ByteBufferUtil; import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetReadahead; +import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; @@ -86,7 +87,7 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private public class DFSInputStream extends FSInputStream implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, - HasEnhancedByteBufferAccess { + HasEnhancedByteBufferAccess, CanUnbuffer { @VisibleForTesting public static boolean tcpReadsDisabledForTesting = false; private long hedgedReadOpsLoopNumForTesting = 0; @@ -1767,4 +1768,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, ((ByteBufferPool)val).putBuffer(buffer); } } + + @Override + public synchronized void unbuffer() { + closeCurrentBlockReader(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef73cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java index fbe4e15..392fc8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java @@ -29,16 +29,21 @@ import com.google.common.collect.LinkedListMultimap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Time; /** * A cache of input stream sockets to Data Node. */ -class PeerCache { [email protected] [email protected] +@VisibleForTesting +public class PeerCache { private static final Log LOG = LogFactory.getLog(PeerCache.class); private static class Key { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef73cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c index 21f9c2b..2ce583e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c @@ -1006,6 +1006,34 @@ done: return file; } +int hdfsUnbufferFile(hdfsFile file) +{ + int ret; + jthrowable jthr; + JNIEnv *env = getJNIEnv(); + + if (!env) { + ret = EINTERNAL; + goto done; + } + if (file->type != HDFS_STREAM_INPUT) { + ret = ENOTSUP; + goto done; + } + jthr = invokeMethod(env, NULL, INSTANCE, file->file, HADOOP_ISTRM, + "unbuffer", "()V"); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + HADOOP_ISTRM "#unbuffer failed:"); + goto done; + } + ret = 0; + +done: + errno = ret; + return ret; +} + int hdfsCloseFile(hdfsFS fs, hdfsFile file) { int ret; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef73cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h index 0625da3..0f30301 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h @@ -334,6 +334,15 @@ extern "C" { hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, int bufferSize, short replication, tSize blocksize); + /** + * hdfsUnbufferFile - Reduce the buffering done on a file. + * + * @param file The file to unbuffer. + * @return 0 on success + * ENOTSUP if the file does not support unbuffering + * Errno will also be set to this value. + */ + int hdfsUnbufferFile(hdfsFile file); /** * hdfsCloseFile - Close an open file. http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ef73cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java new file mode 100644 index 0000000..52c33e9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestUnbuffer.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.PeerCache; +import org.apache.hadoop.io.IOUtils; +import org.junit.Assert; +import org.junit.Test; + +public class TestUnbuffer { + private static final Log LOG = + LogFactory.getLog(TestUnbuffer.class.getName()); + + /** + * Test that calling Unbuffer closes sockets. + */ + @Test + public void testUnbufferClosesSockets() throws Exception { + Configuration conf = new Configuration(); + // Set a new ClientContext. This way, we will have our own PeerCache, + // rather than sharing one with other unit tests. + conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, + "testUnbufferClosesSocketsContext"); + + // Disable short-circuit reads. With short-circuit, we wouldn't hold open a + // TCP socket. + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); + + // Set a really long socket timeout to avoid test timing issues. + conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, + 100000000L); + conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, + 100000000L); + + MiniDFSCluster cluster = null; + FSDataInputStream stream = null; + try { + cluster = new MiniDFSCluster.Builder(conf).build(); + DistributedFileSystem dfs = (DistributedFileSystem) + FileSystem.newInstance(conf); + final Path TEST_PATH = new Path("/test1"); + DFSTestUtil.createFile(dfs, TEST_PATH, 128, (short)1, 1); + stream = dfs.open(TEST_PATH); + // Read a byte. This will trigger the creation of a block reader. + stream.seek(2); + int b = stream.read(); + Assert.assertTrue(-1 != b); + + // The Peer cache should start off empty. + PeerCache cache = dfs.getClient().getClientContext().getPeerCache(); + Assert.assertEquals(0, cache.size()); + + // Unbuffer should clear the block reader and return the socket to the + // cache. + stream.unbuffer(); + stream.seek(2); + Assert.assertEquals(1, cache.size()); + int b2 = stream.read(); + Assert.assertEquals(b, b2); + } finally { + if (stream != null) { + IOUtils.cleanup(null, stream); + } + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test opening many files via TCP (not short-circuit). + * + * This is practical when using unbuffer, because it reduces the number of + * sockets and amount of memory that we use. + */ + @Test + public void testOpenManyFilesViaTcp() throws Exception { + final int NUM_OPENS = 500; + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); + MiniDFSCluster cluster = null; + FSDataInputStream[] streams = new FSDataInputStream[NUM_OPENS]; + try { + cluster = new MiniDFSCluster.Builder(conf).build(); + DistributedFileSystem dfs = cluster.getFileSystem(); + final Path TEST_PATH = new Path("/testFile"); + DFSTestUtil.createFile(dfs, TEST_PATH, 131072, (short)1, 1); + + for (int i = 0; i < NUM_OPENS; i++) { + streams[i] = dfs.open(TEST_PATH); + LOG.info("opening file " + i + "..."); + Assert.assertTrue(-1 != streams[i].read()); + streams[i].unbuffer(); + } + } finally { + for (FSDataInputStream stream : streams) { + IOUtils.cleanup(null, stream); + } + if (cluster != null) { + cluster.shutdown(); + } + } + } +}
