Author: tomwhite
Date: Tue Jan 5 22:14:17 2010
New Revision: 896243
URL: http://svn.apache.org/viewvc?rev=896243&view=rev
Log:
HADOOP-3205. Read multiple chunks directly from FSInputChecker subclass into
user buffers. Contributed by Todd Lipcon.
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/FSInputChecker.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestChecksumFileSystem.java
Modified: hadoop/common/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=896243&r1=896242&r2=896243&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Tue Jan 5 22:14:17 2010
@@ -83,6 +83,9 @@
HADOOP-6443. Serialization classes accept invalid metadata.
(Aaron Kimball via tomwhite)
+ HADOOP-3205. Read multiple chunks directly from FSInputChecker subclass
+ into user buffers. (Todd Lipcon via tomwhite)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java?rev=896243&r1=896242&r2=896243&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java Tue Jan 5 22:14:17 2010
@@ -205,24 +205,41 @@
@Override
protected int readChunk(long pos, byte[] buf, int offset, int len,
byte[] checksum) throws IOException {
+
boolean eof = false;
- if(needChecksum()) {
- try {
- long checksumPos = getChecksumFilePos(pos);
- if(checksumPos != sums.getPos()) {
- sums.seek(checksumPos);
- }
- sums.readFully(checksum);
- } catch (EOFException e) {
+ if (needChecksum()) {
+ assert checksum != null; // we have a checksum buffer
+ assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
+ assert len >= bytesPerSum; // we must read at least one chunk
+
+ final int checksumsToRead = Math.min(
+ len/bytesPerSum, // number of checksums based on len to read
+ checksum.length / CHECKSUM_SIZE); // size of checksum buffer
+ long checksumPos = getChecksumFilePos(pos);
+ if(checksumPos != sums.getPos()) {
+ sums.seek(checksumPos);
+ }
+
+ int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
+ if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
+ throw new ChecksumException(
+ "Checksum file not a length multiple of checksum size " +
+ "in " + file + " at " + pos + " checksumpos: " + checksumPos +
+ " sumLenread: " + sumLenRead,
+ pos);
+ }
+ if (sumLenRead <= 0) { // we're at the end of the file
eof = true;
+ } else {
+ // Adjust amount of data to read based on how many checksum chunks we read
+ len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
}
- len = bytesPerSum;
}
if(pos != datas.getPos()) {
datas.seek(pos);
}
int nread = readFully(datas, buf, offset, len);
- if( eof && nread > 0) {
+ if (eof && nread > 0) {
throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
}
return nread;
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java?rev=896243&r1=896242&r2=896243&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java Tue Jan 5 22:14:17 2010
@@ -200,21 +200,35 @@
byte[] checksum) throws IOException {
boolean eof = false;
if (needChecksum()) {
- try {
- final long checksumPos = getChecksumFilePos(pos);
- if (checksumPos != sums.getPos()) {
- sums.seek(checksumPos);
- }
- sums.readFully(checksum);
- } catch (EOFException e) {
+ assert checksum != null; // we have a checksum buffer
+ assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
+ assert len >= bytesPerSum; // we must read at least one chunk
+
+ final int checksumsToRead = Math.min(
+ len/bytesPerSum, // number of checksums based on len to read
+ checksum.length / CHECKSUM_SIZE); // size of checksum buffer
+ long checksumPos = getChecksumFilePos(pos);
+ if(checksumPos != sums.getPos()) {
+ sums.seek(checksumPos);
+ }
+
+ int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
+ if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
+ throw new EOFException("Checksum file not a length multiple of checksum size " +
+ "in " + file + " at " + pos + " checksumpos: " + checksumPos +
+ " sumLenread: " + sumLenRead );
+ }
+ if (sumLenRead <= 0) { // we're at the end of the file
eof = true;
+ } else {
+ // Adjust amount of data to read based on how many checksum chunks we read
+ len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
}
- len = bytesPerSum;
}
if (pos != datas.getPos()) {
datas.seek(pos);
}
- final int nread = readFully(datas, buf, offset, len);
+ int nread = readFully(datas, buf, offset, len);
if (eof && nread > 0) {
throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FSInputChecker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FSInputChecker.java?rev=896243&r1=896242&r2=896243&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FSInputChecker.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FSInputChecker.java Tue Jan 5 22:14:17 2010
@@ -24,6 +24,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
/**
* This is a generic input stream for verifying checksums for
@@ -38,16 +40,26 @@
protected Path file;
private Checksum sum;
private boolean verifyChecksum = true;
- private byte[] buf;
+ private int maxChunkSize; // data bytes for checksum (eg 512)
+ private byte[] buf; // buffer for non-chunk-aligned reading
private byte[] checksum;
- private int pos;
- private int count;
+ private IntBuffer checksumInts; // wrapper on checksum buffer
+ private int pos; // the position of the reader inside buf
+ private int count; // the number of bytes currently in buf
private int numOfRetries;
// cached file position
+ // this should always be a multiple of maxChunkSize
private long chunkPos = 0;
-
+
+ // Number of checksum chunks that can be read at once into a user
+ // buffer. Chosen by benchmarks - higher values do not reduce
+ // CPU usage. The size of the data reads made to the underlying stream
+ // will be CHUNKS_PER_READ * maxChunkSize.
+ private static final int CHUNKS_PER_READ = 32;
+ protected static final int CHECKSUM_SIZE = 4; // 32-bit checksum
+
/** Constructor
*
* @param file The name of the file to be read
@@ -72,14 +84,34 @@
set(verifyChecksum, sum, chunkSize, checksumSize);
}
- /** Reads in next checksum chunk data into <code>buf</code> at <code>offset</code>
+ /**
+ * Reads in checksum chunks into <code>buf</code> at <code>offset</code>
* and checksum into <code>checksum</code>.
+ * Since checksums can be disabled, there are two cases implementors need
+ * to worry about:
+ *
+ * (a) needChecksum() will return false:
+ * - len can be any positive value
+ * - checksum will be null
+ * Implementors should simply pass through to the underlying data stream.
+ * or
+ * (b) needChecksum() will return true:
+ * - len >= maxChunkSize
+ * - checksum.length is a multiple of CHECKSUM_SIZE
+ * Implementors should read an integer number of data chunks into
+ * buf. The amount read should be bounded by len or by
+ * checksum.length / CHECKSUM_SIZE * maxChunkSize. Note that len may
+ * be a value that is not a multiple of maxChunkSize, in which case
+ * the implementation may return less than len.
+ *
* The method is used for implementing read, therefore, it should be optimized
- * for sequential reading
+ * for sequential reading.
+ *
* @param pos chunkPos
* @param buf desitination buffer
* @param offset offset in buf at which to store data
- * @param len maximun number of bytes to read
+ * @param len maximum number of bytes to read
+ * @param checksum the data buffer into which to write checksums
* @return number of bytes read
*/
abstract protected int readChunk(long pos, byte[] buf, int offset, int len,
@@ -96,7 +128,7 @@
protected synchronized boolean needChecksum() {
return verifyChecksum && sum != null;
}
-
+
/**
* Read one checksum-verified byte
*
@@ -173,7 +205,7 @@
private void fill( ) throws IOException {
assert(pos>=count);
// fill internal buffer
- count = readChecksumChunk(buf, 0, buf.length);
+ count = readChecksumChunk(buf, 0, maxChunkSize);
if (count < 0) count = 0;
}
@@ -185,13 +217,13 @@
throws IOException {
int avail = count-pos;
if( avail <= 0 ) {
- if(len>=buf.length) {
+ if(len >= maxChunkSize) {
// read a chunk to user buffer directly; avoid one copy
int nread = readChecksumChunk(b, off, len);
return nread;
} else {
// read a chunk into the local buffer
- fill();
+ fill();
if( count <= 0 ) {
return -1;
} else {
@@ -207,10 +239,10 @@
return cnt;
}
- /* Read up one checksum chunk to array <i>b</i> at pos <i>off</i>
- * It requires a checksum chunk boundary
+ /* Read up one or more checksum chunk to array <i>b</i> at pos <i>off</i>
+ * It requires at least one checksum chunk boundary
* in between <cur_pos, cur_pos+len>
- * and it stops reading at the boundary or at the end of the stream;
+ * and it stops reading at the last boundary or at the end of the stream;
* Otherwise an IllegalArgumentException is thrown.
* This makes sure that all data read are checksum verified.
*
@@ -223,7 +255,7 @@
* the stream has been reached.
* @throws IOException if an I/O error occurs.
*/
- private int readChecksumChunk(byte b[], int off, int len)
+ private int readChecksumChunk(byte b[], final int off, final int len)
throws IOException {
// invalidate buffer
count = pos = 0;
@@ -236,13 +268,12 @@
try {
read = readChunk(chunkPos, b, off, len, checksum);
- if( read > 0 ) {
+ if( read > 0) {
if( needChecksum() ) {
- sum.update(b, off, read);
- verifySum(chunkPos);
+ verifySums(b, off, read);
}
chunkPos += read;
- }
+ }
retry = false;
} catch (ChecksumException ce) {
LOG.info("Found checksum error: b[" + off + ", " + (off+read) + "]="
@@ -266,26 +297,38 @@
} while (retry);
return read;
}
-
- /* verify checksum for the chunk.
- * @throws ChecksumException if there is a mismatch
- */
- private void verifySum(long errPos) throws ChecksumException {
- long crc = getChecksum();
- long sumValue = sum.getValue();
- sum.reset();
- if (crc != sumValue) {
- throw new ChecksumException(
- "Checksum error: "+file+" at "+errPos, errPos);
+
+ private void verifySums(final byte b[], final int off, int read)
+ throws ChecksumException
+ {
+ int leftToVerify = read;
+ int verifyOff = 0;
+ checksumInts.rewind();
+ checksumInts.limit((read - 1)/maxChunkSize + 1);
+
+ while (leftToVerify > 0) {
+ sum.update(b, off + verifyOff, Math.min(leftToVerify, maxChunkSize));
+ int expected = checksumInts.get();
+ int calculated = (int)sum.getValue();
+ sum.reset();
+
+ if (expected != calculated) {
+ long errPos = chunkPos + verifyOff;
+ throw new ChecksumException(
+ "Checksum error: "+file+" at "+ errPos +
+ " exp: " + expected + " got: " + calculated, errPos);
+ }
+ leftToVerify -= maxChunkSize;
+ verifyOff += maxChunkSize;
}
}
-
- /* calculate checksum value */
- private long getChecksum() {
- return checksum2long(checksum);
- }
- /** Convert a checksum byte array to a long */
+ /**
+ * Convert a checksum byte array to a long
+ * This is deprecated since 0.22 since it is no longer in use
+ * by this class.
+ */
+ @Deprecated
static public long checksum2long(byte[] checksum) {
long crc = 0L;
for(int i=0; i<checksum.length; i++) {
@@ -293,7 +336,7 @@
}
return crc;
}
-
+
@Override
public synchronized long getPos() throws IOException {
return chunkPos-Math.max(0L, count - pos);
@@ -399,11 +442,19 @@
* @param checksumSize checksum size
*/
final protected synchronized void set(boolean verifyChecksum,
- Checksum sum, int maxChunkSize, int checksumSize ) {
+ Checksum sum, int maxChunkSize, int checksumSize) {
+
+ // The code makes assumptions that checksums are always 32-bit.
+ assert !verifyChecksum || sum == null || checksumSize == CHECKSUM_SIZE;
+
+ this.maxChunkSize = maxChunkSize;
this.verifyChecksum = verifyChecksum;
this.sum = sum;
this.buf = new byte[maxChunkSize];
- this.checksum = new byte[checksumSize];
+ // The size of the checksum array here determines how much we can
+ // read in a single call to readChunk
+ this.checksum = new byte[CHUNKS_PER_READ * checksumSize];
+ this.checksumInts = ByteBuffer.wrap(checksum).asIntBuffer();
this.count = 0;
this.pos = 0;
}
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestChecksumFileSystem.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestChecksumFileSystem.java?rev=896243&r1=896242&r2=896243&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestChecksumFileSystem.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestChecksumFileSystem.java Tue Jan 5 22:14:17 2010
@@ -26,6 +26,9 @@
import junit.framework.TestCase;
public class TestChecksumFileSystem extends TestCase {
+ static final String TEST_ROOT_DIR
+ = System.getProperty("test.build.data","build/test/data/work-dir/localfs");
+
public void testgetChecksumLength() throws Exception {
assertEquals(8, ChecksumFileSystem.getChecksumLength(0L, 512));
assertEquals(12, ChecksumFileSystem.getChecksumLength(1L, 512));
@@ -38,10 +41,7 @@
ChecksumFileSystem.getChecksumLength(10000000000000L, 10));
}
- public void testVerifyChecksum() throws Exception {
- String TEST_ROOT_DIR
- = System.getProperty("test.build.data","build/test/data/work-dir/localfs");
-
+ public void testVerifyChecksum() throws Exception {
Configuration conf = new Configuration();
LocalFileSystem localFs = FileSystem.getLocal(conf);
Path testPath = new Path(TEST_ROOT_DIR, "testPath");
@@ -54,9 +54,15 @@
fout.write("testing you".getBytes());
fout.close();
+ // Exercise some boundary cases - a divisor of the chunk size
+ // the chunk size, 2x chunk size, and +/-1 around these.
TestLocalFileSystem.readFile(localFs, testPath, 128);
+ TestLocalFileSystem.readFile(localFs, testPath, 511);
TestLocalFileSystem.readFile(localFs, testPath, 512);
+ TestLocalFileSystem.readFile(localFs, testPath, 513);
+ TestLocalFileSystem.readFile(localFs, testPath, 1023);
TestLocalFileSystem.readFile(localFs, testPath, 1024);
+ TestLocalFileSystem.readFile(localFs, testPath, 1025);
localFs.delete(localFs.getChecksumFile(testPath), true);
assertTrue("checksum deleted",
!localFs.exists(localFs.getChecksumFile(testPath)));
@@ -75,9 +81,80 @@
assertTrue("error reading", errorRead);
//now setting verify false, the read should succeed
- localFs.setVerifyChecksum(false);
- String str = TestLocalFileSystem.readFile(localFs, testPath, 1024);
- assertTrue("read", "testing".equals(str));
+ try {
+ localFs.setVerifyChecksum(false);
+ String str = TestLocalFileSystem.readFile(localFs, testPath, 1024);
+ assertTrue("read", "testing".equals(str));
+ } finally {
+ // reset for other tests
+ localFs.setVerifyChecksum(true);
+ }
}
+
+ public void testMultiChunkFile() throws Exception {
+ Configuration conf = new Configuration();
+ LocalFileSystem localFs = FileSystem.getLocal(conf);
+ Path testPath = new Path(TEST_ROOT_DIR, "testMultiChunk");
+ FSDataOutputStream fout = localFs.create(testPath);
+ for (int i = 0; i < 1000; i++) {
+ fout.write(("testing" + i).getBytes());
+ }
+ fout.close();
+
+ // Exercise some boundary cases - a divisor of the chunk size
+ // the chunk size, 2x chunk size, and +/-1 around these.
+ TestLocalFileSystem.readFile(localFs, testPath, 128);
+ TestLocalFileSystem.readFile(localFs, testPath, 511);
+ TestLocalFileSystem.readFile(localFs, testPath, 512);
+ TestLocalFileSystem.readFile(localFs, testPath, 513);
+ TestLocalFileSystem.readFile(localFs, testPath, 1023);
+ TestLocalFileSystem.readFile(localFs, testPath, 1024);
+ TestLocalFileSystem.readFile(localFs, testPath, 1025);
+ }
+
+ /**
+ * Test to ensure that if the checksum file is truncated, a
+ * ChecksumException is thrown
+ */
+ public void testTruncatedChecksum() throws Exception {
+ Configuration conf = new Configuration();
+ LocalFileSystem localFs = FileSystem.getLocal(conf);
+ Path testPath = new Path(TEST_ROOT_DIR, "testtruncatedcrc");
+ FSDataOutputStream fout = localFs.create(testPath);
+ fout.write("testing truncation".getBytes());
+ fout.close();
+
+ // Read in the checksum
+ Path checksumFile = localFs.getChecksumFile(testPath);
+ FileSystem rawFs = localFs.getRawFileSystem();
+ FSDataInputStream checksumStream = rawFs.open(checksumFile);
+ byte buf[] = new byte[8192];
+ int read = checksumStream.read(buf, 0, buf.length);
+ checksumStream.close();
+
+ // Now rewrite the checksum file with the last byte missing
+ FSDataOutputStream replaceStream = rawFs.create(checksumFile);
+ replaceStream.write(buf, 0, read - 1);
+ replaceStream.close();
+
+ // Now reading the file should fail with a ChecksumException
+ try {
+ TestLocalFileSystem.readFile(localFs, testPath, 1024);
+ fail("Did not throw a ChecksumException when reading truncated " +
+ "crc file");
+ } catch(ChecksumException ie) {
+ }
+
+ // telling it not to verify checksums, should avoid issue.
+ try {
+ localFs.setVerifyChecksum(false);
+ String str = TestLocalFileSystem.readFile(localFs, testPath, 1024);
+ assertTrue("read", "testing truncation".equals(str));
+ } finally {
+ // reset for other tests
+ localFs.setVerifyChecksum(true);
+ }
+
+ }
}