Author: eli
Date: Tue Dec 13 08:09:50 2011
New Revision: 1213592
URL: http://svn.apache.org/viewvc?rev=1213592&view=rev
Log:
HDFS-2654. Make BlockReaderLocal not extend RemoteBlockReader2. Contributed by Eli Collins
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
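
The change replaces inheritance with a shared interface: BlockReaderLocal now implements BlockReader directly (see the BlockReaderLocal.java hunk below), and the generic read loops both readers need are delegated to a BlockReaderUtil helper. A minimal sketch of the interface surface involved, with simplified signatures inferred from the @Override'd methods in the diff rather than copied from the real org.apache.hadoop.hdfs.BlockReader:

    import java.io.IOException;
    import java.net.Socket;

    // Simplified stand-in for org.apache.hadoop.hdfs.BlockReader; the method
    // set mirrors the methods BlockReaderLocal overrides below, and other
    // details of the real interface are omitted.
    interface BlockReader {
      int read(byte[] buf, int off, int len) throws IOException;
      int readAll(byte[] buf, int offset, int len) throws IOException;
      void readFully(byte[] buf, int off, int len) throws IOException;
      long skip(long n) throws IOException;
      void close() throws IOException;
      Socket takeSocket();          // a local reader has no socket, so it returns null
      boolean hasSentStatusCode();  // and never sends a status code, so false
    }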
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1213592&r1=1213591&r2=1213592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Dec 13 08:09:50 2011
@@ -204,6 +204,8 @@ Release 0.23.1 - UNRELEASED
HDFS-2604. Add a log message to show if WebHDFS is enabled and a
configuration section in the forrest doc. (szetszwo)
+ HDFS-2654. Make BlockReaderLocal not extend RemoteBlockReader2. (eli)
+
OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1213592&r1=1213591&r2=1213592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Tue Dec 13 08:09:50 2011
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -57,8 +59,8 @@ import org.apache.hadoop.util.DataChecks
* if security is enabled.</li>
* </ul>
*/
-class BlockReaderLocal extends RemoteBlockReader2 {
- public static final Log LOG = LogFactory.getLog(DFSClient.class);
+class BlockReaderLocal implements BlockReader {
+ private static final Log LOG = LogFactory.getLog(DFSClient.class);
//Stores the cache and proxy for a local datanode.
private static class LocalDatanodeInfo {
@@ -117,13 +119,24 @@ class BlockReaderLocal extends RemoteBlo
private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
private final FileInputStream dataIn; // reader for the data file
-
private FileInputStream checksumIn; // reader for the checksum file
private int offsetFromChunkBoundary;
- ByteBuffer dataBuff = null;
- ByteBuffer checksumBuff = null;
+ private byte[] skipBuf = null;
+ private ByteBuffer dataBuff = null;
+ private ByteBuffer checksumBuff = null;
+ private DataChecksum checksum;
+ private final boolean verifyChecksum;
+
+ private static DirectBufferPool bufferPool = new DirectBufferPool();
+
+ private int bytesPerChecksum;
+ private int checksumSize;
+
+ /** offset in block where reader wants to actually read */
+ private long startOffset;
+ private final String filename;
/**
* The only way this object can be instantiated.
@@ -256,9 +269,14 @@ class BlockReaderLocal extends RemoteBlo
long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
FileInputStream checksumIn) throws IOException {
- super(hdfsfile, block.getBlockPoolId(), block.getBlockId(), dataIn
- .getChannel(), checksum, verifyChecksum, startOffset, firstChunkOffset,
- length, null);
+ this.filename = hdfsfile;
+ this.checksum = checksum;
+ this.verifyChecksum = verifyChecksum;
+ this.startOffset = Math.max(startOffset, 0);
+
+ bytesPerChecksum = this.checksum.getBytesPerChecksum();
+ checksumSize = this.checksum.getChecksumSize();
+
this.dataIn = dataIn;
this.checksumIn = checksumIn;
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
@@ -322,10 +340,8 @@ class BlockReaderLocal extends RemoteBlo
readIntoBuffer(checksumIn, checksumBuff);
checksumBuff.flip();
dataBuff.flip();
- if (verifyChecksum) {
- checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
- this.startOffset);
- }
+ checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
+ this.startOffset);
} else {
dataRead = dataBuff.remaining();
}
@@ -356,9 +372,24 @@ class BlockReaderLocal extends RemoteBlo
}
if (!verifyChecksum) {
return dataIn.skip(n);
- } else {
- return super.skip(n);
}
+ // Skip by reading the data so we stay in sync with checksums.
+ // This could be implemented more efficiently in the future to
+ // skip to the beginning of the appropriate checksum chunk
+ // and then only read to the middle of that chunk.
+ if (skipBuf == null) {
+ skipBuf = new byte[bytesPerChecksum];
+ }
+ long nSkipped = 0;
+ while ( nSkipped < n ) {
+ int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+ int ret = read(skipBuf, 0, toSkip);
+ if ( ret <= 0 ) {
+ return nSkipped;
+ }
+ nSkipped += ret;
+ }
+ return nSkipped;
}
@Override
@@ -375,6 +406,27 @@ class BlockReaderLocal extends RemoteBlo
bufferPool.returnBuffer(checksumBuff);
checksumBuff = null;
}
- super.close();
+ startOffset = -1;
+ checksum = null;
+ }
+
+ @Override
+ public int readAll(byte[] buf, int offset, int len) throws IOException {
+ return BlockReaderUtil.readAll(this, buf, offset, len);
+ }
+
+ @Override
+ public void readFully(byte[] buf, int off, int len) throws IOException {
+ BlockReaderUtil.readFully(this, buf, off, len);
+ }
+
+ @Override
+ public Socket takeSocket() {
+ return null;
+ }
+
+ @Override
+ public boolean hasSentStatusCode() {
+ return false;
}
}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1213592&r1=1213591&r2=1213592&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Dec 13 08:09:50 2011
@@ -85,7 +85,7 @@ public class RemoteBlockReader2 impleme
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
private ReadableByteChannel in;
- protected DataChecksum checksum;
+ private DataChecksum checksum;
private PacketHeader curHeader;
private ByteBuffer curPacketBuf = null;
@@ -96,25 +96,24 @@ public class RemoteBlockReader2 impleme
private long lastSeqNo = -1;
/** offset in block where reader wants to actually read */
- protected long startOffset;
- protected final String filename;
+ private long startOffset;
+ private final String filename;
- protected static DirectBufferPool bufferPool =
- new DirectBufferPool();
+ private static DirectBufferPool bufferPool = new DirectBufferPool();
private ByteBuffer headerBuf = ByteBuffer.allocate(
PacketHeader.PKT_HEADER_LEN);
- protected int bytesPerChecksum;
- protected int checksumSize;
+ private int bytesPerChecksum;
+ private int checksumSize;
/**
* The total number of bytes we need to transfer from the DN.
* This is the amount that the user has requested plus some padding
* at the beginning so that the read can begin on a chunk boundary.
*/
- protected long bytesNeededToFinish;
+ private long bytesNeededToFinish;
- protected final boolean verifyChecksum;
+ private final boolean verifyChecksum;
private boolean sentStatusCode = false;
@@ -389,29 +388,12 @@ public class RemoteBlockReader2 impleme
@Override
public int readAll(byte[] buf, int offset, int len) throws IOException {
- int n = 0;
- for (;;) {
- int nread = read(buf, offset + n, len - n);
- if (nread <= 0)
- return (n == 0) ? nread : n;
- n += nread;
- if (n >= len)
- return n;
- }
+ return BlockReaderUtil.readAll(this, buf, offset, len);
}
@Override
- public void readFully(byte[] buf, int off, int len)
- throws IOException {
- int toRead = len;
- while (toRead > 0) {
- int ret = read(buf, off, toRead);
- if (ret < 0) {
- throw new IOException("Premature EOF from inputStream");
- }
- toRead -= ret;
- off += ret;
- }
+ public void readFully(byte[] buf, int off, int len) throws IOException {
+ BlockReaderUtil.readFully(this, buf, off, len);
}
/**
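
The readAll/readFully loops deleted from RemoteBlockReader2 above are the bodies that now live in the shared helper both readers call. A rough sketch of what BlockReaderUtil plausibly looks like, reconstructed from that deleted code (the actual helper added by this change may differ in detail):

    import java.io.IOException;

    // Shared read loops, parameterized over any BlockReader implementation.
    class BlockReaderUtil {

      // Read up to len bytes; returns the number read, or the first
      // non-positive read() result if nothing could be read at all.
      static int readAll(BlockReader reader, byte[] buf, int offset, int len)
          throws IOException {
        int n = 0;
        for (;;) {
          int nread = reader.read(buf, offset + n, len - n);
          if (nread <= 0)
            return (n == 0) ? nread : n;
          n += nread;
          if (n >= len)
            return n;
        }
      }

      // Read exactly len bytes, failing if the stream ends early.
      static void readFully(BlockReader reader, byte[] buf, int off, int len)
          throws IOException {
        int toRead = len;
        while (toRead > 0) {
          int ret = reader.read(buf, off, toRead);
          if (ret < 0) {
            throw new IOException("Premature EOF from inputStream");
          }
          toRead -= ret;
          off += ret;
        }
      }
    }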