Author: todd
Date: Fri Jun 8 20:01:32 2012
New Revision: 1348217
URL: http://svn.apache.org/viewvc?rev=1348217&view=rev
Log:
HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. Contributed by Henry Robinson.
(svn merge -c 1303474 from trunk)
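A minimal sketch of how a client might consume the new API (the path and buffer size below are illustrative, and it assumes the opened stream supports ByteBufferReadable, as DFSInputStream now does):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ByteBufferReadSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream in = fs.open(new Path("/tmp/example.dat")); // hypothetical path
        ByteBuffer buf = ByteBuffer.allocateDirect(128 * 1024);
        int n;
        // read(ByteBuffer) fills the buffer between position and limit,
        // advances the position by the bytes read, and returns -1 at EOF.
        while ((n = in.read(buf)) > 0) {
          buf.flip();
          // ... consume n bytes from buf ...
          buf.clear();
        }
        in.close();
      }
    }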
Added:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
- copied unchanged from r1303474, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
- copied unchanged from r1303474, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/ (props changed)
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project:r1303474
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1303474
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1348217&r1=1348216&r2=1348217&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
(original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Fri Jun 8 20:01:32 2012
@@ -70,6 +70,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-2982. Startup performance suffers when there are many edit log
segments. (Colin Patrick McCabe via todd)
+ HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.
+ (Henry Robinson via todd)
+
BUG FIXES
HDFS-3385. The last block of INodeFileUnderConstruction is not
Propchange:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1303474
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1348217&r1=1348216&r2=1348217&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
Fri Jun 8 20:01:32 2012
@@ -20,11 +20,13 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.net.Socket;
+import org.apache.hadoop.fs.ByteBufferReadable;
+
/**
* A BlockReader is responsible for reading a single block
* from a single datanode.
*/
-public interface BlockReader {
+public interface BlockReader extends ByteBufferReadable {
/* same interface as inputStream java.io.InputStream#read()
* used by DFSInputStream#read()
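For reference, the ByteBufferReadable contract that BlockReader now extends is a single method; the sketch below is a paraphrase, not part of this patch (see org.apache.hadoop.fs.ByteBufferReadable in hadoop-common for the authoritative version):

    public interface ByteBufferReadable {
      /**
       * Reads up to buf.remaining() bytes into buf, advancing the buffer's
       * position by the number of bytes actually read. Returns that count,
       * or -1 at end of stream.
       */
      int read(java.nio.ByteBuffer buf) throws java.io.IOException;
    }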
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1348217&r1=1348216&r2=1348217&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
Fri Jun 8 20:01:32 2012
@@ -118,20 +118,32 @@ class BlockReaderLocal implements BlockR
private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
private final FileInputStream dataIn; // reader for the data file
- private FileInputStream checksumIn; // reader for the checksum file
+ private final FileInputStream checksumIn; // reader for the checksum file
+ /**
+ * Offset from the most recent chunk boundary at which the next read should
+ * take place. Is only set to non-zero at construction time, and is
+ * decremented (usually to 0) by subsequent reads. This avoids having to do a
+ * checksum read at construction to position the read cursor correctly.
+ */
private int offsetFromChunkBoundary;
private byte[] skipBuf = null;
- private ByteBuffer dataBuff = null;
+
+ /**
+ * Used for checksummed reads that need to be staged before copying to their
+ * output buffer because they are either a) smaller than the checksum chunk
+ * size or b) issued by the slower read(byte[]...) path
+ */
+ private ByteBuffer slowReadBuff = null;
private ByteBuffer checksumBuff = null;
private DataChecksum checksum;
private final boolean verifyChecksum;
private static DirectBufferPool bufferPool = new DirectBufferPool();
- private int bytesPerChecksum;
- private int checksumSize;
+ private final int bytesPerChecksum;
+ private final int checksumSize;
/** offset in block where reader wants to actually read */
private long startOffset;
@@ -170,7 +182,7 @@ class BlockReaderLocal implements BlockR
if (LOG.isDebugEnabled()) {
LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
+ blkfile.length() + " startOffset " + startOffset + " length "
- + length + " short circuit checksum " + skipChecksumCheck);
+ + length + " short circuit checksum " + !skipChecksumCheck);
}
if (!skipChecksumCheck) {
@@ -254,6 +266,20 @@ class BlockReaderLocal implements BlockR
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
}
+ private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
+ int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+
+ if (bufferSizeBytes < bytesPerChecksum) {
+ throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + bufferSizeBytes + ") " +
+ "is not large enough to hold a single chunk (" + bytesPerChecksum + "). Please configure " +
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
+ }
+
+ // Round down to nearest chunk size
+ return bufferSizeBytes / bytesPerChecksum;
+ }
+
private BlockReaderLocal(Configuration conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
@@ -279,33 +305,47 @@ class BlockReaderLocal implements BlockR
this.dataIn = dataIn;
this.checksumIn = checksumIn;
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
- dataBuff = bufferPool.getBuffer(bytesPerChecksum*64);
- checksumBuff = bufferPool.getBuffer(checksumSize*64);
- //Initially the buffers have nothing to read.
- dataBuff.flip();
+
+ int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+ slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
+ checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+ // Initially the buffers have nothing to read.
+ slowReadBuff.flip();
checksumBuff.flip();
- long toSkip = firstChunkOffset;
- while (toSkip > 0) {
- long skipped = dataIn.skip(toSkip);
- if (skipped == 0) {
- throw new IOException("Couldn't initialize input stream");
- }
- toSkip -= skipped;
- }
- if (checksumIn != null) {
- long checkSumOffset = (firstChunkOffset / bytesPerChecksum)
- * checksumSize;
- while (checkSumOffset > 0) {
- long skipped = checksumIn.skip(checkSumOffset);
+ boolean success = false;
+ try {
+ // Skip both input streams to beginning of the chunk containing startOffset
+ long toSkip = firstChunkOffset;
+ while (toSkip > 0) {
+ long skipped = dataIn.skip(toSkip);
if (skipped == 0) {
- throw new IOException("Couldn't initialize checksum input stream");
+ throw new IOException("Couldn't initialize input stream");
}
- checkSumOffset -= skipped;
+ toSkip -= skipped;
+ }
+ if (checksumIn != null) {
+ long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+ while (checkSumOffset > 0) {
+ long skipped = checksumIn.skip(checkSumOffset);
+ if (skipped == 0) {
+ throw new IOException("Couldn't initialize checksum input stream");
+ }
+ checkSumOffset -= skipped;
+ }
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ bufferPool.returnBuffer(slowReadBuff);
+ bufferPool.returnBuffer(checksumBuff);
}
}
}
- private int readIntoBuffer(FileInputStream stream, ByteBuffer buf)
+ /**
+ * Reads bytes into a buffer until EOF or the buffer's limit is reached
+ */
+ private int fillBuffer(FileInputStream stream, ByteBuffer buf)
throws IOException {
int bytesRead = stream.getChannel().read(buf);
if (bytesRead < 0) {
@@ -323,45 +363,229 @@ class BlockReaderLocal implements BlockR
return bytesRead;
}
- @Override
- public synchronized int read(byte[] buf, int off, int len) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.info("read off " + off + " len " + len);
+ /**
+ * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
+ * another.
+ */
+ private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
+ int oldLimit = from.limit();
+ from.limit(from.position() + length);
+ try {
+ to.put(from);
+ } finally {
+ from.limit(oldLimit);
}
- if (!verifyChecksum) {
- return dataIn.read(buf, off, len);
- } else {
- int dataRead = -1;
- if (dataBuff.remaining() == 0) {
- dataBuff.clear();
- checksumBuff.clear();
- dataRead = readIntoBuffer(dataIn, dataBuff);
- readIntoBuffer(checksumIn, checksumBuff);
- checksumBuff.flip();
- dataBuff.flip();
- checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
- this.startOffset);
- } else {
- dataRead = dataBuff.remaining();
- }
- if (dataRead > 0) {
- int nRead = Math.min(dataRead - offsetFromChunkBoundary, len);
- if (offsetFromChunkBoundary > 0) {
- dataBuff.position(offsetFromChunkBoundary);
- // Its either end of file or dataRead is greater than the
- // offsetFromChunkBoundary
- offsetFromChunkBoundary = 0;
+ }
+
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ int nRead = 0;
+ if (verifyChecksum) {
+ // A 'direct' read actually has three phases. The first drains any
+ // remaining bytes from the slow read buffer. After this the read is
+ // guaranteed to be on a checksum chunk boundary. If there are still bytes
+ // to read, the fast direct path is used for as many remaining bytes as
+ // possible, up to a multiple of the checksum chunk size. Finally, any
+ // 'odd' bytes remaining at the end of the read cause another slow read to
+ // be issued, which involves an extra copy.
+
+ // Every 'slow' read tries to fill the slow read buffer in one go for
+ // efficiency's sake. As described above, all non-checksum-chunk-aligned
+ // reads will be served from the slower read path.
+
+ if (slowReadBuff.hasRemaining()) {
+ // There are remaining bytes from a small read available. This usually
+ // means this read is unaligned, which falls back to the slow path.
+ int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining());
+ writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+ nRead += fromSlowReadBuff;
+ }
+
+ if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
+ // Since we have drained the 'small read' buffer, we are guaranteed to
+ // be chunk-aligned
+ int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
+
+ // There's only enough checksum buffer space available to checksum one
+ // entire slow read buffer. This saves keeping the number of checksum
+ // chunks around.
+ len = Math.min(len, slowReadBuff.capacity());
+ int oldlimit = buf.limit();
+ buf.limit(buf.position() + len);
+ int readResult = 0;
+ try {
+ readResult = doByteBufferRead(buf);
+ } finally {
+ buf.limit(oldlimit);
}
- if (nRead > 0) {
- dataBuff.get(buf, off, nRead);
+ if (readResult == -1) {
return nRead;
} else {
- return 0;
+ nRead += readResult;
+ buf.position(buf.position() + readResult);
}
- } else {
- return -1;
+ }
+
+ // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
+ // until chunk boundary
+ if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) {
+ int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary);
+ int readResult = fillSlowReadBuffer(toRead);
+ if (readResult == -1) {
+ return nRead;
+ } else {
+ int fromSlowReadBuff = Math.min(readResult, buf.remaining());
+ writeSlice(slowReadBuff, buf, fromSlowReadBuff);
+ nRead += fromSlowReadBuff;
+ }
+ }
+ } else {
+ // Non-checksummed reads are much easier; we can just fill the buffer directly.
+ nRead = doByteBufferRead(buf);
+ if (nRead > 0) {
+ buf.position(buf.position() + nRead);
}
}
+ return nRead;
+ }
+
+ /**
+ * Tries to read as many bytes as possible into supplied buffer, checksumming
+ * each chunk if needed.
+ *
+ * <b>Preconditions:</b>
+ * <ul>
+ * <li>
+ * If checksumming is enabled, buf.remaining must be a multiple of
+ * bytesPerChecksum. Note that this is not a requirement for clients of
+ * read(ByteBuffer) - in the case of non-checksum-sized read requests,
+ * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
+ * method.
+ * </li>
+ * </ul>
+ * <b>Postconditions:</b>
+ * <ul>
+ * <li>buf.limit and buf.mark are unchanged.</li>
+ * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
+ * requested bytes can be read straight from the buffer</li>
+ * </ul>
+ *
+ * @param buf
+ * byte buffer to write bytes to. If checksums are not required, buf
+ * can have any number of bytes remaining, otherwise there must be a
+ * multiple of the checksum chunk size remaining.
+ * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
+ * that is, the number of useful bytes (up to the amount
+ * requested) readable from the buffer by the client.
+ */
+ private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
+ if (verifyChecksum) {
+ assert buf.remaining() % bytesPerChecksum == 0;
+ }
+ int dataRead = -1;
+
+ int oldpos = buf.position();
+ // Read as much as we can into the buffer.
+ dataRead = fillBuffer(dataIn, buf);
+
+ if (dataRead == -1) {
+ return -1;
+ }
+
+ if (verifyChecksum) {
+ ByteBuffer toChecksum = buf.duplicate();
+ toChecksum.position(oldpos);
+ toChecksum.limit(oldpos + dataRead);
+
+ checksumBuff.clear();
+ // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
+ int numChunks =
+ (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
+ checksumBuff.limit(checksumSize * numChunks);
+
+ fillBuffer(checksumIn, checksumBuff);
+ checksumBuff.flip();
+
+ checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
+ this.startOffset);
+ }
+
+ if (dataRead >= 0) {
+ buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
+ }
+
+ if (dataRead < offsetFromChunkBoundary) {
+ // yikes, didn't even get enough bytes to honour offset. This can happen
+ // even if we are verifying checksums if we are at EOF.
+ offsetFromChunkBoundary -= dataRead;
+ dataRead = 0;
+ } else {
+ dataRead -= offsetFromChunkBoundary;
+ offsetFromChunkBoundary = 0;
+ }
+
+ return dataRead;
+ }
+
+ /**
+ * Ensures that up to len bytes are available and checksummed in the slow read
+ * buffer. The number of bytes available to read is returned. If the buffer is
+ * not already empty, the number of remaining bytes is returned and no actual
+ * read happens.
+ *
+ * @param len
+ * the maximum number of bytes to make available. After len bytes
+ * are read, the underlying bytestream <b>must</b> be at a checksum
+ * boundary, or EOF. That is, (len + currentPosition) %
+ * bytesPerChecksum == 0.
+ * @return the number of bytes available to read, or -1 if EOF.
+ */
+ private synchronized int fillSlowReadBuffer(int len) throws IOException {
+ int nRead = -1;
+ if (slowReadBuff.hasRemaining()) {
+ // Already got data, good to go.
+ nRead = Math.min(len, slowReadBuff.remaining());
+ } else {
+ // Round a complete read of len bytes (plus any implicit offset) to the
+ // next chunk boundary, since we try and read in multiples of a chunk
+ int nextChunk = len + offsetFromChunkBoundary +
+ (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum));
+ int limit = Math.min(nextChunk, slowReadBuff.capacity());
+ assert limit % bytesPerChecksum == 0;
+
+ slowReadBuff.clear();
+ slowReadBuff.limit(limit);
+
+ nRead = doByteBufferRead(slowReadBuff);
+
+ if (nRead > 0) {
+ // So that next time we call slowReadBuff.hasRemaining(), we don't get a
+ // false positive.
+ slowReadBuff.limit(nRead + slowReadBuff.position());
+ }
+ }
+ return nRead;
+ }
+
+ @Override
+ public synchronized int read(byte[] buf, int off, int len) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("read off " + off + " len " + len);
+ }
+ if (!verifyChecksum) {
+ return dataIn.read(buf, off, len);
+ }
+
+ int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
+
+ if (nRead > 0) {
+ // Possible that buffer is filled with a larger read than we need, since
+ // we tried to read as much as possible at once
+ nRead = Math.min(len, nRead);
+ slowReadBuff.get(buf, off, nRead);
+ }
+
+ return nRead;
}
@Override
@@ -377,20 +601,20 @@ class BlockReaderLocal implements BlockR
}
// caller made sure newPosition is not beyond EOF.
- int remaining = dataBuff.remaining();
- int position = dataBuff.position();
+ int remaining = slowReadBuff.remaining();
+ int position = slowReadBuff.position();
int newPosition = position + (int)n;
// if the new offset is already read into dataBuff, just reposition
if (n <= remaining) {
assert offsetFromChunkBoundary == 0;
- dataBuff.position(newPosition);
+ slowReadBuff.position(newPosition);
return n;
}
// for small gap, read through to keep the data/checksum in sync
if (n - remaining <= bytesPerChecksum) {
- dataBuff.position(position + remaining);
+ slowReadBuff.position(position + remaining);
if (skipBuf == null) {
skipBuf = new byte[bytesPerChecksum];
}
@@ -401,11 +625,16 @@ class BlockReaderLocal implements BlockR
// optimize for big gap: discard the current buffer, skip to
// the beginning of the appropriate checksum chunk and then
// read to the middle of that chunk to be in sync with checksums.
- this.offsetFromChunkBoundary = newPosition % bytesPerChecksum;
- long toskip = n - remaining - this.offsetFromChunkBoundary;
- dataBuff.clear();
- checksumBuff.clear();
+ // We can't use this.offsetFromChunkBoundary because we need to know how
+ // many bytes of the offset were really read. Calling read(..) with a
+ // positive this.offsetFromChunkBoundary causes that many bytes to get
+ // silently skipped.
+ int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
+ long toskip = n - remaining - myOffsetFromChunkBoundary;
+
+ slowReadBuff.position(slowReadBuff.limit());
+ checksumBuff.position(checksumBuff.limit());
long dataSkipped = dataIn.skip(toskip);
if (dataSkipped != toskip) {
@@ -424,8 +653,10 @@ class BlockReaderLocal implements BlockR
skipBuf = new byte[bytesPerChecksum];
}
assert skipBuf.length == bytesPerChecksum;
- assert this.offsetFromChunkBoundary < bytesPerChecksum;
- int ret = read(skipBuf, 0, this.offsetFromChunkBoundary);
+ assert myOffsetFromChunkBoundary < bytesPerChecksum;
+
+ int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
+
if (ret == -1) { // EOS
return toskip;
} else {
@@ -439,9 +670,9 @@ class BlockReaderLocal implements BlockR
if (checksumIn != null) {
checksumIn.close();
}
- if (dataBuff != null) {
- bufferPool.returnBuffer(dataBuff);
- dataBuff = null;
+ if (slowReadBuff != null) {
+ bufferPool.returnBuffer(slowReadBuff);
+ slowReadBuff = null;
}
if (checksumBuff != null) {
bufferPool.returnBuffer(checksumBuff);
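The slow-path copy above (writeSlice) relies on temporarily narrowing the source buffer's limit so that ByteBuffer.put() transfers exactly the requested number of bytes. A self-contained sketch of that idiom, with illustrative names that are not part of the patch:

    import java.nio.ByteBuffer;

    public class SliceCopyDemo {
      /** Copy exactly length bytes from 'from' into 'to', restoring from's limit afterwards. */
      static void copySlice(ByteBuffer from, ByteBuffer to, int length) {
        int oldLimit = from.limit();
        from.limit(from.position() + length);   // expose only 'length' bytes to put()
        try {
          to.put(from);                         // advances both buffers' positions by 'length'
        } finally {
          from.limit(oldLimit);                 // always restore the original limit
        }
      }

      public static void main(String[] args) {
        ByteBuffer src = ByteBuffer.wrap("hello world".getBytes());
        ByteBuffer dst = ByteBuffer.allocate(5);
        copySlice(src, dst, 5);
        System.out.println(new String(dst.array())); // "hello"
        System.out.println(src.position());          // 5 - source advanced past the copied bytes
        System.out.println(src.limit());             // 11 - limit restored
      }
    }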
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1348217&r1=1348216&r2=1348217&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
Fri Jun 8 20:01:32 2012
@@ -291,6 +291,8 @@ public class DFSConfigKeys extends Commo
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
+ public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
+ public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
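The new key defaults to 1 MB and, per getSlowReadBufferNumChunks() above, is rounded down to a whole number of checksum chunks (and must be at least one chunk, or BlockReaderLocal throws IllegalArgumentException). A hedged sketch of overriding it programmatically; the 128 KB value is only an example:

    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ShortCircuitBufferConfig {
      public static void main(String[] args) {
        HdfsConfiguration conf = new HdfsConfiguration();
        // Example only: shrink the staging buffer from the 1 MB default to 128 KB.
        conf.setInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY, 128 * 1024);
        // With a 512-byte checksum chunk, 128 KB / 512 = 256 chunks per slow read.
        System.out.println(conf.get(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY));
      }
    }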
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1348217&r1=1348216&r2=1348217&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
Fri Jun 8 20:01:32 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
@@ -33,6 +34,7 @@ import java.util.concurrent.ConcurrentHa
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -54,16 +56,16 @@ import org.apache.hadoop.security.token.
* negotiation of the namenode and various datanodes as necessary.
****************************************************************/
@InterfaceAudience.Private
-public class DFSInputStream extends FSInputStream {
+public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
private final SocketCache socketCache;
private final DFSClient dfsClient;
private boolean closed = false;
private final String src;
- private long prefetchSize;
+ private final long prefetchSize;
private BlockReader blockReader = null;
- private boolean verifyChecksum;
+ private final boolean verifyChecksum;
private LocatedBlocks locatedBlocks = null;
private long lastBlockBeingWrittenLength = 0;
private DatanodeInfo currentNode = null;
@@ -83,17 +85,17 @@ public class DFSInputStream extends FSIn
* capped at maxBlockAcquireFailures
*/
private int failures = 0;
- private int timeWindow;
+ private final int timeWindow;
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
* parallel accesses to DFSInputStream (through ptreads) properly */
- private ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
+ private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
private int buffersize = 1;
- private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
+ private final byte[] oneByteBuf = new byte[1]; // used for 'int read()'
- private int nCachedConnRetry;
+ private final int nCachedConnRetry;
void addToDeadNodes(DatanodeInfo dnInfo) {
deadNodes.put(dnInfo, dnInfo);
@@ -501,11 +503,63 @@ public class DFSInputStream extends FSIn
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
}
+ /**
+ * Wraps different possible read implementations so that readBuffer can be
+ * strategy-agnostic.
+ */
+ private interface ReaderStrategy {
+ public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException;
+ }
+
+ /**
+ * Used to read bytes into a byte[]
+ */
+ private static class ByteArrayStrategy implements ReaderStrategy {
+ final byte[] buf;
+
+ public ByteArrayStrategy(byte[] buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException {
+ return blockReader.read(buf, off, len);
+ }
+ }
+
+ /**
+ * Used to read bytes into a user-supplied ByteBuffer
+ */
+ private static class ByteBufferStrategy implements ReaderStrategy {
+ final ByteBuffer buf;
+ ByteBufferStrategy(ByteBuffer buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException {
+ int oldpos = buf.position();
+ int oldlimit = buf.limit();
+ boolean success = false;
+ try {
+ int ret = blockReader.read(buf);
+ success = true;
+ return ret;
+ } finally {
+ if (!success) {
+ // Reset to original state so that retries work correctly.
+ buf.position(oldpos);
+ buf.limit(oldlimit);
+ }
+ }
+ }
+ }
+
/* This is a used by regular read() and handles ChecksumExceptions.
* name readBuffer() is chosen to imply similarity to readBuffer() in
* ChecksumFileSystem
*/
- private synchronized int readBuffer(byte buf[], int off, int len,
+ private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
IOException ioe;
@@ -521,7 +575,7 @@ public class DFSInputStream extends FSIn
while (true) {
// retry as many times as seekToNewSource allows.
try {
- return blockReader.read(buf, off, len);
+ return reader.doRead(blockReader, off, len);
} catch ( ChecksumException ce ) {
DFSClient.LOG.warn("Found Checksum error for "
+ getCurrentBlock() + " from " + currentNode
@@ -557,11 +611,7 @@ public class DFSInputStream extends FSIn
}
}
- /**
- * Read the entire buffer.
- */
- @Override
- public synchronized int read(byte buf[], int off, int len) throws IOException {
+ private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
dfsClient.checkOpen();
if (closed) {
throw new IOException("Stream closed");
@@ -577,7 +627,7 @@ public class DFSInputStream extends FSIn
currentNode = blockSeekTo(pos);
}
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
- int result = readBuffer(buf, off, realLen, corruptedBlockMap);
+ int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
if (result >= 0) {
pos += result;
@@ -612,6 +662,24 @@ public class DFSInputStream extends FSIn
}
/**
+ * Read the entire buffer.
+ */
+ @Override
+ public synchronized int read(final byte buf[], int off, int len) throws IOException {
+ ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
+
+ return readWithStrategy(byteArrayReader, off, len);
+ }
+
+ @Override
+ public synchronized int read(final ByteBuffer buf) throws IOException {
+ ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
+
+ return readWithStrategy(byteBufferReader, 0, buf.remaining());
+ }
+
+
+ /**
* Add corrupted block replica into map.
* @param corruptedBlockMap
*/
@@ -1093,5 +1161,4 @@ public class DFSInputStream extends FSIn
this.addr = addr;
}
}
-
}
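ByteBufferStrategy above snapshots the buffer's position and limit and restores them when the read fails, so that readBuffer()'s retry loop can re-issue the same read against another replica. A standalone sketch of that save/restore idiom (the failing reader below is a stand-in, not HDFS code):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class RetrySafeBufferRead {
      interface BufReader { int read(ByteBuffer buf) throws IOException; }

      /** Attempt one read; on failure leave buf exactly as it was so a retry sees the same state. */
      static int readOnce(BufReader reader, ByteBuffer buf) throws IOException {
        int oldPos = buf.position();
        int oldLimit = buf.limit();
        boolean success = false;
        try {
          int n = reader.read(buf);
          success = true;
          return n;
        } finally {
          if (!success) {
            buf.position(oldPos);   // roll back any partial progress
            buf.limit(oldLimit);
          }
        }
      }

      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        BufReader failing = new BufReader() {
          public int read(ByteBuffer b) throws IOException {
            b.put((byte) 1);                                    // partially fills the buffer...
            throw new IOException("simulated replica failure"); // ...then fails
          }
        };
        try {
          readOnce(failing, buf);
        } catch (IOException expected) {
          System.out.println("position after failed read: " + buf.position()); // prints 0
        }
      }
    }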
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1348217&r1=1348216&r2=1348217&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
Fri Jun 8 20:01:32 2012
@@ -56,7 +56,7 @@ import org.apache.hadoop.util.DataChecks
public class RemoteBlockReader extends FSInputChecker implements BlockReader {
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
- private DataInputStream in;
+ private final DataInputStream in;
private DataChecksum checksum;
/** offset in block of the last chunk received */
@@ -71,8 +71,8 @@ public class RemoteBlockReader extends F
if startOffset is not chunk-aligned */
private final long firstChunkOffset;
- private int bytesPerChecksum;
- private int checksumSize;
+ private final int bytesPerChecksum;
+ private final int checksumSize;
/**
* The total number of bytes we need to transfer from the DN.
@@ -479,4 +479,9 @@ public class RemoteBlockReader extends F
return s.toString() + ":" + poolId + ":" + blockId;
}
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
+ }
+
}
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1348217&r1=1348216&r2=1348217&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
Fri Jun 8 20:01:32 2012
@@ -84,7 +84,7 @@ public class RemoteBlockReader2 impleme
static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
- private ReadableByteChannel in;
+ private final ReadableByteChannel in;
private DataChecksum checksum;
private PacketHeader curHeader;
@@ -100,11 +100,11 @@ public class RemoteBlockReader2 impleme
private final String filename;
private static DirectBufferPool bufferPool = new DirectBufferPool();
- private ByteBuffer headerBuf = ByteBuffer.allocate(
+ private final ByteBuffer headerBuf = ByteBuffer.allocate(
PacketHeader.PKT_HEADER_LEN);
- private int bytesPerChecksum;
- private int checksumSize;
+ private final int bytesPerChecksum;
+ private final int checksumSize;
/**
* The total number of bytes we need to transfer from the DN.
@@ -140,6 +140,26 @@ public class RemoteBlockReader2 impleme
return nRead;
}
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+ readNextPacket();
+ }
+ if (curDataSlice.remaining() == 0) {
+ // we're at EOF now
+ return -1;
+ }
+
+ int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
+ ByteBuffer writeSlice = curDataSlice.duplicate();
+ writeSlice.limit(writeSlice.position() + nRead);
+ buf.put(writeSlice);
+ curDataSlice.position(writeSlice.position());
+
+ return nRead;
+ }
+
private void readNextPacket() throws IOException {
Preconditions.checkState(curHeader == null ||
!curHeader.isLastPacketInBlock());
@@ -325,6 +345,7 @@ public class RemoteBlockReader2 impleme
/**
* Take the socket used to talk to the DN.
*/
+ @Override
public Socket takeSocket() {
assert hasSentStatusCode() :
"BlockReader shouldn't give back sockets mid-read";
@@ -337,6 +358,7 @@ public class RemoteBlockReader2 impleme
* Whether the BlockReader has reached the end of its input stream
* and successfully sent a status code back to the datanode.
*/
+ @Override
public boolean hasSentStatusCode() {
return sentStatusCode;
}
Propchange:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
Merged
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1303474
Propchange:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
Merged
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1303474
Propchange:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
Merged
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1303474
Propchange:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
Merged
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1303474
Propchange:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
Merged
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1303474
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1348217&r1=1348216&r2=1348217&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
Fri Jun 8 20:01:32 2012
@@ -49,7 +49,11 @@ public class BlockReaderTestUtil {
* Setup the cluster
*/
public BlockReaderTestUtil(int replicationFactor) throws Exception {
- conf = new HdfsConfiguration();
+ this(replicationFactor, new HdfsConfiguration());
+ }
+
+ public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception {
+ this.conf = config;
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor);
cluster = new MiniDFSCluster.Builder(conf).format(true).build();
cluster.waitActive();
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java?rev=1348217&r1=1348216&r2=1348217&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
Fri Jun 8 20:01:32 2012
@@ -18,177 +18,21 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
-import java.util.Random;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-
-import org.junit.Test;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import static org.junit.Assert.*;
-
-/**
- * Test the use of DFSInputStream by multiple concurrent readers.
- */
-public class TestParallelRead {
-
- static final Log LOG = LogFactory.getLog(TestParallelRead.class);
- static BlockReaderTestUtil util = null;
- static DFSClient dfsClient = null;
- static final int FILE_SIZE_K = 256;
- static Random rand = null;
-
- static {
- // The client-trace log ends up causing a lot of blocking threads
- // in this when it's being used as a performance benchmark.
- LogManager.getLogger(DataNode.class.getName() + ".clienttrace")
- .setLevel(Level.WARN);
- }
+import org.junit.Test;
- private class TestFileInfo {
- public DFSInputStream dis;
- public Path filepath;
- public byte[] authenticData;
- }
+public class TestParallelRead extends TestParallelReadUtil {
@BeforeClass
- public static void setupCluster() throws Exception {
- final int REPLICATION_FACTOR = 2;
- util = new BlockReaderTestUtil(REPLICATION_FACTOR);
- dfsClient = util.getDFSClient();
- rand = new Random(System.currentTimeMillis());
+ static public void setupCluster() throws Exception {
+ setupCluster(DEFAULT_REPLICATION_FACTOR, new HdfsConfiguration());
}
- /**
- * A worker to do one "unit" of read.
- */
- static class ReadWorker extends Thread {
- static public final int N_ITERATIONS = 1024;
-
- private static final double PROPORTION_NON_POSITIONAL_READ = 0.10;
-
- private TestFileInfo testInfo;
- private long fileSize;
- private long bytesRead;
- private boolean error;
-
- ReadWorker(TestFileInfo testInfo, int id) {
- super("ReadWorker-" + id + "-" + testInfo.filepath.toString());
- this.testInfo = testInfo;
- fileSize = testInfo.dis.getFileLength();
- assertEquals(fileSize, testInfo.authenticData.length);
- bytesRead = 0;
- error = false;
- }
-
- /**
- * Randomly do one of (1) Small read; and (2) Large Pread.
- */
- @Override
- public void run() {
- for (int i = 0; i < N_ITERATIONS; ++i) {
- int startOff = rand.nextInt((int) fileSize);
- int len = 0;
- try {
- double p = rand.nextDouble();
- if (p < PROPORTION_NON_POSITIONAL_READ) {
- // Do a small regular read. Very likely this will leave unread
- // data on the socket and make the socket uncacheable.
- len = Math.min(rand.nextInt(64), (int) fileSize - startOff);
- read(startOff, len);
- bytesRead += len;
- } else {
- // Do a positional read most of the time.
- len = rand.nextInt((int) (fileSize - startOff));
- pRead(startOff, len);
- bytesRead += len;
- }
- } catch (Exception ex) {
- LOG.error(getName() + ": Error while testing read at " + startOff +
- " length " + len);
- error = true;
- fail(ex.getMessage());
- }
- }
- }
-
- public long getBytesRead() {
- return bytesRead;
- }
-
- /**
- * Raising error in a thread doesn't seem to fail the test.
- * So check afterwards.
- */
- public boolean hasError() {
- return error;
- }
-
- /**
- * Seek to somewhere random and read.
- */
- private void read(int start, int len) throws Exception {
- assertTrue(
- "Bad args: " + start + " + " + len + " should be < " + fileSize,
- start + len < fileSize);
- DFSInputStream dis = testInfo.dis;
-
- synchronized (dis) {
- dis.seek(start);
-
- byte buf[] = new byte[len];
- int cnt = 0;
- while (cnt < len) {
- cnt += dis.read(buf, cnt, buf.length - cnt);
- }
- verifyData("Read data corrupted", buf, start, start + len);
- }
- }
-
- /**
- * Positional read.
- */
- private void pRead(int start, int len) throws Exception {
- assertTrue(
- "Bad args: " + start + " + " + len + " should be < " + fileSize,
- start + len < fileSize);
- DFSInputStream dis = testInfo.dis;
-
- byte buf[] = new byte[len];
- int cnt = 0;
- while (cnt < len) {
- cnt += dis.read(start, buf, cnt, buf.length - cnt);
- }
- verifyData("Pread data corrupted", buf, start, start + len);
- }
-
- /**
- * Verify read data vs authentic data
- */
- private void verifyData(String msg, byte actual[], int start, int end)
- throws Exception {
- byte auth[] = testInfo.authenticData;
- if (end > auth.length) {
- throw new Exception(msg + ": Actual array (" + end +
- ") is past the end of authentic data (" +
- auth.length + ")");
- }
-
- int j = start;
- for (int i = 0; i < actual.length; ++i, ++j) {
- if (auth[j] != actual[i]) {
- throw new Exception(msg + ": Arrays byte " + i + " (at offset " +
- j + ") differs: expect " +
- auth[j] + " got " + actual[i]);
- }
- }
- }
+ @AfterClass
+ static public void teardownCluster() throws Exception {
+ TestParallelReadUtil.teardownCluster();
}
/**
@@ -199,85 +43,17 @@ public class TestParallelRead {
* need to be manually collected, which is inconvenient.
*/
@Test
- public void testParallelRead() throws IOException {
- if (!runParallelRead(1, 4)) {
- fail("Check log for errors");
- }
- if (!runParallelRead(1, 16)) {
- fail("Check log for errors");
- }
- if (!runParallelRead(2, 4)) {
- fail("Check log for errors");
- }
+ public void testParallelReadCopying() throws IOException {
+ runTestWorkload(new CopyingReadWorkerHelper());
}
- /**
- * Start the parallel read with the given parameters.
- */
- boolean runParallelRead(int nFiles, int nWorkerEach) throws IOException {
- ReadWorker workers[] = new ReadWorker[nFiles * nWorkerEach];
- TestFileInfo testInfoArr[] = new TestFileInfo[nFiles];
-
- // Prepare the files and workers
- int nWorkers = 0;
- for (int i = 0; i < nFiles; ++i) {
- TestFileInfo testInfo = new TestFileInfo();
- testInfoArr[i] = testInfo;
-
- testInfo.filepath = new Path("/TestParallelRead.dat." + i);
- testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
- testInfo.dis = dfsClient.open(testInfo.filepath.toString());
-
- for (int j = 0; j < nWorkerEach; ++j) {
- workers[nWorkers++] = new ReadWorker(testInfo, nWorkers);
- }
- }
-
- // Start the workers and wait
- long starttime = System.currentTimeMillis();
- for (ReadWorker worker : workers) {
- worker.start();
- }
-
- for (ReadWorker worker : workers) {
- try {
- worker.join();
- } catch (InterruptedException ignored) { }
- }
- long endtime = System.currentTimeMillis();
-
- // Cleanup
- for (TestFileInfo testInfo : testInfoArr) {
- testInfo.dis.close();
- }
-
- // Report
- boolean res = true;
- long totalRead = 0;
- for (ReadWorker worker : workers) {
- long nread = worker.getBytesRead();
- LOG.info("--- Report: " + worker.getName() + " read " + nread + " B; " +
- "average " + nread / ReadWorker.N_ITERATIONS + " B per read");
- totalRead += nread;
- if (worker.hasError()) {
- res = false;
- }
- }
-
- double timeTakenSec = (endtime - starttime) / 1000.0;
- long totalReadKB = totalRead / 1024;
- LOG.info("=== Report: " + nWorkers + " threads read " +
- totalReadKB + " KB (across " +
- nFiles + " file(s)) in " +
- timeTakenSec + "s; average " +
- totalReadKB / timeTakenSec + " KB/s");
-
- return res;
+ @Test
+ public void testParallelReadByteBuffer() throws IOException {
+ runTestWorkload(new DirectReadWorkerHelper());
}
- @AfterClass
- public static void teardownCluster() throws Exception {
- util.shutdown();
+ @Test
+ public void testParallelReadMixed() throws IOException {
+ runTestWorkload(new MixedWorkloadHelper());
}
-
}
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1348217&r1=1348216&r2=1348217&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
Fri Jun 8 20:01:32 2012
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertTru
import java.io.EOFException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
@@ -28,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -63,7 +65,7 @@ public class TestShortCircuitLocalRead {
throws IOException {
FSDataOutputStream stm = fileSys.create(name, true,
fileSys.getConf().getInt("io.file.buffer.size", 4096),
- (short)repl, (long)blockSize);
+ (short)repl, blockSize);
return stm;
}
@@ -113,6 +115,43 @@ public class TestShortCircuitLocalRead {
}
/**
+ * Verifies that reading a file with the direct read(ByteBuffer) api gives the expected set of bytes.
+ */
+ static void checkFileContentDirect(FileSystem fs, Path name, byte[] expected,
+ int readOffset) throws IOException {
+ DFSDataInputStream stm = (DFSDataInputStream)fs.open(name);
+
+ ByteBuffer actual = ByteBuffer.allocate(expected.length - readOffset);
+
+ long skipped = stm.skip(readOffset);
+ Assert.assertEquals(skipped, readOffset);
+
+ actual.limit(3);
+
+ //Read a small number of bytes first.
+ int nread = stm.read(actual);
+ actual.limit(nread + 2);
+ nread += stm.read(actual);
+
+ // Read across chunk boundary
+ actual.limit(Math.min(actual.capacity(), nread + 517));
+ nread += stm.read(actual);
+ checkData(actual.array(), readOffset, expected, nread, "A few bytes");
+ //Now read rest of it
+ actual.limit(actual.capacity());
+ while (actual.hasRemaining()) {
+ int nbytes = stm.read(actual);
+
+ if (nbytes < 0) {
+ throw new EOFException("End of file reached before reading fully.");
+ }
+ nread += nbytes;
+ }
+ checkData(actual.array(), readOffset, expected, "Read 3");
+ stm.close();
+ }
+
+ /**
* Test that file data can be read by reading the block file
* directly from the local store.
*/
@@ -145,6 +184,7 @@ public class TestShortCircuitLocalRead {
stm.write(fileData);
stm.close();
checkFileContent(fs, file1, fileData, readOffset);
+ checkFileContentDirect(fs, file1, fileData, readOffset);
} finally {
fs.close();
cluster.shutdown();
@@ -328,6 +368,7 @@ public class TestShortCircuitLocalRead {
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
threads[i] = new Thread() {
+ @Override
public void run() {
for (int i = 0; i < iteration; i++) {
try {