Author: kihwal Date: Thu Feb 6 15:51:23 2014 New Revision: 1565315 URL: http://svn.apache.org/r1565315 Log: HDFS-5881. Fix skip() of the short-circuit local reader(legacy). Contributed by Kihwal Lee.
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1565315&r1=1565314&r2=1565315&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Feb 6 15:51:23 2014 @@ -11,6 +11,7 @@ Release 0.23.11 - UNRELEASED OPTIMIZATIONS BUG FIXES + HDFS-5881. Fix skip() of the short-circuit local reader(legacy). 
(kihwal) Release 0.23.10 - 2013-12-09 Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1565315&r1=1565314&r2=1565315&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Thu Feb 6 15:51:23 2014 @@ -395,17 +395,17 @@ class BlockReaderLocal implements BlockR skipBuf = new byte[bytesPerChecksum]; } int ret = read(skipBuf, 0, (int)(n - remaining)); - return ret; + return (remaining + ret); } // optimize for big gap: discard the current buffer, skip to // the beginning of the appropriate checksum chunk and then // read to the middle of that chunk to be in sync with checksums. - this.offsetFromChunkBoundary = newPosition % bytesPerChecksum; - long toskip = n - remaining - this.offsetFromChunkBoundary; + int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum; + long toskip = n - remaining - myOffsetFromChunkBoundary; - dataBuff.clear(); - checksumBuff.clear(); + dataBuff.position(dataBuff.limit()); + checksumBuff.position(checksumBuff.limit()); long dataSkipped = dataIn.skip(toskip); if (dataSkipped != toskip) { @@ -419,17 +419,30 @@ class BlockReaderLocal implements BlockR } } - // read into the middle of the chunk + // Reset this.offsetFromChunkBoundary so that the next read + // returns data from the beginning of the chunk. + this.offsetFromChunkBoundary = 0; + + // If the new position is chunk-aligned, we are done. 
+ if (myOffsetFromChunkBoundary == 0) { + assert (toskip + remaining) == n; + return (toskip + remaining); + } + + // The new position is not chunk-aligned, so we need to skip + // myOffsetFromChunkBoundary bytes more. if (skipBuf == null) { skipBuf = new byte[bytesPerChecksum]; } assert skipBuf.length == bytesPerChecksum; - assert this.offsetFromChunkBoundary < bytesPerChecksum; - int ret = read(skipBuf, 0, this.offsetFromChunkBoundary); + assert myOffsetFromChunkBoundary < bytesPerChecksum; + + int ret = read(skipBuf, 0, myOffsetFromChunkBoundary); + if (ret == -1) { // EOS - return toskip; + return (toskip + remaining); } else { - return (toskip + ret); + return (toskip + remaining + ret); } } @@ -470,4 +483,4 @@ class BlockReaderLocal implements BlockR public boolean hasSentStatusCode() { return false; } -} \ No newline at end of file +} Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1565315&r1=1565314&r2=1565315&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Feb 6 15:51:23 2014 @@ -955,6 +955,14 @@ public class DFSInputStream extends FSIn pos += blockReader.skip(diff); if (pos == targetPos) { done = true; + } else { + // The range was already checked. If the block reader returns + // something unexpected instead of throwing an exception, it is + // most likely a bug. + String errMsg = "BlockReader failed to seek to " + + targetPos + ". 
Instead, it seeked to " + pos + "."; + DFSClient.LOG.warn(errMsg); + throw new IOException(errMsg); } } catch (IOException e) {//make following read to retry if(DFSClient.LOG.isDebugEnabled()) { Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1565315&r1=1565314&r2=1565315&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Thu Feb 6 15:51:23 2014 @@ -54,7 +54,10 @@ public class TestShortCircuitLocalRead { static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/"; static final long seed = 0xDEADBEEFL; - static final int blockSize = 5120; + // block size needs to be bigger than the internal buffer, so that block reader + // can be reused. + static final int blockSize = 65536; + static final int internalDataBufSize = 512 * 64; boolean simulatedStorage = false; // creates a file but does not close it @@ -91,7 +94,12 @@ public class TestShortCircuitLocalRead { // Now read using a different API. actual = new byte[expected.length-readOffset]; stm = fs.open(name); - long skipped = stm.skip(readOffset); + long skipped = 0; + if (readOffset >= internalDataBufSize) { + // force multiple seeks across internal data buffer boundary. + skipped += stm.read(actual, 0, internalDataBufSize); + } + skipped += stm.skip(readOffset - skipped); Assert.assertEquals(skipped, readOffset); //Read a small number of bytes first. 
int nread = stm.read(actual, 0, 3); @@ -172,6 +180,7 @@ public class TestShortCircuitLocalRead { public void testReadFromAnOffset() throws IOException { doTestShortCircuitRead(false, 3*blockSize+100, 777); doTestShortCircuitRead(true, 3*blockSize+100, 777); + doTestShortCircuitRead(false, 3*blockSize+100, internalDataBufSize + 7245); } @Test