http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java new file mode 100644 index 0000000..57fbe47 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.BlockReader; + +import java.io.IOException; + +/** + * For sharing between the local and remote block reader implementations. + */ [email protected] +class BlockReaderUtil { + + /* See {@link BlockReader#readAll(byte[], int, int)} */ + public static int readAll(BlockReader reader, + byte[] buf, int offset, int len) throws IOException { + int n = 0; + for (;;) { + int nread = reader.read(buf, offset + n, len - n); + if (nread <= 0) + return (n == 0) ? nread : n; + n += nread; + if (n >= len) + return n; + } + } + + /* See {@link BlockReader#readFully(byte[], int, int)} */ + public static void readFully(BlockReader reader, + byte[] buf, int off, int len) throws IOException { + int toRead = len; + while (toRead > 0) { + int ret = reader.read(buf, off, toRead); + if (ret < 0) { + throw new IOException("Premature EOF from inputStream"); + } + toRead -= ret; + off += ret; + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java new file mode 100644 index 0000000..a060a6e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.hdfs.BlockReader; +import org.apache.hadoop.hdfs.ReplicaAccessor; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; + +/** + * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from + * replicas. + */ [email protected] +public final class ExternalBlockReader implements BlockReader { + private final ReplicaAccessor accessor; + private final long visibleLength; + private long pos; + + ExternalBlockReader(ReplicaAccessor accessor, long visibleLength, + long startOffset) { + this.accessor = accessor; + this.visibleLength = visibleLength; + this.pos = startOffset; + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + int nread = accessor.read(pos, buf, off, len); + if (nread < 0) { + return nread; + } + pos += nread; + return nread; + } + + @Override + public int read(ByteBuffer buf) throws IOException { + int nread = accessor.read(pos, buf); + if (nread < 0) { + return nread; + } + pos += nread; + return nread; + } + + @Override + public long skip(long n) throws IOException { + // You cannot skip backwards + if (n <= 0) { + return 0; + } + // You can't skip past the last offset that we want to read with this + // block reader. + long oldPos = pos; + pos += n; + if (pos > visibleLength) { + pos = visibleLength; + } + return pos - oldPos; + } + + @Override + public int available() { + // We return the amount of bytes between the current offset and the visible + // length. Some of the other block readers return a shorter length than + // that. The only advantage to returning a shorter length is that the + // DFSInputStream will trash your block reader and create a new one if + // someone tries to seek() beyond the available() region. + long diff = visibleLength - pos; + if (diff > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int)diff; + } + } + + @Override + public void close() throws IOException { + accessor.close(); + } + + @Override + public void readFully(byte[] buf, int offset, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, offset, len); + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public boolean isLocal() { + return accessor.isLocal(); + } + + @Override + public boolean isShortCircuit() { + return accessor.isShortCircuit(); + } + + @Override + public ClientMmap getClientMmap(EnumSet<ReadOption> opts) { + // For now, pluggable ReplicaAccessors do not support zero-copy. + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index 3ae8b59..426fb72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -133,7 +133,7 @@ <!-- Don't complain about LocalDatanodeInfo's anonymous class --> <Match> - <Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" /> + <Class name="org.apache.hadoop.hdfs.client.impl.BlockReaderLocal$LocalDatanodeInfo$1" /> <Bug pattern="SE_BAD_FIELD_INNER_CLASS" /> </Match> <!-- Only one method increments numFailedVolumes and it is synchronized --> http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 8f1b921..a813e50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -43,7 +43,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.hdfs.BlockReader; -import org.apache.hadoop.hdfs.BlockReaderFactory; +import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java index 3455f55..f48bb01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java @@ -38,7 +38,7 @@ import org.apache.commons.lang.SystemUtils; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdfs.BlockReaderTestUtil; +import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; import org.apache.hadoop.hdfs.ClientContext; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java deleted file mode 100644 index ba25d97..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java +++ /dev/null @@ -1,259 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.List; -import java.util.Random; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FsTracer; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache; -import org.apache.hadoop.hdfs.server.namenode.CacheManager; -import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.token.Token; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; - -/** - * A helper class to setup the cluster, and get to BlockReader and DataNode for a block. - */ -public class BlockReaderTestUtil { - /** - * Returns true if we should run tests that generate large files (> 1GB) - */ - static public boolean shouldTestLargeFiles() { - String property = System.getProperty("hdfs.test.large.files"); - if (property == null) return false; - if (property.isEmpty()) return true; - return Boolean.parseBoolean(property); - } - - private HdfsConfiguration conf = null; - private MiniDFSCluster cluster = null; - - /** - * Setup the cluster - */ - public BlockReaderTestUtil(int replicationFactor) throws Exception { - this(replicationFactor, new HdfsConfiguration()); - } - - public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception { - this.conf = config; - conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor); - cluster = new MiniDFSCluster.Builder(conf).format(true).build(); - cluster.waitActive(); - } - - /** - * Shutdown cluster - */ - public void shutdown() { - if (cluster != null) { - cluster.shutdown(); - } - } - - public MiniDFSCluster getCluster() { - return cluster; - } - - public HdfsConfiguration getConf() { - return conf; - } - - /** - * Create a file of the given size filled with random data. - * @return File data. - */ - public byte[] writeFile(Path filepath, int sizeKB) - throws IOException { - FileSystem fs = cluster.getFileSystem(); - - // Write a file with the specified amount of data - DataOutputStream os = fs.create(filepath); - byte data[] = new byte[1024 * sizeKB]; - new Random().nextBytes(data); - os.write(data); - os.close(); - return data; - } - - /** - * Get the list of Blocks for a file. - */ - public List<LocatedBlock> getFileBlocks(Path filepath, int sizeKB) - throws IOException { - // Return the blocks we just wrote - DFSClient dfsclient = getDFSClient(); - return dfsclient.getNamenode().getBlockLocations( - filepath.toString(), 0, sizeKB * 1024).getLocatedBlocks(); - } - - /** - * Get the DFSClient. - */ - public DFSClient getDFSClient() throws IOException { - InetSocketAddress nnAddr = new InetSocketAddress("localhost", cluster.getNameNodePort()); - return new DFSClient(nnAddr, conf); - } - - /** - * Exercise the BlockReader and read length bytes. - * - * It does not verify the bytes read. - */ - public void readAndCheckEOS(BlockReader reader, int length, boolean expectEof) - throws IOException { - byte buf[] = new byte[1024]; - int nRead = 0; - while (nRead < length) { - DFSClient.LOG.info("So far read " + nRead + " - going to read more."); - int n = reader.read(buf, 0, buf.length); - assertTrue(n > 0); - nRead += n; - } - - if (expectEof) { - DFSClient.LOG.info("Done reading, expect EOF for next read."); - assertEquals(-1, reader.read(buf, 0, buf.length)); - } - } - - /** - * Get a BlockReader for the given block. - */ - public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead) - throws IOException { - return getBlockReader(cluster, testBlock, offset, lenToRead); - } - - /** - * Get a BlockReader for the given block. - */ - public static BlockReader getBlockReader(MiniDFSCluster cluster, - LocatedBlock testBlock, int offset, int lenToRead) throws IOException { - InetSocketAddress targetAddr = null; - ExtendedBlock block = testBlock.getBlock(); - DatanodeInfo[] nodes = testBlock.getLocations(); - targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr()); - - final DistributedFileSystem fs = cluster.getFileSystem(); - return new BlockReaderFactory(fs.getClient().getConf()). - setInetSocketAddress(targetAddr). - setBlock(block). - setFileName(targetAddr.toString()+ ":" + block.getBlockId()). - setBlockToken(testBlock.getBlockToken()). - setStartOffset(offset). - setLength(lenToRead). - setVerifyChecksum(true). - setClientName("BlockReaderTestUtil"). - setDatanodeInfo(nodes[0]). - setClientCacheContext(ClientContext.getFromConf(fs.getConf())). - setCachingStrategy(CachingStrategy.newDefaultStrategy()). - setConfiguration(fs.getConf()). - setAllowShortCircuitLocalReads(true). - setTracer(FsTracer.get(fs.getConf())). - setRemotePeerFactory(new RemotePeerFactory() { - @Override - public Peer newConnectedPeer(InetSocketAddress addr, - Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) - throws IOException { - Peer peer = null; - Socket sock = NetUtils. - getDefaultSocketFactory(fs.getConf()).createSocket(); - try { - sock.connect(addr, HdfsConstants.READ_TIMEOUT); - sock.setSoTimeout(HdfsConstants.READ_TIMEOUT); - peer = DFSUtilClient.peerFromSocket(sock); - } finally { - if (peer == null) { - IOUtils.closeQuietly(sock); - } - } - return peer; - } - }). - build(); - } - - /** - * Get a DataNode that serves our testBlock. - */ - public DataNode getDataNode(LocatedBlock testBlock) { - DatanodeInfo[] nodes = testBlock.getLocations(); - int ipcport = nodes[0].getIpcPort(); - return cluster.getDataNode(ipcport); - } - - public static void enableHdfsCachingTracing() { - LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(CacheManager.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(FsDatasetCache.class.getName()).setLevel( - Level.TRACE); - } - - public static void enableBlockReaderFactoryTracing() { - LogManager.getLogger(BlockReaderFactory.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(ShortCircuitCache.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(ShortCircuitReplica.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(BlockReaderLocal.class.getName()).setLevel( - Level.TRACE); - } - - public static void enableShortCircuitShmTracing() { - LogManager.getLogger(DfsClientShmManager.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(ShortCircuitRegistry.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(ShortCircuitShm.class.getName()).setLevel( - Level.TRACE); - LogManager.getLogger(DataNode.class.getName()).setLevel( - Level.TRACE); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java deleted file mode 100644 index 3d916a7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.Random; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -abstract public class TestBlockReaderBase { - private BlockReaderTestUtil util; - private byte[] blockData; - private BlockReader reader; - - /** - * if override this, make sure return array length is less than - * block size. - */ - byte [] getBlockData() { - int length = 1 << 22; - byte[] data = new byte[length]; - for (int i = 0; i < length; i++) { - data[i] = (byte) (i % 133); - } - return data; - } - - private BlockReader getBlockReader(LocatedBlock block) throws Exception { - return util.getBlockReader(block, 0, blockData.length); - } - - abstract HdfsConfiguration createConf(); - - @Before - public void setup() throws Exception { - util = new BlockReaderTestUtil(1, createConf()); - blockData = getBlockData(); - DistributedFileSystem fs = util.getCluster().getFileSystem(); - Path testfile = new Path("/testfile"); - FSDataOutputStream fout = fs.create(testfile); - fout.write(blockData); - fout.close(); - LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0); - reader = getBlockReader(blk); - } - - @After - public void shutdown() throws Exception { - util.shutdown(); - } - - @Test(timeout=60000) - public void testSkip() throws IOException { - Random random = new Random(); - byte [] buf = new byte[1]; - for (int pos = 0; pos < blockData.length;) { - long skip = random.nextInt(100) + 1; - long skipped = reader.skip(skip); - if (pos + skip >= blockData.length) { - assertEquals(blockData.length, pos + skipped); - break; - } else { - assertEquals(skip, skipped); - pos += skipped; - assertEquals(1, reader.read(buf, 0, 1)); - - assertEquals(blockData[pos], buf[0]); - pos += 1; - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java deleted file mode 100644 index a8ca9c7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java +++ /dev/null @@ -1,534 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS; -import static org.hamcrest.CoreMatchers.equalTo; - -import java.io.File; -import java.io.IOException; -import java.nio.channels.ClosedByInterruptException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo; -import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.net.unix.TemporarySocketDirectory; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.util.concurrent.Uninterruptibles; - -public class TestBlockReaderFactory { - static final Log LOG = LogFactory.getLog(TestBlockReaderFactory.class); - - @Before - public void init() { - DomainSocket.disableBindPathValidation(); - Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); - } - - @After - public void cleanup() { - DFSInputStream.tcpReadsDisabledForTesting = false; - BlockReaderFactory.createShortCircuitReplicaInfoCallback = null; - } - - public static Configuration createShortCircuitConf(String testName, - TemporarySocketDirectory sockDir) { - Configuration conf = new Configuration(); - conf.set(DFS_CLIENT_CONTEXT, testName); - conf.setLong(DFS_BLOCK_SIZE_KEY, 4096); - conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(), - testName + "._PORT").getAbsolutePath()); - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, - false); - conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false); - return conf; - } - - /** - * If we have a UNIX domain socket configured, - * and we have dfs.client.domain.socket.data.traffic set to true, - * and short-circuit access fails, we should still be able to pass - * data traffic over the UNIX domain socket. Test this. - */ - @Test(timeout=60000) - public void testFallbackFromShortCircuitToUnixDomainTraffic() - throws Exception { - DFSInputStream.tcpReadsDisabledForTesting = true; - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - - // The server is NOT configured with short-circuit local reads; - // the client is. Both support UNIX domain reads. - Configuration clientConf = createShortCircuitConf( - "testFallbackFromShortCircuitToUnixDomainTraffic", sockDir); - clientConf.set(DFS_CLIENT_CONTEXT, - "testFallbackFromShortCircuitToUnixDomainTraffic_clientContext"); - clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true); - Configuration serverConf = new Configuration(clientConf); - serverConf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false); - - MiniDFSCluster cluster = - new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); - cluster.waitActive(); - FileSystem dfs = FileSystem.get(cluster.getURI(0), clientConf); - String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 8193; - final int SEED = 0xFADED; - DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - byte contents[] = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE)); - byte expected[] = DFSTestUtil. - calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); - Assert.assertTrue(Arrays.equals(contents, expected)); - cluster.shutdown(); - sockDir.close(); - } - - /** - * Test the case where we have multiple threads waiting on the - * ShortCircuitCache delivering a certain ShortCircuitReplica. - * - * In this case, there should only be one call to - * createShortCircuitReplicaInfo. This one replica should be shared - * by all threads. - */ - @Test(timeout=60000) - public void testMultipleWaitersOnShortCircuitCache() - throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicBoolean creationIsBlocked = new AtomicBoolean(true); - final AtomicBoolean testFailed = new AtomicBoolean(false); - DFSInputStream.tcpReadsDisabledForTesting = true; - BlockReaderFactory.createShortCircuitReplicaInfoCallback = - new ShortCircuitCache.ShortCircuitReplicaCreator() { - @Override - public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { - Uninterruptibles.awaitUninterruptibly(latch); - if (!creationIsBlocked.compareAndSet(true, false)) { - Assert.fail("there were multiple calls to " - + "createShortCircuitReplicaInfo. Only one was expected."); - } - return null; - } - }; - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration conf = createShortCircuitConf( - "testMultipleWaitersOnShortCircuitCache", sockDir); - MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - final DistributedFileSystem dfs = cluster.getFileSystem(); - final String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 4000; - final int SEED = 0xFADED; - final int NUM_THREADS = 10; - DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - Runnable readerRunnable = new Runnable() { - @Override - public void run() { - try { - byte contents[] = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE)); - Assert.assertFalse(creationIsBlocked.get()); - byte expected[] = DFSTestUtil. - calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); - Assert.assertTrue(Arrays.equals(contents, expected)); - } catch (Throwable e) { - LOG.error("readerRunnable error", e); - testFailed.set(true); - } - } - }; - Thread threads[] = new Thread[NUM_THREADS]; - for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new Thread(readerRunnable); - threads[i].start(); - } - Thread.sleep(500); - latch.countDown(); - for (int i = 0; i < NUM_THREADS; i++) { - Uninterruptibles.joinUninterruptibly(threads[i]); - } - cluster.shutdown(); - sockDir.close(); - Assert.assertFalse(testFailed.get()); - } - - /** - * Test the case where we have a failure to complete a short circuit read - * that occurs, and then later on, we have a success. - * Any thread waiting on a cache load should receive the failure (if it - * occurs); however, the failure result should not be cached. We want - * to be able to retry later and succeed. - */ - @Test(timeout=60000) - public void testShortCircuitCacheTemporaryFailure() - throws Exception { - BlockReaderTestUtil.enableBlockReaderFactoryTracing(); - final AtomicBoolean replicaCreationShouldFail = new AtomicBoolean(true); - final AtomicBoolean testFailed = new AtomicBoolean(false); - DFSInputStream.tcpReadsDisabledForTesting = true; - BlockReaderFactory.createShortCircuitReplicaInfoCallback = - new ShortCircuitCache.ShortCircuitReplicaCreator() { - @Override - public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { - if (replicaCreationShouldFail.get()) { - // Insert a short delay to increase the chance that one client - // thread waits for the other client thread's failure via - // a condition variable. - Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); - return new ShortCircuitReplicaInfo(); - } - return null; - } - }; - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration conf = createShortCircuitConf( - "testShortCircuitCacheTemporaryFailure", sockDir); - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - final DistributedFileSystem dfs = cluster.getFileSystem(); - final String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 4000; - final int NUM_THREADS = 2; - final int SEED = 0xFADED; - final CountDownLatch gotFailureLatch = new CountDownLatch(NUM_THREADS); - final CountDownLatch shouldRetryLatch = new CountDownLatch(1); - DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - Runnable readerRunnable = new Runnable() { - @Override - public void run() { - try { - // First time should fail. - List<LocatedBlock> locatedBlocks = - cluster.getNameNode().getRpcServer().getBlockLocations( - TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks(); - LocatedBlock lblock = locatedBlocks.get(0); // first block - BlockReader blockReader = null; - try { - blockReader = BlockReaderTestUtil. - getBlockReader(cluster, lblock, 0, TEST_FILE_LEN); - Assert.fail("expected getBlockReader to fail the first time."); - } catch (Throwable t) { - Assert.assertTrue("expected to see 'TCP reads were disabled " + - "for testing' in exception " + t, t.getMessage().contains( - "TCP reads were disabled for testing")); - } finally { - if (blockReader != null) blockReader.close(); // keep findbugs happy - } - gotFailureLatch.countDown(); - shouldRetryLatch.await(); - - // Second time should succeed. - try { - blockReader = BlockReaderTestUtil. - getBlockReader(cluster, lblock, 0, TEST_FILE_LEN); - } catch (Throwable t) { - LOG.error("error trying to retrieve a block reader " + - "the second time.", t); - throw t; - } finally { - if (blockReader != null) blockReader.close(); - } - } catch (Throwable t) { - LOG.error("getBlockReader failure", t); - testFailed.set(true); - } - } - }; - Thread threads[] = new Thread[NUM_THREADS]; - for (int i = 0; i < NUM_THREADS; i++) { - threads[i] = new Thread(readerRunnable); - threads[i].start(); - } - gotFailureLatch.await(); - replicaCreationShouldFail.set(false); - shouldRetryLatch.countDown(); - for (int i = 0; i < NUM_THREADS; i++) { - Uninterruptibles.joinUninterruptibly(threads[i]); - } - cluster.shutdown(); - sockDir.close(); - Assert.assertFalse(testFailed.get()); - } - - /** - * Test that a client which supports short-circuit reads using - * shared memory can fall back to not using shared memory when - * the server doesn't support it. - */ - @Test - public void testShortCircuitReadFromServerWithoutShm() throws Exception { - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration clientConf = createShortCircuitConf( - "testShortCircuitReadFromServerWithoutShm", sockDir); - Configuration serverConf = new Configuration(clientConf); - serverConf.setInt( - DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0); - DFSInputStream.tcpReadsDisabledForTesting = true; - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); - cluster.waitActive(); - clientConf.set(DFS_CLIENT_CONTEXT, - "testShortCircuitReadFromServerWithoutShm_clientContext"); - final DistributedFileSystem fs = - (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf); - final String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 4000; - final int SEED = 0xFADEC; - DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); - byte expected[] = DFSTestUtil. - calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); - Assert.assertTrue(Arrays.equals(contents, expected)); - final ShortCircuitCache cache = - fs.dfs.getClientContext().getShortCircuitCache(); - final DatanodeInfo datanode = - new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId()); - cache.getDfsClientShmManager().visit(new Visitor() { - @Override - public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) - throws IOException { - Assert.assertEquals(1, info.size()); - PerDatanodeVisitorInfo vinfo = info.get(datanode); - Assert.assertTrue(vinfo.disabled); - Assert.assertEquals(0, vinfo.full.size()); - Assert.assertEquals(0, vinfo.notFull.size()); - } - }); - cluster.shutdown(); - sockDir.close(); - } - - /** - * Test that a client which does not support short-circuit reads using - * shared memory can talk with a server which supports it. - */ - @Test - public void testShortCircuitReadFromClientWithoutShm() throws Exception { - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration clientConf = createShortCircuitConf( - "testShortCircuitReadWithoutShm", sockDir); - Configuration serverConf = new Configuration(clientConf); - DFSInputStream.tcpReadsDisabledForTesting = true; - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); - cluster.waitActive(); - clientConf.setInt( - DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0); - clientConf.set(DFS_CLIENT_CONTEXT, - "testShortCircuitReadFromClientWithoutShm_clientContext"); - final DistributedFileSystem fs = - (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf); - final String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 4000; - final int SEED = 0xFADEC; - DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); - byte expected[] = DFSTestUtil. - calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); - Assert.assertTrue(Arrays.equals(contents, expected)); - final ShortCircuitCache cache = - fs.dfs.getClientContext().getShortCircuitCache(); - Assert.assertEquals(null, cache.getDfsClientShmManager()); - cluster.shutdown(); - sockDir.close(); - } - - /** - * Test shutting down the ShortCircuitCache while there are things in it. - */ - @Test - public void testShortCircuitCacheShutdown() throws Exception { - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration conf = createShortCircuitConf( - "testShortCircuitCacheShutdown", sockDir); - conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown"); - Configuration serverConf = new Configuration(conf); - DFSInputStream.tcpReadsDisabledForTesting = true; - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); - cluster.waitActive(); - final DistributedFileSystem fs = - (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf); - final String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 4000; - final int SEED = 0xFADEC; - DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); - byte expected[] = DFSTestUtil. - calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); - Assert.assertTrue(Arrays.equals(contents, expected)); - final ShortCircuitCache cache = - fs.dfs.getClientContext().getShortCircuitCache(); - cache.close(); - Assert.assertTrue(cache.getDfsClientShmManager(). - getDomainSocketWatcher().isClosed()); - cluster.shutdown(); - sockDir.close(); - } - - /** - * When an InterruptedException is sent to a thread calling - * FileChannel#read, the FileChannel is immediately closed and the - * thread gets an exception. This effectively means that we might have - * someone asynchronously calling close() on the file descriptors we use - * in BlockReaderLocal. So when unreferencing a ShortCircuitReplica in - * ShortCircuitCache#unref, we should check if the FileChannel objects - * are still open. If not, we should purge the replica to avoid giving - * it out to any future readers. - * - * This is a regression test for HDFS-6227: Short circuit read failed - * due to ClosedChannelException. - * - * Note that you may still get ClosedChannelException errors if two threads - * are reading from the same replica and an InterruptedException is delivered - * to one of them. - */ - @Test(timeout=120000) - public void testPurgingClosedReplicas() throws Exception { - BlockReaderTestUtil.enableBlockReaderFactoryTracing(); - final AtomicInteger replicasCreated = new AtomicInteger(0); - final AtomicBoolean testFailed = new AtomicBoolean(false); - DFSInputStream.tcpReadsDisabledForTesting = true; - BlockReaderFactory.createShortCircuitReplicaInfoCallback = - new ShortCircuitCache.ShortCircuitReplicaCreator() { - @Override - public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { - replicasCreated.incrementAndGet(); - return null; - } - }; - TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); - Configuration conf = createShortCircuitConf( - "testPurgingClosedReplicas", sockDir); - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - final DistributedFileSystem dfs = cluster.getFileSystem(); - final String TEST_FILE = "/test_file"; - final int TEST_FILE_LEN = 4095; - final int SEED = 0xFADE0; - final DistributedFileSystem fs = - (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf); - DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, - (short)1, SEED); - - final Semaphore sem = new Semaphore(0); - final List<LocatedBlock> locatedBlocks = - cluster.getNameNode().getRpcServer().getBlockLocations( - TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks(); - final LocatedBlock lblock = locatedBlocks.get(0); // first block - final byte[] buf = new byte[TEST_FILE_LEN]; - Runnable readerRunnable = new Runnable() { - @Override - public void run() { - try { - while (true) { - BlockReader blockReader = null; - try { - blockReader = BlockReaderTestUtil. - getBlockReader(cluster, lblock, 0, TEST_FILE_LEN); - sem.release(); - try { - blockReader.readAll(buf, 0, TEST_FILE_LEN); - } finally { - sem.acquireUninterruptibly(); - } - } catch (ClosedByInterruptException e) { - LOG.info("got the expected ClosedByInterruptException", e); - sem.release(); - break; - } finally { - if (blockReader != null) blockReader.close(); - } - LOG.info("read another " + TEST_FILE_LEN + " bytes."); - } - } catch (Throwable t) { - LOG.error("getBlockReader failure", t); - testFailed.set(true); - sem.release(); - } - } - }; - Thread thread = new Thread(readerRunnable); - thread.start(); - - // While the thread is reading, send it interrupts. - // These should trigger a ClosedChannelException. - while (thread.isAlive()) { - sem.acquireUninterruptibly(); - thread.interrupt(); - sem.release(); - } - Assert.assertFalse(testFailed.get()); - - // We should be able to read from the file without - // getting a ClosedChannelException. - BlockReader blockReader = null; - try { - blockReader = BlockReaderTestUtil. - getBlockReader(cluster, lblock, 0, TEST_FILE_LEN); - blockReader.readFully(buf, 0, TEST_FILE_LEN); - } finally { - if (blockReader != null) blockReader.close(); - } - byte expected[] = DFSTestUtil. - calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); - Assert.assertTrue(Arrays.equals(buf, expected)); - - // Another ShortCircuitReplica object should have been created. - Assert.assertEquals(2, replicasCreated.get()); - - dfs.close(); - cluster.shutdown(); - sockDir.close(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java deleted file mode 100644 index 2d6c63a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java +++ /dev/null @@ -1,778 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.hamcrest.CoreMatchers.equalTo; - -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.util.UUID; -import java.util.concurrent.TimeoutException; - -import org.apache.hadoop.fs.ChecksumException; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.client.HdfsDataInputStream; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm; -import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; -import org.apache.hadoop.fs.FsTracer; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.net.unix.TemporarySocketDirectory; -import org.apache.hadoop.util.Time; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; - -public class TestBlockReaderLocal { - private static TemporarySocketDirectory sockDir; - - @BeforeClass - public static void init() { - sockDir = new TemporarySocketDirectory(); - DomainSocket.disableBindPathValidation(); - } - - @AfterClass - public static void shutdown() throws IOException { - sockDir.close(); - } - - public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2, - int off2, int len) { - for (int i = 0; i < len; i++) { - if (buf1[off1 + i] != buf2[off2 + i]) { - Assert.fail("arrays differ at byte " + i + ". " + - "The first array has " + (int)buf1[off1 + i] + - ", but the second array has " + (int)buf2[off2 + i]); - } - } - } - - /** - * Similar to IOUtils#readFully(). Reads bytes in a loop. - * - * @param reader The BlockReaderLocal to read bytes from - * @param buf The ByteBuffer to read into - * @param off The offset in the buffer to read into - * @param len The number of bytes to read. - * - * @throws IOException If it could not read the requested number of bytes - */ - private static void readFully(BlockReaderLocal reader, - ByteBuffer buf, int off, int len) throws IOException { - int amt = len; - while (amt > 0) { - buf.limit(off + len); - buf.position(off); - long ret = reader.read(buf); - if (ret < 0) { - throw new EOFException( "Premature EOF from BlockReaderLocal " + - "after reading " + (len - amt) + " byte(s)."); - } - amt -= ret; - off += ret; - } - } - - private static class BlockReaderLocalTest { - final static int TEST_LENGTH = 12345; - final static int BYTES_PER_CHECKSUM = 512; - - public void setConfiguration(HdfsConfiguration conf) { - // default: no-op - } - public void setup(File blockFile, boolean usingChecksums) - throws IOException { - // default: no-op - } - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - // default: no-op - } - } - - public void runBlockReaderLocalTest(BlockReaderLocalTest test, - boolean checksum, long readahead) throws IOException { - Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); - MiniDFSCluster cluster = null; - HdfsConfiguration conf = new HdfsConfiguration(); - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, - !checksum); - conf.setLong(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, - BlockReaderLocalTest.BYTES_PER_CHECKSUM); - conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C"); - conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead); - test.setConfiguration(conf); - FileInputStream dataIn = null, metaIn = null; - final Path TEST_PATH = new Path("/a"); - final long RANDOM_SEED = 4567L; - BlockReaderLocal blockReaderLocal = null; - FSDataInputStream fsIn = null; - byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; - - FileSystem fs = null; - ShortCircuitShm shm = null; - RandomAccessFile raf = null; - try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, TEST_PATH, - BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); - try { - DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); - } catch (InterruptedException e) { - Assert.fail("unexpected InterruptedException during " + - "waitReplication: " + e); - } catch (TimeoutException e) { - Assert.fail("unexpected TimeoutException during " + - "waitReplication: " + e); - } - fsIn = fs.open(TEST_PATH); - IOUtils.readFully(fsIn, original, 0, - BlockReaderLocalTest.TEST_LENGTH); - fsIn.close(); - fsIn = null; - ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH); - File dataFile = cluster.getBlockFile(0, block); - File metaFile = cluster.getBlockMetadataFile(0, block); - - ShortCircuitCache shortCircuitCache = - ClientContext.getFromConf(conf).getShortCircuitCache(); - cluster.shutdown(); - cluster = null; - test.setup(dataFile, checksum); - FileInputStream streams[] = { - new FileInputStream(dataFile), - new FileInputStream(metaFile) - }; - dataIn = streams[0]; - metaIn = streams[1]; - ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), - block.getBlockPoolId()); - raf = new RandomAccessFile( - new File(sockDir.getDir().getAbsolutePath(), - UUID.randomUUID().toString()), "rw"); - raf.setLength(8192); - FileInputStream shmStream = new FileInputStream(raf.getFD()); - shm = new ShortCircuitShm(ShmId.createRandom(), shmStream); - ShortCircuitReplica replica = - new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache, - Time.now(), shm.allocAndRegisterSlot( - ExtendedBlockId.fromExtendedBlock(block))); - blockReaderLocal = new BlockReaderLocal.Builder( - new DfsClientConf.ShortCircuitConf(conf)). - setFilename(TEST_PATH.getName()). - setBlock(block). - setShortCircuitReplica(replica). - setCachingStrategy(new CachingStrategy(false, readahead)). - setVerifyChecksum(checksum). - setTracer(FsTracer.get(conf)). - build(); - dataIn = null; - metaIn = null; - test.doTest(blockReaderLocal, original); - // BlockReaderLocal should not alter the file position. - Assert.assertEquals(0, streams[0].getChannel().position()); - Assert.assertEquals(0, streams[1].getChannel().position()); - } finally { - if (fsIn != null) fsIn.close(); - if (fs != null) fs.close(); - if (cluster != null) cluster.shutdown(); - if (dataIn != null) dataIn.close(); - if (metaIn != null) metaIn.close(); - if (blockReaderLocal != null) blockReaderLocal.close(); - if (shm != null) shm.free(); - if (raf != null) raf.close(); - } - } - - private static class TestBlockReaderLocalImmediateClose - extends BlockReaderLocalTest { - } - - @Test - public void testBlockReaderLocalImmediateClose() throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0); - runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0); - } - - private static class TestBlockReaderSimpleReads - extends BlockReaderLocalTest { - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - byte buf[] = new byte[TEST_LENGTH]; - reader.readFully(buf, 0, 512); - assertArrayRegionsEqual(original, 0, buf, 0, 512); - reader.readFully(buf, 512, 512); - assertArrayRegionsEqual(original, 512, buf, 512, 512); - reader.readFully(buf, 1024, 513); - assertArrayRegionsEqual(original, 1024, buf, 1024, 513); - reader.readFully(buf, 1537, 514); - assertArrayRegionsEqual(original, 1537, buf, 1537, 514); - // Readahead is always at least the size of one chunk in this test. - Assert.assertTrue(reader.getMaxReadaheadLength() >= - BlockReaderLocalTest.BYTES_PER_CHECKSUM); - } - } - - @Test - public void testBlockReaderSimpleReads() throws IOException { - runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, - HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderSimpleReadsShortReadahead() throws IOException { - runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, - BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1); - } - - @Test - public void testBlockReaderSimpleReadsNoChecksum() throws IOException { - runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, - HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderSimpleReadsNoReadahead() throws IOException { - runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0); - } - - @Test - public void testBlockReaderSimpleReadsNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0); - } - - private static class TestBlockReaderLocalArrayReads2 - extends BlockReaderLocalTest { - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - byte buf[] = new byte[TEST_LENGTH]; - reader.readFully(buf, 0, 10); - assertArrayRegionsEqual(original, 0, buf, 0, 10); - reader.readFully(buf, 10, 100); - assertArrayRegionsEqual(original, 10, buf, 10, 100); - reader.readFully(buf, 110, 700); - assertArrayRegionsEqual(original, 110, buf, 110, 700); - reader.readFully(buf, 810, 1); // from offset 810 to offset 811 - reader.readFully(buf, 811, 5); - assertArrayRegionsEqual(original, 811, buf, 811, 5); - reader.readFully(buf, 816, 900); // skip from offset 816 to offset 1716 - reader.readFully(buf, 1716, 5); - assertArrayRegionsEqual(original, 1716, buf, 1716, 5); - } - } - - @Test - public void testBlockReaderLocalArrayReads2() throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), - true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalArrayReads2NoChecksum() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), - false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalArrayReads2NoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0); - } - - @Test - public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0); - } - - private static class TestBlockReaderLocalByteBufferReads - extends BlockReaderLocalTest { - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]); - readFully(reader, buf, 0, 10); - assertArrayRegionsEqual(original, 0, buf.array(), 0, 10); - readFully(reader, buf, 10, 100); - assertArrayRegionsEqual(original, 10, buf.array(), 10, 100); - readFully(reader, buf, 110, 700); - assertArrayRegionsEqual(original, 110, buf.array(), 110, 700); - reader.skip(1); // skip from offset 810 to offset 811 - readFully(reader, buf, 811, 5); - assertArrayRegionsEqual(original, 811, buf.array(), 811, 5); - } - } - - @Test - public void testBlockReaderLocalByteBufferReads() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), - true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalByteBufferReadsNoChecksum() - throws IOException { - runBlockReaderLocalTest( - new TestBlockReaderLocalByteBufferReads(), - false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalByteBufferReadsNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), - true, 0); - } - - @Test - public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), - false, 0); - } - - /** - * Test reads that bypass the bounce buffer (because they are aligned - * and bigger than the readahead). - */ - private static class TestBlockReaderLocalByteBufferFastLaneReads - extends BlockReaderLocalTest { - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - ByteBuffer buf = ByteBuffer.allocateDirect(TEST_LENGTH); - readFully(reader, buf, 0, 5120); - buf.flip(); - assertArrayRegionsEqual(original, 0, - DFSTestUtil.asArray(buf), 0, - 5120); - reader.skip(1537); - readFully(reader, buf, 0, 1); - buf.flip(); - assertArrayRegionsEqual(original, 6657, - DFSTestUtil.asArray(buf), 0, - 1); - reader.forceAnchorable(); - readFully(reader, buf, 0, 5120); - buf.flip(); - assertArrayRegionsEqual(original, 6658, - DFSTestUtil.asArray(buf), 0, - 5120); - reader.forceUnanchorable(); - readFully(reader, buf, 0, 513); - buf.flip(); - assertArrayRegionsEqual(original, 11778, - DFSTestUtil.asArray(buf), 0, - 513); - reader.skip(3); - readFully(reader, buf, 0, 50); - buf.flip(); - assertArrayRegionsEqual(original, 12294, - DFSTestUtil.asArray(buf), 0, - 50); - } - } - - @Test - public void testBlockReaderLocalByteBufferFastLaneReads() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(), - true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM); - } - - @Test - public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum() - throws IOException { - runBlockReaderLocalTest( - new TestBlockReaderLocalByteBufferFastLaneReads(), - false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM); - } - - @Test - public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(), - true, 0); - } - - @Test - public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(), - false, 0); - } - - private static class TestBlockReaderLocalReadCorruptStart - extends BlockReaderLocalTest { - boolean usingChecksums = false; - @Override - public void setup(File blockFile, boolean usingChecksums) - throws IOException { - RandomAccessFile bf = null; - this.usingChecksums = usingChecksums; - try { - bf = new RandomAccessFile(blockFile, "rw"); - bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0}); - } finally { - if (bf != null) bf.close(); - } - } - - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - byte buf[] = new byte[TEST_LENGTH]; - if (usingChecksums) { - try { - reader.readFully(buf, 0, 10); - Assert.fail("did not detect corruption"); - } catch (IOException e) { - // expected - } - } else { - reader.readFully(buf, 0, 10); - } - } - } - - @Test - public void testBlockReaderLocalReadCorruptStart() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true, - HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - private static class TestBlockReaderLocalReadCorrupt - extends BlockReaderLocalTest { - boolean usingChecksums = false; - @Override - public void setup(File blockFile, boolean usingChecksums) - throws IOException { - RandomAccessFile bf = null; - this.usingChecksums = usingChecksums; - try { - bf = new RandomAccessFile(blockFile, "rw"); - bf.seek(1539); - bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0}); - } finally { - if (bf != null) bf.close(); - } - } - - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - byte buf[] = new byte[TEST_LENGTH]; - try { - reader.readFully(buf, 0, 10); - assertArrayRegionsEqual(original, 0, buf, 0, 10); - reader.readFully(buf, 10, 100); - assertArrayRegionsEqual(original, 10, buf, 10, 100); - reader.readFully(buf, 110, 700); - assertArrayRegionsEqual(original, 110, buf, 110, 700); - reader.skip(1); // skip from offset 810 to offset 811 - reader.readFully(buf, 811, 5); - assertArrayRegionsEqual(original, 811, buf, 811, 5); - reader.readFully(buf, 816, 900); - if (usingChecksums) { - // We should detect the corruption when using a checksum file. - Assert.fail("did not detect corruption"); - } - } catch (ChecksumException e) { - if (!usingChecksums) { - Assert.fail("didn't expect to get ChecksumException: not " + - "using checksums."); - } - } - } - } - - @Test - public void testBlockReaderLocalReadCorrupt() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, - HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalReadCorruptNoChecksum() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, - HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalReadCorruptNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0); - } - - @Test - public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0); - } - - private static class TestBlockReaderLocalWithMlockChanges - extends BlockReaderLocalTest { - @Override - public void setup(File blockFile, boolean usingChecksums) - throws IOException { - } - - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]); - reader.skip(1); - readFully(reader, buf, 1, 9); - assertArrayRegionsEqual(original, 1, buf.array(), 1, 9); - readFully(reader, buf, 10, 100); - assertArrayRegionsEqual(original, 10, buf.array(), 10, 100); - reader.forceAnchorable(); - readFully(reader, buf, 110, 700); - assertArrayRegionsEqual(original, 110, buf.array(), 110, 700); - reader.forceUnanchorable(); - reader.skip(1); // skip from offset 810 to offset 811 - readFully(reader, buf, 811, 5); - assertArrayRegionsEqual(original, 811, buf.array(), 811, 5); - } - } - - @Test - public void testBlockReaderLocalWithMlockChanges() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), - true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalWithMlockChangesNoChecksum() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), - false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalWithMlockChangesNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), - true, 0); - } - - @Test - public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), - false, 0); - } - - private static class TestBlockReaderLocalOnFileWithoutChecksum - extends BlockReaderLocalTest { - @Override - public void setConfiguration(HdfsConfiguration conf) { - conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL"); - } - - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - Assert.assertTrue(!reader.getVerifyChecksum()); - ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]); - reader.skip(1); - readFully(reader, buf, 1, 9); - assertArrayRegionsEqual(original, 1, buf.array(), 1, 9); - readFully(reader, buf, 10, 100); - assertArrayRegionsEqual(original, 10, buf.array(), 10, 100); - reader.forceAnchorable(); - readFully(reader, buf, 110, 700); - assertArrayRegionsEqual(original, 110, buf.array(), 110, 700); - reader.forceUnanchorable(); - reader.skip(1); // skip from offset 810 to offset 811 - readFully(reader, buf, 811, 5); - assertArrayRegionsEqual(original, 811, buf.array(), 811, 5); - } - } - - private static class TestBlockReaderLocalReadZeroBytes - extends BlockReaderLocalTest { - @Override - public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException { - byte emptyArr[] = new byte[0]; - Assert.assertEquals(0, reader.read(emptyArr, 0, 0)); - ByteBuffer emptyBuf = ByteBuffer.wrap(emptyArr); - Assert.assertEquals(0, reader.read(emptyBuf)); - reader.skip(1); - Assert.assertEquals(0, reader.read(emptyArr, 0, 0)); - Assert.assertEquals(0, reader.read(emptyBuf)); - reader.skip(BlockReaderLocalTest.TEST_LENGTH - 1); - Assert.assertEquals(-1, reader.read(emptyArr, 0, 0)); - Assert.assertEquals(-1, reader.read(emptyBuf)); - } - } - - @Test - public void testBlockReaderLocalOnFileWithoutChecksum() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), - true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), - false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), - true, 0); - } - - @Test - public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), - false, 0); - } - - @Test - public void testBlockReaderLocalReadZeroBytes() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), - true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalReadZeroBytesNoChecksum() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), - false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); - } - - @Test - public void testBlockReaderLocalReadZeroBytesNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), - true, 0); - } - - @Test - public void testBlockReaderLocalReadZeroBytesNoChecksumNoReadahead() - throws IOException { - runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), - false, 0); - } - - - @Test(timeout=60000) - public void TestStatisticsForShortCircuitLocalRead() throws Exception { - testStatistics(true); - } - - @Test(timeout=60000) - public void TestStatisticsForLocalRead() throws Exception { - testStatistics(false); - } - - private void testStatistics(boolean isShortCircuit) throws Exception { - Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null); - HdfsConfiguration conf = new HdfsConfiguration(); - TemporarySocketDirectory sockDir = null; - if (isShortCircuit) { - DFSInputStream.tcpReadsDisabledForTesting = true; - sockDir = new TemporarySocketDirectory(); - conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, - new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock"). - getAbsolutePath()); - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); - DomainSocket.disableBindPathValidation(); - } else { - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false); - } - MiniDFSCluster cluster = null; - final Path TEST_PATH = new Path("/a"); - final long RANDOM_SEED = 4567L; - FSDataInputStream fsIn = null; - byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; - FileSystem fs = null; - try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, TEST_PATH, - BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); - try { - DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); - } catch (InterruptedException e) { - Assert.fail("unexpected InterruptedException during " + - "waitReplication: " + e); - } catch (TimeoutException e) { - Assert.fail("unexpected TimeoutException during " + - "waitReplication: " + e); - } - fsIn = fs.open(TEST_PATH); - IOUtils.readFully(fsIn, original, 0, - BlockReaderLocalTest.TEST_LENGTH); - HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; - Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, - dfsIn.getReadStatistics().getTotalBytesRead()); - Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, - dfsIn.getReadStatistics().getTotalLocalBytesRead()); - if (isShortCircuit) { - Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, - dfsIn.getReadStatistics().getTotalShortCircuitBytesRead()); - } else { - Assert.assertEquals(0, - dfsIn.getReadStatistics().getTotalShortCircuitBytesRead()); - } - fsIn.close(); - fsIn = null; - } finally { - DFSInputStream.tcpReadsDisabledForTesting = false; - if (fsIn != null) fsIn.close(); - if (fs != null) fs.close(); - if (cluster != null) cluster.shutdown(); - if (sockDir != null) sockDir.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java deleted file mode 100644 index af28bd3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; - -import org.apache.hadoop.fs.ChecksumException; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; -import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.unix.DomainSocket; -import org.apache.hadoop.net.unix.TemporarySocketDirectory; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; - -public class TestBlockReaderLocalLegacy { - @BeforeClass - public static void setupCluster() throws IOException { - DFSInputStream.tcpReadsDisabledForTesting = true; - DomainSocket.disableBindPathValidation(); - } - - private static HdfsConfiguration getConfiguration( - TemporarySocketDirectory socketDir) throws IOException { - HdfsConfiguration conf = new HdfsConfiguration(); - if (socketDir == null) { - conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, ""); - } else { - conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, - new File(socketDir.getDir(), "TestBlockReaderLocalLegacy.%d.sock"). - getAbsolutePath()); - } - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); - conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true); - conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, - false); - conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, - UserGroupInformation.getCurrentUser().getShortUserName()); - conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false); - // Set short retry timeouts so this test runs faster - conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); - return conf; - } - - /** - * Test that, in the case of an error, the position and limit of a ByteBuffer - * are left unchanged. This is not mandated by ByteBufferReadable, but clients - * of this class might immediately issue a retry on failure, so it's polite. - */ - @Test - public void testStablePositionAfterCorruptRead() throws Exception { - final short REPL_FACTOR = 1; - final long FILE_LENGTH = 512L; - - HdfsConfiguration conf = getConfiguration(null); - MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - FileSystem fs = cluster.getFileSystem(); - - Path path = new Path("/corrupted"); - - DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L); - DFSTestUtil.waitReplication(fs, path, REPL_FACTOR); - - ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path); - int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block); - assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted); - - FSDataInputStream dis = cluster.getFileSystem().open(path); - ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH); - boolean sawException = false; - try { - dis.read(buf); - } catch (ChecksumException ex) { - sawException = true; - } - - assertTrue(sawException); - assertEquals(0, buf.position()); - assertEquals(buf.capacity(), buf.limit()); - - dis = cluster.getFileSystem().open(path); - buf.position(3); - buf.limit(25); - sawException = false; - try { - dis.read(buf); - } catch (ChecksumException ex) { - sawException = true; - } - - assertTrue(sawException); - assertEquals(3, buf.position()); - assertEquals(25, buf.limit()); - cluster.shutdown(); - } - - @Test - public void testBothOldAndNewShortCircuitConfigured() throws Exception { - final short REPL_FACTOR = 1; - final int FILE_LENGTH = 512; - Assume.assumeTrue(null == DomainSocket.getLoadingFailureReason()); - TemporarySocketDirectory socketDir = new TemporarySocketDirectory(); - HdfsConfiguration conf = getConfiguration(socketDir); - MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - socketDir.close(); - FileSystem fs = cluster.getFileSystem(); - - Path path = new Path("/foo"); - byte orig[] = new byte[FILE_LENGTH]; - for (int i = 0; i < orig.length; i++) { - orig[i] = (byte)(i%10); - } - FSDataOutputStream fos = fs.create(path, (short)1); - fos.write(orig); - fos.close(); - DFSTestUtil.waitReplication(fs, path, REPL_FACTOR); - FSDataInputStream fis = cluster.getFileSystem().open(path); - byte buf[] = new byte[FILE_LENGTH]; - IOUtils.readFully(fis, buf, 0, FILE_LENGTH); - fis.close(); - Assert.assertArrayEquals(orig, buf); - Arrays.equals(orig, buf); - cluster.shutdown(); - } - - @Test(timeout=20000) - public void testBlockReaderLocalLegacyWithAppend() throws Exception { - final short REPL_FACTOR = 1; - final HdfsConfiguration conf = getConfiguration(null); - conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true); - - final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - - final DistributedFileSystem dfs = cluster.getFileSystem(); - final Path path = new Path("/testBlockReaderLocalLegacy"); - DFSTestUtil.createFile(dfs, path, 10, REPL_FACTOR, 0); - DFSTestUtil.waitReplication(dfs, path, REPL_FACTOR); - - final ClientDatanodeProtocol proxy; - final Token<BlockTokenIdentifier> token; - final ExtendedBlock originalBlock; - final long originalGS; - { - final LocatedBlock lb = cluster.getNameNode().getRpcServer() - .getBlockLocations(path.toString(), 0, 1).get(0); - proxy = DFSUtilClient.createClientDatanodeProtocolProxy( - lb.getLocations()[0], conf, 60000, false); - token = lb.getBlockToken(); - - // get block and generation stamp - final ExtendedBlock blk = new ExtendedBlock(lb.getBlock()); - originalBlock = new ExtendedBlock(blk); - originalGS = originalBlock.getGenerationStamp(); - - // test getBlockLocalPathInfo - final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(blk, token); - Assert.assertEquals(originalGS, info.getBlock().getGenerationStamp()); - } - - { // append one byte - FSDataOutputStream out = dfs.append(path); - out.write(1); - out.close(); - } - - { - // get new generation stamp - final LocatedBlock lb = cluster.getNameNode().getRpcServer() - .getBlockLocations(path.toString(), 0, 1).get(0); - final long newGS = lb.getBlock().getGenerationStamp(); - Assert.assertTrue(newGS > originalGS); - - // getBlockLocalPathInfo using the original block. - Assert.assertEquals(originalGS, originalBlock.getGenerationStamp()); - final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo( - originalBlock, token); - Assert.assertEquals(newGS, info.getBlock().getGenerationStamp()); - } - cluster.shutdown(); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
