http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java
new file mode 100644
index 0000000..57fbe47
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderUtil.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.BlockReader;
+
+import java.io.IOException;
+
+/**
+ * For sharing between the local and remote block reader implementations.
+ */
+@InterfaceAudience.Private
+class BlockReaderUtil {
+
+  /* See {@link BlockReader#readAll(byte[], int, int)} */
+  public static int readAll(BlockReader reader,
+      byte[] buf, int offset, int len) throws IOException {
+    int n = 0;
+    for (;;) {
+      int nread = reader.read(buf, offset + n, len - n);
+      if (nread <= 0)
+        return (n == 0) ? nread : n;
+      n += nread;
+      if (n >= len)
+        return n;
+    }
+  }
+
+  /* See {@link BlockReader#readFully(byte[], int, int)} */
+  public static void readFully(BlockReader reader,
+      byte[] buf, int off, int len) throws IOException {
+    int toRead = len;
+    while (toRead > 0) {
+      int ret = reader.read(buf, off, toRead);
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream");
+      }
+      toRead -= ret;
+      off += ret;
+    }
+  }
+}
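
The two helpers above differ only in how a short read is reported: readAll loops until the buffer is full or the stream ends and returns whatever count it reached (or the end-of-stream value if nothing was read at all), while readFully treats any premature end of stream as an error. A minimal standalone sketch of the same loop pattern over a plain java.io.InputStream follows; the class and method names here are illustrative and not part of this patch.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadLoopSketch {
  // Read up to len bytes; stop early at end of stream and return the count read so far.
  static int readAll(InputStream in, byte[] buf, int off, int len) throws IOException {
    int n = 0;
    while (n < len) {
      int nread = in.read(buf, off + n, len - n);
      if (nread <= 0) {
        return (n == 0) ? nread : n;   // -1 only if nothing at all was read
      }
      n += nread;
    }
    return n;
  }

  // Read exactly len bytes or throw, mirroring the readFully contract.
  static void readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
    while (len > 0) {
      int ret = in.read(buf, off, len);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream");
      }
      off += ret;
      len -= ret;
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] buf = new byte[200];
    // A 100-byte stream read with a 200-byte request: readAll returns 100,
    // while readFully throws because it could not satisfy the full length.
    System.out.println(readAll(new ByteArrayInputStream(new byte[100]), buf, 0, 200));
    try {
      readFully(new ByteArrayInputStream(new byte[100]), buf, 0, 200);
    } catch (IOException e) {
      System.out.println("readFully: " + e.getMessage());
    }
  }
}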

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java
new file mode 100644
index 0000000..a060a6e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/ExternalBlockReader.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.ReplicaAccessor;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+
+/**
+ * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
+ * replicas.
+ */
+@InterfaceAudience.Private
+public final class ExternalBlockReader implements BlockReader {
+  private final ReplicaAccessor accessor;
+  private final long visibleLength;
+  private long pos;
+
+  ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
+                      long startOffset) {
+    this.accessor = accessor;
+    this.visibleLength = visibleLength;
+    this.pos = startOffset;
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws IOException {
+    int nread = accessor.read(pos, buf, off, len);
+    if (nread < 0) {
+      return nread;
+    }
+    pos += nread;
+    return nread;
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    int nread = accessor.read(pos, buf);
+    if (nread < 0) {
+      return nread;
+    }
+    pos += nread;
+    return nread;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    // You cannot skip backwards
+    if (n <= 0) {
+      return 0;
+    }
+    // You can't skip past the last offset that we want to read with this
+    // block reader.
+    long oldPos = pos;
+    pos += n;
+    if (pos > visibleLength) {
+      pos = visibleLength;
+    }
+    return pos - oldPos;
+  }
+
+  @Override
+  public int available() {
+    // We return the amount of bytes between the current offset and the visible
+    // length.  Some of the other block readers return a shorter length than
+    // that.  The only advantage to returning a shorter length is that the
+    // DFSInputStream will trash your block reader and create a new one if
+    // someone tries to seek() beyond the available() region.
+    long diff = visibleLength - pos;
+    if (diff > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    } else {
+      return (int)diff;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    accessor.close();
+  }
+
+  @Override
+  public void readFully(byte[] buf, int offset, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, offset, len);
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public boolean isLocal() {
+    return accessor.isLocal();
+  }
+
+  @Override
+  public boolean isShortCircuit() {
+    return accessor.isShortCircuit();
+  }
+
+  @Override
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+    // For now, pluggable ReplicaAccessors do not support zero-copy.
+    return null;
+  }
+}
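
Note that skip() and available() above are pure position arithmetic against the replica's visible length: skip() only moves forward and clamps at visibleLength, and available() reports the bytes remaining before that length, capped at Integer.MAX_VALUE. A small self-contained sketch of that arithmetic, with a hypothetical class name not taken from the patch:

public class VisibleLengthTrackerSketch {
  private final long visibleLength;
  private long pos;

  VisibleLengthTrackerSketch(long visibleLength, long startOffset) {
    this.visibleLength = visibleLength;
    this.pos = startOffset;
  }

  // Skip forward only, never past the visible length; return bytes actually skipped.
  long skip(long n) {
    if (n <= 0) {
      return 0;
    }
    long oldPos = pos;
    pos = Math.min(pos + n, visibleLength);
    return pos - oldPos;
  }

  // Bytes left before the visible length, capped at Integer.MAX_VALUE.
  int available() {
    long diff = visibleLength - pos;
    return (diff > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) diff;
  }

  public static void main(String[] args) {
    VisibleLengthTrackerSketch t = new VisibleLengthTrackerSketch(4096, 4000);
    System.out.println(t.skip(200));    // 96: clamped at the visible length
    System.out.println(t.available());  // 0: nothing left before visibleLength
  }
}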

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 3ae8b59..426fb72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -133,7 +133,7 @@
 
      <!-- Don't complain about LocalDatanodeInfo's anonymous class -->
      <Match>
-       <Class name="org.apache.hadoop.hdfs.BlockReaderLocal$LocalDatanodeInfo$1" />
+       <Class name="org.apache.hadoop.hdfs.client.impl.BlockReaderLocal$LocalDatanodeInfo$1" />
        <Bug pattern="SE_BAD_FIELD_INNER_CLASS" />
      </Match>
     <!-- Only one method increments numFailedVolumes and it is synchronized -->

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 8f1b921..a813e50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.BlockReader;
-import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
index 3455f55..f48bb01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
@@ -38,7 +38,7 @@ import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
deleted file mode 100644
index ba25d97..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsTracer;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
-import org.apache.hadoop.hdfs.server.namenode.CacheManager;
-import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-
-/**
- * A helper class to setup the cluster, and get to BlockReader and DataNode for a block.
- */
-public class BlockReaderTestUtil {
-  /**
-   * Returns true if we should run tests that generate large files (> 1GB)
-   */
-  static public boolean shouldTestLargeFiles() {
-    String property = System.getProperty("hdfs.test.large.files");
-    if (property == null) return false;
-    if (property.isEmpty()) return true;
-    return Boolean.parseBoolean(property);
-  }
-
-  private HdfsConfiguration conf = null;
-  private MiniDFSCluster cluster = null;
-
-  /**
-   * Setup the cluster
-   */
-  public BlockReaderTestUtil(int replicationFactor) throws Exception {
-    this(replicationFactor, new HdfsConfiguration());
-  }
-
-  public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception {
-    this.conf = config;
-    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor);
-    cluster = new MiniDFSCluster.Builder(conf).format(true).build();
-    cluster.waitActive();
-  }
-
-  /**
-   * Shutdown cluster
-   */
-  public void shutdown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  public MiniDFSCluster getCluster() {
-    return cluster;
-  }
-
-  public HdfsConfiguration getConf() {
-    return conf;
-  }
-
-  /**
-   * Create a file of the given size filled with random data.
-   * @return  File data.
-   */
-  public byte[] writeFile(Path filepath, int sizeKB)
-      throws IOException {
-    FileSystem fs = cluster.getFileSystem();
-
-    // Write a file with the specified amount of data
-    DataOutputStream os = fs.create(filepath);
-    byte data[] = new byte[1024 * sizeKB];
-    new Random().nextBytes(data);
-    os.write(data);
-    os.close();
-    return data;
-  }
-
-  /**
-   * Get the list of Blocks for a file.
-   */
-  public List<LocatedBlock> getFileBlocks(Path filepath, int sizeKB)
-      throws IOException {
-    // Return the blocks we just wrote
-    DFSClient dfsclient = getDFSClient();
-    return dfsclient.getNamenode().getBlockLocations(
-      filepath.toString(), 0, sizeKB * 1024).getLocatedBlocks();
-  }
-
-  /**
-   * Get the DFSClient.
-   */
-  public DFSClient getDFSClient() throws IOException {
-    InetSocketAddress nnAddr = new InetSocketAddress("localhost", cluster.getNameNodePort());
-    return new DFSClient(nnAddr, conf);
-  }
-
-  /**
-   * Exercise the BlockReader and read length bytes.
-   *
-   * It does not verify the bytes read.
-   */
-  public void readAndCheckEOS(BlockReader reader, int length, boolean expectEof)
-      throws IOException {
-    byte buf[] = new byte[1024];
-    int nRead = 0;
-    while (nRead < length) {
-      DFSClient.LOG.info("So far read " + nRead + " - going to read more.");
-      int n = reader.read(buf, 0, buf.length);
-      assertTrue(n > 0);
-      nRead += n;
-    }
-
-    if (expectEof) {
-      DFSClient.LOG.info("Done reading, expect EOF for next read.");
-      assertEquals(-1, reader.read(buf, 0, buf.length));
-    }
-  }
-
-  /**
-   * Get a BlockReader for the given block.
-   */
-  public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
-      throws IOException {
-    return getBlockReader(cluster, testBlock, offset, lenToRead);
-  }
-
-  /**
-   * Get a BlockReader for the given block.
-   */
-  public static BlockReader getBlockReader(MiniDFSCluster cluster,
-      LocatedBlock testBlock, int offset, int lenToRead) throws IOException {
-    InetSocketAddress targetAddr = null;
-    ExtendedBlock block = testBlock.getBlock();
-    DatanodeInfo[] nodes = testBlock.getLocations();
-    targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
-
-    final DistributedFileSystem fs = cluster.getFileSystem();
-    return new BlockReaderFactory(fs.getClient().getConf()).
-      setInetSocketAddress(targetAddr).
-      setBlock(block).
-      setFileName(targetAddr.toString()+ ":" + block.getBlockId()).
-      setBlockToken(testBlock.getBlockToken()).
-      setStartOffset(offset).
-      setLength(lenToRead).
-      setVerifyChecksum(true).
-      setClientName("BlockReaderTestUtil").
-      setDatanodeInfo(nodes[0]).
-      setClientCacheContext(ClientContext.getFromConf(fs.getConf())).
-      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
-      setConfiguration(fs.getConf()).
-      setAllowShortCircuitLocalReads(true).
-      setTracer(FsTracer.get(fs.getConf())).
-      setRemotePeerFactory(new RemotePeerFactory() {
-        @Override
-        public Peer newConnectedPeer(InetSocketAddress addr,
-            Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
-            throws IOException {
-          Peer peer = null;
-          Socket sock = NetUtils.
-              getDefaultSocketFactory(fs.getConf()).createSocket();
-          try {
-            sock.connect(addr, HdfsConstants.READ_TIMEOUT);
-            sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
-            peer = DFSUtilClient.peerFromSocket(sock);
-          } finally {
-            if (peer == null) {
-              IOUtils.closeQuietly(sock);
-            }
-          }
-          return peer;
-        }
-      }).
-      build();
-  }
-
-  /**
-   * Get a DataNode that serves our testBlock.
-   */
-  public DataNode getDataNode(LocatedBlock testBlock) {
-    DatanodeInfo[] nodes = testBlock.getLocations();
-    int ipcport = nodes[0].getIpcPort();
-    return cluster.getDataNode(ipcport);
-  }
-  
-  public static void enableHdfsCachingTracing() {
-    LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(CacheManager.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(FsDatasetCache.class.getName()).setLevel(
-        Level.TRACE);
-  }
-
-  public static void enableBlockReaderFactoryTracing() {
-    LogManager.getLogger(BlockReaderFactory.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(ShortCircuitCache.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(ShortCircuitReplica.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(BlockReaderLocal.class.getName()).setLevel(
-        Level.TRACE);
-  }
-
-  public static void enableShortCircuitShmTracing() {
-    LogManager.getLogger(DfsClientShmManager.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(ShortCircuitRegistry.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(ShortCircuitShm.class.getName()).setLevel(
-        Level.TRACE);
-    LogManager.getLogger(DataNode.class.getName()).setLevel(
-        Level.TRACE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
deleted file mode 100644
index 3d916a7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderBase.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-abstract public class TestBlockReaderBase {
-  private BlockReaderTestUtil util;
-  private byte[] blockData;
-  private BlockReader reader;
-
-  /**
-   * if override this, make sure return array length is less than
-   * block size.
-   */
-  byte [] getBlockData() {
-    int length = 1 << 22;
-    byte[] data = new byte[length];
-    for (int i = 0; i < length; i++) {
-      data[i] = (byte) (i % 133);
-    }
-    return data;
-  }
-
-  private BlockReader getBlockReader(LocatedBlock block) throws Exception {
-    return util.getBlockReader(block, 0, blockData.length);
-  }
-
-  abstract HdfsConfiguration createConf();
-
-  @Before
-  public void setup() throws Exception {
-    util = new BlockReaderTestUtil(1, createConf());
-    blockData = getBlockData();
-    DistributedFileSystem fs = util.getCluster().getFileSystem();
-    Path testfile = new Path("/testfile");
-    FSDataOutputStream fout = fs.create(testfile);
-    fout.write(blockData);
-    fout.close();
-    LocatedBlock blk = util.getFileBlocks(testfile, blockData.length).get(0);
-    reader = getBlockReader(blk);
-  }
-
-  @After
-  public void shutdown() throws Exception {
-    util.shutdown();
-  }
-
-  @Test(timeout=60000)
-  public void testSkip() throws IOException {
-    Random random = new Random();
-    byte [] buf = new byte[1];
-    for (int pos = 0; pos < blockData.length;) {
-      long skip = random.nextInt(100) + 1;
-      long skipped = reader.skip(skip);
-      if (pos + skip >= blockData.length) {
-        assertEquals(blockData.length, pos + skipped);
-        break;
-      } else {
-        assertEquals(skip, skipped);
-        pos += skipped;
-        assertEquals(1, reader.read(buf, 0, 1));
-
-        assertEquals(blockData[pos], buf[0]);
-        pos += 1;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
deleted file mode 100644
index a8ca9c7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
-import static org.hamcrest.CoreMatchers.equalTo;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.channels.ClosedByInterruptException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
-import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.net.unix.TemporarySocketDirectory;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-
-public class TestBlockReaderFactory {
-  static final Log LOG = LogFactory.getLog(TestBlockReaderFactory.class);
-
-  @Before
-  public void init() {
-    DomainSocket.disableBindPathValidation();
-    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
-  }
-
-  @After
-  public void cleanup() {
-    DFSInputStream.tcpReadsDisabledForTesting = false;
-    BlockReaderFactory.createShortCircuitReplicaInfoCallback = null;
-  }
-
-  public static Configuration createShortCircuitConf(String testName,
-      TemporarySocketDirectory sockDir) {
-    Configuration conf = new Configuration();
-    conf.set(DFS_CLIENT_CONTEXT, testName);
-    conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
-    conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
-        testName + "._PORT").getAbsolutePath());
-    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
-    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
-        false);
-    conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
-    return conf;
-  }
-
-  /**
-   * If we have a UNIX domain socket configured,
-   * and we have dfs.client.domain.socket.data.traffic set to true,
-   * and short-circuit access fails, we should still be able to pass
-   * data traffic over the UNIX domain socket.  Test this.
-   */
-  @Test(timeout=60000)
-  public void testFallbackFromShortCircuitToUnixDomainTraffic()
-      throws Exception {
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-
-    // The server is NOT configured with short-circuit local reads;
-    // the client is.  Both support UNIX domain reads.
-    Configuration clientConf = createShortCircuitConf(
-        "testFallbackFromShortCircuitToUnixDomainTraffic", sockDir);
-    clientConf.set(DFS_CLIENT_CONTEXT,
-        "testFallbackFromShortCircuitToUnixDomainTraffic_clientContext");
-    clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
-    Configuration serverConf = new Configuration(clientConf);
-    serverConf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
-
-    MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
-    cluster.waitActive();
-    FileSystem dfs = FileSystem.get(cluster.getURI(0), clientConf);
-    String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 8193;
-    final int SEED = 0xFADED;
-    DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-    byte contents[] = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
-    byte expected[] = DFSTestUtil.
-        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
-    Assert.assertTrue(Arrays.equals(contents, expected));
-    cluster.shutdown();
-    sockDir.close();
-  }
-  
-  /**
-   * Test the case where we have multiple threads waiting on the
-   * ShortCircuitCache delivering a certain ShortCircuitReplica.
-   *
-   * In this case, there should only be one call to
-   * createShortCircuitReplicaInfo.  This one replica should be shared
-   * by all threads.
-   */
-  @Test(timeout=60000)
-  public void testMultipleWaitersOnShortCircuitCache()
-      throws Exception {
-    final CountDownLatch latch = new CountDownLatch(1);
-    final AtomicBoolean creationIsBlocked = new AtomicBoolean(true);
-    final AtomicBoolean testFailed = new AtomicBoolean(false);
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    BlockReaderFactory.createShortCircuitReplicaInfoCallback =
-      new ShortCircuitCache.ShortCircuitReplicaCreator() {
-        @Override
-        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
-          Uninterruptibles.awaitUninterruptibly(latch);
-          if (!creationIsBlocked.compareAndSet(true, false)) {
-            Assert.fail("there were multiple calls to "
-                + "createShortCircuitReplicaInfo.  Only one was expected.");
-          }
-          return null;
-        }
-      };
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-    Configuration conf = createShortCircuitConf(
-        "testMultipleWaitersOnShortCircuitCache", sockDir);
-    MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 4000;
-    final int SEED = 0xFADED;
-    final int NUM_THREADS = 10;
-    DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-    Runnable readerRunnable = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          byte contents[] = DFSTestUtil.readFileBuffer(dfs, new Path(TEST_FILE));
-          Assert.assertFalse(creationIsBlocked.get());
-          byte expected[] = DFSTestUtil.
-              calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
-          Assert.assertTrue(Arrays.equals(contents, expected));
-        } catch (Throwable e) {
-          LOG.error("readerRunnable error", e);
-          testFailed.set(true);
-        }
-      }
-    };
-    Thread threads[] = new Thread[NUM_THREADS];
-    for (int i = 0; i < NUM_THREADS; i++) {
-      threads[i] = new Thread(readerRunnable);
-      threads[i].start();
-    }
-    Thread.sleep(500);
-    latch.countDown();
-    for (int i = 0; i < NUM_THREADS; i++) {
-      Uninterruptibles.joinUninterruptibly(threads[i]);
-    }
-    cluster.shutdown();
-    sockDir.close();
-    Assert.assertFalse(testFailed.get());
-  }
-
-  /**
-   * Test the case where we have a failure to complete a short circuit read
-   * that occurs, and then later on, we have a success.
-   * Any thread waiting on a cache load should receive the failure (if it
-   * occurs);  however, the failure result should not be cached.  We want 
-   * to be able to retry later and succeed.
-   */
-  @Test(timeout=60000)
-  public void testShortCircuitCacheTemporaryFailure()
-      throws Exception {
-    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
-    final AtomicBoolean replicaCreationShouldFail = new AtomicBoolean(true);
-    final AtomicBoolean testFailed = new AtomicBoolean(false);
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    BlockReaderFactory.createShortCircuitReplicaInfoCallback =
-      new ShortCircuitCache.ShortCircuitReplicaCreator() {
-        @Override
-        public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
-          if (replicaCreationShouldFail.get()) {
-            // Insert a short delay to increase the chance that one client
-            // thread waits for the other client thread's failure via
-            // a condition variable.
-            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
-            return new ShortCircuitReplicaInfo();
-          }
-          return null;
-        }
-      };
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-    Configuration conf = createShortCircuitConf(
-        "testShortCircuitCacheTemporaryFailure", sockDir);
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 4000;
-    final int NUM_THREADS = 2;
-    final int SEED = 0xFADED;
-    final CountDownLatch gotFailureLatch = new CountDownLatch(NUM_THREADS);
-    final CountDownLatch shouldRetryLatch = new CountDownLatch(1);
-    DFSTestUtil.createFile(dfs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-    Runnable readerRunnable = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          // First time should fail.
-          List<LocatedBlock> locatedBlocks = 
-              cluster.getNameNode().getRpcServer().getBlockLocations(
-              TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
-          LocatedBlock lblock = locatedBlocks.get(0); // first block
-          BlockReader blockReader = null;
-          try {
-            blockReader = BlockReaderTestUtil.
-                getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
-            Assert.fail("expected getBlockReader to fail the first time.");
-          } catch (Throwable t) { 
-            Assert.assertTrue("expected to see 'TCP reads were disabled " +
-                "for testing' in exception " + t, t.getMessage().contains(
-                "TCP reads were disabled for testing"));
-          } finally {
-            if (blockReader != null) blockReader.close(); // keep findbugs happy
-          }
-          gotFailureLatch.countDown();
-          shouldRetryLatch.await();
-
-          // Second time should succeed.
-          try {
-            blockReader = BlockReaderTestUtil.
-                getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
-          } catch (Throwable t) { 
-            LOG.error("error trying to retrieve a block reader " +
-                "the second time.", t);
-            throw t;
-          } finally {
-            if (blockReader != null) blockReader.close();
-          }
-        } catch (Throwable t) {
-          LOG.error("getBlockReader failure", t);
-          testFailed.set(true);
-        }
-      }
-    };
-    Thread threads[] = new Thread[NUM_THREADS];
-    for (int i = 0; i < NUM_THREADS; i++) {
-      threads[i] = new Thread(readerRunnable);
-      threads[i].start();
-    }
-    gotFailureLatch.await();
-    replicaCreationShouldFail.set(false);
-    shouldRetryLatch.countDown();
-    for (int i = 0; i < NUM_THREADS; i++) {
-      Uninterruptibles.joinUninterruptibly(threads[i]);
-    }
-    cluster.shutdown();
-    sockDir.close();
-    Assert.assertFalse(testFailed.get());
-  }
-
-   /**
-   * Test that a client which supports short-circuit reads using
-   * shared memory can fall back to not using shared memory when
-   * the server doesn't support it.
-   */
-  @Test
-  public void testShortCircuitReadFromServerWithoutShm() throws Exception {
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-    Configuration clientConf = createShortCircuitConf(
-        "testShortCircuitReadFromServerWithoutShm", sockDir);
-    Configuration serverConf = new Configuration(clientConf);
-    serverConf.setInt(
-        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
-    cluster.waitActive();
-    clientConf.set(DFS_CLIENT_CONTEXT,
-        "testShortCircuitReadFromServerWithoutShm_clientContext");
-    final DistributedFileSystem fs =
-        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
-    final String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 4000;
-    final int SEED = 0xFADEC;
-    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-    byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
-    byte expected[] = DFSTestUtil.
-        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
-    Assert.assertTrue(Arrays.equals(contents, expected));
-    final ShortCircuitCache cache =
-        fs.dfs.getClientContext().getShortCircuitCache();
-    final DatanodeInfo datanode =
-        new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
-    cache.getDfsClientShmManager().visit(new Visitor() {
-      @Override
-      public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
-          throws IOException {
-        Assert.assertEquals(1,  info.size());
-        PerDatanodeVisitorInfo vinfo = info.get(datanode);
-        Assert.assertTrue(vinfo.disabled);
-        Assert.assertEquals(0, vinfo.full.size());
-        Assert.assertEquals(0, vinfo.notFull.size());
-      }
-    });
-    cluster.shutdown();
-    sockDir.close();
-  }
- 
-  /**
-   * Test that a client which does not support short-circuit reads using
-   * shared memory can talk with a server which supports it.
-   */
-  @Test
-  public void testShortCircuitReadFromClientWithoutShm() throws Exception {
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-    Configuration clientConf = createShortCircuitConf(
-        "testShortCircuitReadWithoutShm", sockDir);
-    Configuration serverConf = new Configuration(clientConf);
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
-    cluster.waitActive();
-    clientConf.setInt(
-        DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0);
-    clientConf.set(DFS_CLIENT_CONTEXT,
-        "testShortCircuitReadFromClientWithoutShm_clientContext");
-    final DistributedFileSystem fs =
-        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf);
-    final String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 4000;
-    final int SEED = 0xFADEC;
-    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-    byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
-    byte expected[] = DFSTestUtil.
-        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
-    Assert.assertTrue(Arrays.equals(contents, expected));
-    final ShortCircuitCache cache =
-        fs.dfs.getClientContext().getShortCircuitCache();
-    Assert.assertEquals(null, cache.getDfsClientShmManager());
-    cluster.shutdown();
-    sockDir.close();
-  }
-  
-  /**
-   * Test shutting down the ShortCircuitCache while there are things in it.
-   */
-  @Test
-  public void testShortCircuitCacheShutdown() throws Exception {
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-    Configuration conf = createShortCircuitConf(
-        "testShortCircuitCacheShutdown", sockDir);
-    conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
-    Configuration serverConf = new Configuration(conf);
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
-    cluster.waitActive();
-    final DistributedFileSystem fs =
-        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
-    final String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 4000;
-    final int SEED = 0xFADEC;
-    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-    byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
-    byte expected[] = DFSTestUtil.
-        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
-    Assert.assertTrue(Arrays.equals(contents, expected));
-    final ShortCircuitCache cache =
-        fs.dfs.getClientContext().getShortCircuitCache();
-    cache.close();
-    Assert.assertTrue(cache.getDfsClientShmManager().
-        getDomainSocketWatcher().isClosed());
-    cluster.shutdown();
-    sockDir.close();
-  }
-
-  /**
-   * When an InterruptedException is sent to a thread calling
-   * FileChannel#read, the FileChannel is immediately closed and the
-   * thread gets an exception.  This effectively means that we might have
-   * someone asynchronously calling close() on the file descriptors we use
-   * in BlockReaderLocal.  So when unreferencing a ShortCircuitReplica in
-   * ShortCircuitCache#unref, we should check if the FileChannel objects
-   * are still open.  If not, we should purge the replica to avoid giving
-   * it out to any future readers.
-   *
-   * This is a regression test for HDFS-6227: Short circuit read failed
-   * due to ClosedChannelException.
-   *
-   * Note that you may still get ClosedChannelException errors if two threads
-   * are reading from the same replica and an InterruptedException is delivered
-   * to one of them.
-   */
-  @Test(timeout=120000)
-  public void testPurgingClosedReplicas() throws Exception {
-    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
-    final AtomicInteger replicasCreated = new AtomicInteger(0);
-    final AtomicBoolean testFailed = new AtomicBoolean(false);
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    BlockReaderFactory.createShortCircuitReplicaInfoCallback =
-        new ShortCircuitCache.ShortCircuitReplicaCreator() {
-          @Override
-          public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
-            replicasCreated.incrementAndGet();
-            return null;
-          }
-        };
-    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
-    Configuration conf = createShortCircuitConf(
-        "testPurgingClosedReplicas", sockDir);
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final String TEST_FILE = "/test_file";
-    final int TEST_FILE_LEN = 4095;
-    final int SEED = 0xFADE0;
-    final DistributedFileSystem fs =
-        (DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
-    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
-        (short)1, SEED);
-
-    final Semaphore sem = new Semaphore(0);
-    final List<LocatedBlock> locatedBlocks =
-        cluster.getNameNode().getRpcServer().getBlockLocations(
-            TEST_FILE, 0, TEST_FILE_LEN).getLocatedBlocks();
-    final LocatedBlock lblock = locatedBlocks.get(0); // first block
-    final byte[] buf = new byte[TEST_FILE_LEN];
-    Runnable readerRunnable = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          while (true) {
-            BlockReader blockReader = null;
-            try {
-              blockReader = BlockReaderTestUtil.
-                  getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
-              sem.release();
-              try {
-                blockReader.readAll(buf, 0, TEST_FILE_LEN);
-              } finally {
-                sem.acquireUninterruptibly();
-              }
-            } catch (ClosedByInterruptException e) {
-              LOG.info("got the expected ClosedByInterruptException", e);
-              sem.release();
-              break;
-            } finally {
-              if (blockReader != null) blockReader.close();
-            }
-            LOG.info("read another " + TEST_FILE_LEN + " bytes.");
-          }
-        } catch (Throwable t) {
-          LOG.error("getBlockReader failure", t);
-          testFailed.set(true);
-          sem.release();
-        }
-      }
-    };
-    Thread thread = new Thread(readerRunnable);
-    thread.start();
-
-    // While the thread is reading, send it interrupts.
-    // These should trigger a ClosedChannelException.
-    while (thread.isAlive()) {
-      sem.acquireUninterruptibly();
-      thread.interrupt();
-      sem.release();
-    }
-    Assert.assertFalse(testFailed.get());
-
-    // We should be able to read from the file without
-    // getting a ClosedChannelException.
-    BlockReader blockReader = null;
-    try {
-      blockReader = BlockReaderTestUtil.
-          getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
-      blockReader.readFully(buf, 0, TEST_FILE_LEN);
-    } finally {
-      if (blockReader != null) blockReader.close();
-    }
-    byte expected[] = DFSTestUtil.
-        calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
-    Assert.assertTrue(Arrays.equals(buf, expected));
-
-    // Another ShortCircuitReplica object should have been created.
-    Assert.assertEquals(2, replicasCreated.get());
-
-    dfs.close();
-    cluster.shutdown();
-    sockDir.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
deleted file mode 100644
index 2d6c63a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
+++ /dev/null
@@ -1,778 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.util.UUID;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
-import org.apache.hadoop.fs.FsTracer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.net.unix.TemporarySocketDirectory;
-import org.apache.hadoop.util.Time;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestBlockReaderLocal {
-  private static TemporarySocketDirectory sockDir;
-  
-  @BeforeClass
-  public static void init() {
-    sockDir = new TemporarySocketDirectory();
-    DomainSocket.disableBindPathValidation();
-  }
-  
-  @AfterClass
-  public static void shutdown() throws IOException {
-    sockDir.close();
-  }
-  
-  public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
-      int off2, int len) {
-    for (int i = 0; i < len; i++) {
-      if (buf1[off1 + i] != buf2[off2 + i]) {
-        Assert.fail("arrays differ at byte " +  i + ". " + 
-          "The first array has " + (int)buf1[off1 + i] + 
-          ", but the second array has " + (int)buf2[off2 + i]);
-      }
-    }
-  }
-
-  /**
-   * Similar to IOUtils#readFully(). Reads bytes in a loop.
-   *
-   * @param reader           The BlockReaderLocal to read bytes from
-   * @param buf              The ByteBuffer to read into
-   * @param off              The offset in the buffer to read into
-   * @param len              The number of bytes to read.
-   * 
-   * @throws IOException     If it could not read the requested number of bytes
-   */
-  private static void readFully(BlockReaderLocal reader,
-      ByteBuffer buf, int off, int len) throws IOException {
-    int amt = len;
-    while (amt > 0) {
-      buf.limit(off + len);
-      buf.position(off);
-      long ret = reader.read(buf);
-      if (ret < 0) {
-        throw new EOFException( "Premature EOF from BlockReaderLocal " +
-            "after reading " + (len - amt) + " byte(s).");
-      }
-      amt -= ret;
-      off += ret;
-    }
-  }
-
-  private static class BlockReaderLocalTest {
-    final static int TEST_LENGTH = 12345;
-    final static int BYTES_PER_CHECKSUM = 512;
-
-    public void setConfiguration(HdfsConfiguration conf) {
-      // default: no-op
-    }
-    public void setup(File blockFile, boolean usingChecksums)
-        throws IOException {
-      // default: no-op
-    }
-    public void doTest(BlockReaderLocal reader, byte original[])
-        throws IOException {
-      // default: no-op
-    }
-  }
-  
-  public void runBlockReaderLocalTest(BlockReaderLocalTest test,
-      boolean checksum, long readahead) throws IOException {
-    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
-    MiniDFSCluster cluster = null;
-    HdfsConfiguration conf = new HdfsConfiguration();
-    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
-        !checksum);
-    conf.setLong(HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
-        BlockReaderLocalTest.BYTES_PER_CHECKSUM);
-    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
-    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
-    test.setConfiguration(conf);
-    FileInputStream dataIn = null, metaIn = null;
-    final Path TEST_PATH = new Path("/a");
-    final long RANDOM_SEED = 4567L;
-    BlockReaderLocal blockReaderLocal = null;
-    FSDataInputStream fsIn = null;
-    byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
-    
-    FileSystem fs = null;
-    ShortCircuitShm shm = null;
-    RandomAccessFile raf = null;
-    try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-      cluster.waitActive();
-      fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, TEST_PATH,
-          BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
-      try {
-        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
-      } catch (InterruptedException e) {
-        Assert.fail("unexpected InterruptedException during " +
-            "waitReplication: " + e);
-      } catch (TimeoutException e) {
-        Assert.fail("unexpected TimeoutException during " +
-            "waitReplication: " + e);
-      }
-      fsIn = fs.open(TEST_PATH);
-      IOUtils.readFully(fsIn, original, 0,
-          BlockReaderLocalTest.TEST_LENGTH);
-      fsIn.close();
-      fsIn = null;
-      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
-      File dataFile = cluster.getBlockFile(0, block);
-      File metaFile = cluster.getBlockMetadataFile(0, block);
-
-      ShortCircuitCache shortCircuitCache =
-          ClientContext.getFromConf(conf).getShortCircuitCache();
-      cluster.shutdown();
-      cluster = null;
-      test.setup(dataFile, checksum);
-      FileInputStream streams[] = {
-          new FileInputStream(dataFile),
-          new FileInputStream(metaFile)
-      };
-      dataIn = streams[0];
-      metaIn = streams[1];
-      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
-          block.getBlockPoolId());
-      raf = new RandomAccessFile(
-          new File(sockDir.getDir().getAbsolutePath(),
-            UUID.randomUUID().toString()), "rw");
-      raf.setLength(8192);
-      FileInputStream shmStream = new FileInputStream(raf.getFD());
-      shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
-      ShortCircuitReplica replica = 
-          new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
-              Time.now(), shm.allocAndRegisterSlot(
-                  ExtendedBlockId.fromExtendedBlock(block)));
-      blockReaderLocal = new BlockReaderLocal.Builder(
-              new DfsClientConf.ShortCircuitConf(conf)).
-          setFilename(TEST_PATH.getName()).
-          setBlock(block).
-          setShortCircuitReplica(replica).
-          setCachingStrategy(new CachingStrategy(false, readahead)).
-          setVerifyChecksum(checksum).
-          setTracer(FsTracer.get(conf)).
-          build();
-      dataIn = null;
-      metaIn = null;
-      test.doTest(blockReaderLocal, original);
-      // BlockReaderLocal should not alter the file position.
-      Assert.assertEquals(0, streams[0].getChannel().position());
-      Assert.assertEquals(0, streams[1].getChannel().position());
-    } finally {
-      if (fsIn != null) fsIn.close();
-      if (fs != null) fs.close();
-      if (cluster != null) cluster.shutdown();
-      if (dataIn != null) dataIn.close();
-      if (metaIn != null) metaIn.close();
-      if (blockReaderLocal != null) blockReaderLocal.close();
-      if (shm != null) shm.free();
-      if (raf != null) raf.close();
-    }
-  }
-  
-  private static class TestBlockReaderLocalImmediateClose 
-      extends BlockReaderLocalTest {
-  }
-  
-  @Test
-  public void testBlockReaderLocalImmediateClose() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0);
-    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0);
-  }
-  
-  private static class TestBlockReaderSimpleReads 
-      extends BlockReaderLocalTest {
-    @Override
-    public void doTest(BlockReaderLocal reader, byte original[]) 
-        throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
-      reader.readFully(buf, 0, 512);
-      assertArrayRegionsEqual(original, 0, buf, 0, 512);
-      reader.readFully(buf, 512, 512);
-      assertArrayRegionsEqual(original, 512, buf, 512, 512);
-      reader.readFully(buf, 1024, 513);
-      assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
-      reader.readFully(buf, 1537, 514);
-      assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
-      // Readahead is always at least the size of one chunk in this test.
-      Assert.assertTrue(reader.getMaxReadaheadLength() >=
-          BlockReaderLocalTest.BYTES_PER_CHECKSUM);
-    }
-  }
-  
-  @Test
-  public void testBlockReaderSimpleReads() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderSimpleReadsShortReadahead() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
-        BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1);
-  }
-
-  @Test
-  public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderSimpleReadsNoReadahead() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0);
-  }
-
-  @Test
-  public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0);
-  }
-  
-  private static class TestBlockReaderLocalArrayReads2 
-      extends BlockReaderLocalTest {
-    @Override
-    public void doTest(BlockReaderLocal reader, byte original[]) 
-        throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
-      reader.readFully(buf, 0, 10);
-      assertArrayRegionsEqual(original, 0, buf, 0, 10);
-      reader.readFully(buf, 10, 100);
-      assertArrayRegionsEqual(original, 10, buf, 10, 100);
-      reader.readFully(buf, 110, 700);
-      assertArrayRegionsEqual(original, 110, buf, 110, 700);
-      reader.readFully(buf, 810, 1); // from offset 810 to offset 811
-      reader.readFully(buf, 811, 5);
-      assertArrayRegionsEqual(original, 811, buf, 811, 5);
-      reader.readFully(buf, 816, 900); // skip from offset 816 to offset 1716
-      reader.readFully(buf, 1716, 5);
-      assertArrayRegionsEqual(original, 1716, buf, 1716, 5);
-    }
-  }
-  
-  @Test
-  public void testBlockReaderLocalArrayReads2() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
-        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderLocalArrayReads2NoChecksum()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
-        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderLocalArrayReads2NoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0);
-  }
-
-  @Test
-  public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0);
-  }
-
-  private static class TestBlockReaderLocalByteBufferReads 
-      extends BlockReaderLocalTest {
-    @Override
-    public void doTest(BlockReaderLocal reader, byte original[]) 
-        throws IOException {
-      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
-      readFully(reader, buf, 0, 10);
-      assertArrayRegionsEqual(original, 0, buf.array(), 0, 10);
-      readFully(reader, buf, 10, 100);
-      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
-      readFully(reader, buf, 110, 700);
-      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
-      reader.skip(1); // skip from offset 810 to offset 811
-      readFully(reader, buf, 811, 5);
-      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
-    }
-  }
-  
-  @Test
-  public void testBlockReaderLocalByteBufferReads()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
-        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderLocalByteBufferReadsNoChecksum()
-      throws IOException {
-    runBlockReaderLocalTest(
-        new TestBlockReaderLocalByteBufferReads(),
-        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-  
-  @Test
-  public void testBlockReaderLocalByteBufferReadsNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
-        true, 0);
-  }
-
-  @Test
-  public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
-        false, 0);
-  }
-
-  /**
-   * Test reads that bypass the bounce buffer (because they are aligned
-   * and bigger than the readahead).
-   */
-  private static class TestBlockReaderLocalByteBufferFastLaneReads 
-      extends BlockReaderLocalTest {
-    @Override
-    public void doTest(BlockReaderLocal reader, byte original[])
-        throws IOException {
-      ByteBuffer buf = ByteBuffer.allocateDirect(TEST_LENGTH);
-      readFully(reader, buf, 0, 5120);
-      buf.flip();
-      assertArrayRegionsEqual(original, 0,
-          DFSTestUtil.asArray(buf), 0,
-          5120);
-      reader.skip(1537);
-      readFully(reader, buf, 0, 1);
-      buf.flip();
-      assertArrayRegionsEqual(original, 6657,
-          DFSTestUtil.asArray(buf), 0,
-          1);
-      reader.forceAnchorable();
-      readFully(reader, buf, 0, 5120);
-      buf.flip();
-      assertArrayRegionsEqual(original, 6658,
-          DFSTestUtil.asArray(buf), 0,
-          5120);
-      reader.forceUnanchorable();
-      readFully(reader, buf, 0, 513);
-      buf.flip();
-      assertArrayRegionsEqual(original, 11778,
-          DFSTestUtil.asArray(buf), 0,
-          513);
-      reader.skip(3);
-      readFully(reader, buf, 0, 50);
-      buf.flip();
-      assertArrayRegionsEqual(original, 12294,
-          DFSTestUtil.asArray(buf), 0,
-          50);
-    }
-  }
-  
-  @Test
-  public void testBlockReaderLocalByteBufferFastLaneReads()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
-        true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
-  }
-
-  @Test
-  public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum()
-      throws IOException {
-    runBlockReaderLocalTest(
-        new TestBlockReaderLocalByteBufferFastLaneReads(),
-        false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
-  }
-
-  @Test
-  public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
-        true, 0);
-  }
-
-  @Test
-  public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
-        false, 0);
-  }
-
-  private static class TestBlockReaderLocalReadCorruptStart
-      extends BlockReaderLocalTest {
-    boolean usingChecksums = false;
-    @Override
-    public void setup(File blockFile, boolean usingChecksums)
-        throws IOException {
-      RandomAccessFile bf = null;
-      this.usingChecksums = usingChecksums;
-      try {
-        bf = new RandomAccessFile(blockFile, "rw");
-        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
-      } finally {
-        if (bf != null) bf.close();
-      }
-    }
-
-    public void doTest(BlockReaderLocal reader, byte original[]) 
-        throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
-      if (usingChecksums) {
-        try {
-          reader.readFully(buf, 0, 10);
-          Assert.fail("did not detect corruption");
-        } catch (IOException e) {
-          // expected
-        }
-      } else {
-        reader.readFully(buf, 0, 10);
-      }
-    }
-  }
-  
-  @Test
-  public void testBlockReaderLocalReadCorruptStart()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-  
-  private static class TestBlockReaderLocalReadCorrupt
-      extends BlockReaderLocalTest {
-    boolean usingChecksums = false;
-    @Override
-    public void setup(File blockFile, boolean usingChecksums) 
-        throws IOException {
-      RandomAccessFile bf = null;
-      this.usingChecksums = usingChecksums;
-      try {
-        bf = new RandomAccessFile(blockFile, "rw");
-        bf.seek(1539);
-        bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
-      } finally {
-        if (bf != null) bf.close();
-      }
-    }
-
-    public void doTest(BlockReaderLocal reader, byte original[]) 
-        throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
-      try {
-        reader.readFully(buf, 0, 10);
-        assertArrayRegionsEqual(original, 0, buf, 0, 10);
-        reader.readFully(buf, 10, 100);
-        assertArrayRegionsEqual(original, 10, buf, 10, 100);
-        reader.readFully(buf, 110, 700);
-        assertArrayRegionsEqual(original, 110, buf, 110, 700);
-        reader.skip(1); // skip from offset 810 to offset 811
-        reader.readFully(buf, 811, 5);
-        assertArrayRegionsEqual(original, 811, buf, 811, 5);
-        reader.readFully(buf, 816, 900);
-        if (usingChecksums) {
-          // We should detect the corruption when using a checksum file.
-          Assert.fail("did not detect corruption");
-        }
-      } catch (ChecksumException e) {
-        if (!usingChecksums) {
-          Assert.fail("didn't expect to get ChecksumException: not " +
-              "using checksums.");
-        }
-      }
-    }
-  }
-  
-  @Test
-  public void testBlockReaderLocalReadCorrupt()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderLocalReadCorruptNoChecksum()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderLocalReadCorruptNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0);
-  }
-
-  @Test
-  public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0);
-  }
-
-  private static class TestBlockReaderLocalWithMlockChanges
-      extends BlockReaderLocalTest {
-    @Override
-    public void setup(File blockFile, boolean usingChecksums)
-        throws IOException {
-    }
-    
-    @Override
-    public void doTest(BlockReaderLocal reader, byte original[])
-        throws IOException {
-      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
-      reader.skip(1);
-      readFully(reader, buf, 1, 9);
-      assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
-      readFully(reader, buf, 10, 100);
-      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
-      reader.forceAnchorable();
-      readFully(reader, buf, 110, 700);
-      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
-      reader.forceUnanchorable();
-      reader.skip(1); // skip from offset 810 to offset 811
-      readFully(reader, buf, 811, 5);
-      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
-    }
-  }
-
-  @Test
-  public void testBlockReaderLocalWithMlockChanges()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
-        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderLocalWithMlockChangesNoChecksum()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
-        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderLocalWithMlockChangesNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
-        true, 0);
-  }
-
-  @Test
-  public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
-        false, 0);
-  }
-
-  private static class TestBlockReaderLocalOnFileWithoutChecksum
-      extends BlockReaderLocalTest {
-    @Override
-    public void setConfiguration(HdfsConfiguration conf) {
-      conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
-    }
-
-    @Override
-    public void doTest(BlockReaderLocal reader, byte original[])
-        throws IOException {
-      Assert.assertTrue(!reader.getVerifyChecksum());
-      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
-      reader.skip(1);
-      readFully(reader, buf, 1, 9);
-      assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
-      readFully(reader, buf, 10, 100);
-      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
-      reader.forceAnchorable();
-      readFully(reader, buf, 110, 700);
-      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
-      reader.forceUnanchorable();
-      reader.skip(1); // skip from offset 810 to offset 811
-      readFully(reader, buf, 811, 5);
-      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
-    }
-  }
-
-  private static class TestBlockReaderLocalReadZeroBytes
-      extends BlockReaderLocalTest {
-    @Override
-    public void doTest(BlockReaderLocal reader, byte original[])
-        throws IOException {
-      byte emptyArr[] = new byte[0];
-      Assert.assertEquals(0, reader.read(emptyArr, 0, 0));
-      ByteBuffer emptyBuf = ByteBuffer.wrap(emptyArr);
-      Assert.assertEquals(0, reader.read(emptyBuf));
-      reader.skip(1);
-      Assert.assertEquals(0, reader.read(emptyArr, 0, 0));
-      Assert.assertEquals(0, reader.read(emptyBuf));
-      reader.skip(BlockReaderLocalTest.TEST_LENGTH - 1);
-      Assert.assertEquals(-1, reader.read(emptyArr, 0, 0));
-      Assert.assertEquals(-1, reader.read(emptyBuf));
-    }
-  }
-
-  @Test
-  public void testBlockReaderLocalOnFileWithoutChecksum()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
-        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
-        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
-        true, 0);
-  }
-
-  @Test
-  public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
-        false, 0);
-  }
-  
-  @Test
-  public void testBlockReaderLocalReadZeroBytes()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
-        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderLocalReadZeroBytesNoChecksum()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
-        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-  }
-
-  @Test
-  public void testBlockReaderLocalReadZeroBytesNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
-        true, 0);
-  }
-
-  @Test
-  public void testBlockReaderLocalReadZeroBytesNoChecksumNoReadahead()
-      throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
-        false, 0);
-  }
-  
-
-  @Test(timeout=60000)
-  public void TestStatisticsForShortCircuitLocalRead() throws Exception {
-    testStatistics(true);
-  }
-
-  @Test(timeout=60000)
-  public void TestStatisticsForLocalRead() throws Exception {
-    testStatistics(false);
-  }
-  
-  private void testStatistics(boolean isShortCircuit) throws Exception {
-    Assume.assumeTrue(DomainSocket.getLoadingFailureReason() == null);
-    HdfsConfiguration conf = new HdfsConfiguration();
-    TemporarySocketDirectory sockDir = null;
-    if (isShortCircuit) {
-      DFSInputStream.tcpReadsDisabledForTesting = true;
-      sockDir = new TemporarySocketDirectory();
-      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
-        new File(sockDir.getDir(), "TestStatisticsForLocalRead.%d.sock").
-          getAbsolutePath());
-      conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
-      DomainSocket.disableBindPathValidation();
-    } else {
-      conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
-    }
-    MiniDFSCluster cluster = null;
-    final Path TEST_PATH = new Path("/a");
-    final long RANDOM_SEED = 4567L;
-    FSDataInputStream fsIn = null;
-    byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
-    FileSystem fs = null;
-    try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-      cluster.waitActive();
-      fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, TEST_PATH,
-          BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
-      try {
-        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
-      } catch (InterruptedException e) {
-        Assert.fail("unexpected InterruptedException during " +
-            "waitReplication: " + e);
-      } catch (TimeoutException e) {
-        Assert.fail("unexpected TimeoutException during " +
-            "waitReplication: " + e);
-      }
-      fsIn = fs.open(TEST_PATH);
-      IOUtils.readFully(fsIn, original, 0,
-          BlockReaderLocalTest.TEST_LENGTH);
-      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
-      Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, 
-          dfsIn.getReadStatistics().getTotalBytesRead());
-      Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, 
-          dfsIn.getReadStatistics().getTotalLocalBytesRead());
-      if (isShortCircuit) {
-        Assert.assertEquals(BlockReaderLocalTest.TEST_LENGTH, 
-            dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
-      } else {
-        Assert.assertEquals(0,
-            dfsIn.getReadStatistics().getTotalShortCircuitBytesRead());
-      }
-      fsIn.close();
-      fsIn = null;
-    } finally {
-      DFSInputStream.tcpReadsDisabledForTesting = false;
-      if (fsIn != null) fsIn.close();
-      if (fs != null) fs.close();
-      if (cluster != null) cluster.shutdown();
-      if (sockDir != null) sockDir.close();
-    }
-  }
-}
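
Note on the helper used above: the ByteBuffer-based tests call a static
readFully(reader, buf, off, len) defined earlier in BlockReaderLocalTest, which
is outside this hunk. A minimal sketch of such a helper (the signature and the
exception message are assumptions, not taken from this patch) could look like:

    // Sketch of an assumed ByteBuffer readFully helper; not part of this patch.
    // Needs java.io.EOFException, java.io.IOException, java.nio.ByteBuffer.
    private static void readFully(BlockReaderLocal reader, ByteBuffer buf,
        int off, int len) throws IOException {
      final int end = off + len;       // absolute end of the requested region
      while (off < end) {
        buf.limit(end);                // never let a read run past the region
        buf.position(off);             // start of the still-unfilled portion
        int nRead = reader.read(buf);  // may return fewer bytes than requested
        if (nRead < 0) {
          throw new EOFException("Premature EOF after reading "
              + (off - (end - len)) + " byte(s).");
        }
        off += nRead;
      }
    }

The loop is needed because a single read(ByteBuffer) call may return fewer
bytes than requested without having reached the end of the block.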

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6dfb717/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java
deleted file mode 100644
index af28bd3..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocalLegacy.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.net.unix.TemporarySocketDirectory;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestBlockReaderLocalLegacy {
-  @BeforeClass
-  public static void setupCluster() throws IOException {
-    DFSInputStream.tcpReadsDisabledForTesting = true;
-    DomainSocket.disableBindPathValidation();
-  }
-  
-  private static HdfsConfiguration getConfiguration(
-      TemporarySocketDirectory socketDir) throws IOException {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    if (socketDir == null) {
-      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, "");
-    } else {
-      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
-        new File(socketDir.getDir(), "TestBlockReaderLocalLegacy.%d.sock").
-          getAbsolutePath());
-    }
-    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
-    conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
-    conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
-        false);
-    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
-        UserGroupInformation.getCurrentUser().getShortUserName());
-    conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
-    // Set short retry timeouts so this test runs faster
-    conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
-    return conf;
-  }
-
-  /**
-   * Test that, in the case of an error, the position and limit of a ByteBuffer
-   * are left unchanged. This is not mandated by ByteBufferReadable, but clients
-   * of this class might immediately issue a retry on failure, so it's polite.
-   */
-  @Test
-  public void testStablePositionAfterCorruptRead() throws Exception {
-    final short REPL_FACTOR = 1;
-    final long FILE_LENGTH = 512L;
-    
-    HdfsConfiguration conf = getConfiguration(null);
-    MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    FileSystem fs = cluster.getFileSystem();
-
-    Path path = new Path("/corrupted");
-
-    DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
-    DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
-
-    ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
-    int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
-    assertEquals("All replicas not corrupted", REPL_FACTOR, 
blockFilesCorrupted);
-
-    FSDataInputStream dis = cluster.getFileSystem().open(path);
-    ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH);
-    boolean sawException = false;
-    try {
-      dis.read(buf);
-    } catch (ChecksumException ex) {
-      sawException = true;
-    }
-
-    assertTrue(sawException);
-    assertEquals(0, buf.position());
-    assertEquals(buf.capacity(), buf.limit());
-
-    dis = cluster.getFileSystem().open(path);
-    buf.position(3);
-    buf.limit(25);
-    sawException = false;
-    try {
-      dis.read(buf);
-    } catch (ChecksumException ex) {
-      sawException = true;
-    }
-
-    assertTrue(sawException);
-    assertEquals(3, buf.position());
-    assertEquals(25, buf.limit());
-    cluster.shutdown();
-  }
-
-  @Test
-  public void testBothOldAndNewShortCircuitConfigured() throws Exception {
-    final short REPL_FACTOR = 1;
-    final int FILE_LENGTH = 512;
-    Assume.assumeTrue(null == DomainSocket.getLoadingFailureReason());
-    TemporarySocketDirectory socketDir = new TemporarySocketDirectory();
-    HdfsConfiguration conf = getConfiguration(socketDir);
-    MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-    socketDir.close();
-    FileSystem fs = cluster.getFileSystem();
-
-    Path path = new Path("/foo");
-    byte orig[] = new byte[FILE_LENGTH];
-    for (int i = 0; i < orig.length; i++) {
-      orig[i] = (byte)(i%10);
-    }
-    FSDataOutputStream fos = fs.create(path, (short)1);
-    fos.write(orig);
-    fos.close();
-    DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
-    FSDataInputStream fis = cluster.getFileSystem().open(path);
-    byte buf[] = new byte[FILE_LENGTH];
-    IOUtils.readFully(fis, buf, 0, FILE_LENGTH);
-    fis.close();
-    Assert.assertArrayEquals(orig, buf);
-    Arrays.equals(orig, buf);
-    cluster.shutdown();
-  }
-
-  @Test(timeout=20000)
-  public void testBlockReaderLocalLegacyWithAppend() throws Exception {
-    final short REPL_FACTOR = 1;
-    final HdfsConfiguration conf = getConfiguration(null);
-    conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
-
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
-    cluster.waitActive();
-
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    final Path path = new Path("/testBlockReaderLocalLegacy");
-    DFSTestUtil.createFile(dfs, path, 10, REPL_FACTOR, 0);
-    DFSTestUtil.waitReplication(dfs, path, REPL_FACTOR);
-
-    final ClientDatanodeProtocol proxy;
-    final Token<BlockTokenIdentifier> token;
-    final ExtendedBlock originalBlock;
-    final long originalGS;
-    {
-      final LocatedBlock lb = cluster.getNameNode().getRpcServer()
-          .getBlockLocations(path.toString(), 0, 1).get(0);
-      proxy = DFSUtilClient.createClientDatanodeProtocolProxy(
-          lb.getLocations()[0], conf, 60000, false);
-      token = lb.getBlockToken();
-
-      // get block and generation stamp
-      final ExtendedBlock blk = new ExtendedBlock(lb.getBlock());
-      originalBlock = new ExtendedBlock(blk);
-      originalGS = originalBlock.getGenerationStamp();
-
-      // test getBlockLocalPathInfo
-      final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(blk, token);
-      Assert.assertEquals(originalGS, info.getBlock().getGenerationStamp());
-    }
-
-    { // append one byte
-      FSDataOutputStream out = dfs.append(path);
-      out.write(1);
-      out.close();
-    }
-
-    {
-      // get new generation stamp
-      final LocatedBlock lb = cluster.getNameNode().getRpcServer()
-          .getBlockLocations(path.toString(), 0, 1).get(0);
-      final long newGS = lb.getBlock().getGenerationStamp();
-      Assert.assertTrue(newGS > originalGS);
-
-      // getBlockLocalPathInfo using the original block.
-      Assert.assertEquals(originalGS, originalBlock.getGenerationStamp());
-      final BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(
-          originalBlock, token);
-      Assert.assertEquals(newGS, info.getBlock().getGenerationStamp());
-    }
-    cluster.shutdown();
-  }
-}

