Repository: hadoop
Updated Branches:
  refs/heads/trunk 024c3ec4a -> d91b7a845


HADOOP-14722. Azure: BlockBlobInputStream position incorrect after seek.
Contributed by Thomas Marquardt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d91b7a84
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d91b7a84
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d91b7a84

Branch: refs/heads/trunk
Commit: d91b7a8451489f97bdde928cea774764155cfe03
Parents: 024c3ec
Author: Steve Loughran <ste...@apache.org>
Authored: Sun Aug 6 20:19:23 2017 +0100
Committer: Steve Loughran <ste...@apache.org>
Committed: Sun Aug 6 20:19:23 2017 +0100

----------------------------------------------------------------------
 .../hadoop/fs/azure/BlockBlobInputStream.java   | 91 +++++++++++++++-----
 .../fs/azure/TestBlockBlobInputStream.java      | 85 ++++++++++++++++--
 2 files changed, 150 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91b7a84/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
index 5542415..c37b2be 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
@@ -43,11 +43,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
   private InputStream blobInputStream = null;
   private int minimumReadSizeInBytes = 0;
   private long streamPositionAfterLastRead = -1;
+  // position of next network read within stream
   private long streamPosition = 0;
+  // length of stream
   private long streamLength = 0;
   private boolean closed = false;
+  // internal buffer, re-used for performance optimization
   private byte[] streamBuffer;
+  // zero-based offset within streamBuffer of current read position
   private int streamBufferPosition;
+  // length of data written to streamBuffer, streamBuffer may be larger
   private int streamBufferLength;
 
   /**
@@ -82,6 +87,16 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
   }
 
   /**
+   * Reset the internal stream buffer but do not release the memory.
+   * The buffer can be reused to avoid frequent memory allocations of
+   * a large buffer.
+   */
+  private void resetStreamBuffer() {
+    streamBufferPosition = 0;
+    streamBufferLength = 0;
+  }
+
+  /**
    * Gets the read position of the stream.
    * @return the zero-based byte offset of the read position.
    * @throws IOException IO failure
@@ -89,7 +104,9 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
   @Override
   public synchronized long getPos() throws IOException {
     checkState();
-    return streamPosition;
+    return (streamBuffer != null)
+        ? streamPosition - streamBufferLength + streamBufferPosition
+        : streamPosition;
   }
 
   /**
@@ -107,21 +124,39 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
       throw new EOFException(
           FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
     }
-    if (pos == getPos()) {
+
+    // calculate offset between the target and current position in the stream
+    long offset = pos - getPos();
+
+    if (offset == 0) {
       // no=op, no state change
       return;
     }
 
+    if (offset > 0) {
+      // forward seek, data can be skipped as an optimization
+      if (skip(offset) != offset) {
+        throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+      }
+      return;
+    }
+
+    // reverse seek, offset is negative
     if (streamBuffer != null) {
-      long offset = streamPosition - pos;
-      if (offset > 0 && offset < streamBufferLength) {
-        streamBufferPosition = streamBufferLength - (int) offset;
+      if (streamBufferPosition + offset >= 0) {
+        // target position is inside the stream buffer,
+        // only need to move backwards within the stream buffer
+        streamBufferPosition += offset;
       } else {
-        streamBufferPosition = streamBufferLength;
+        // target position is outside the stream buffer,
+        // need to reset stream buffer and move position for next network read
+        resetStreamBuffer();
+        streamPosition = pos;
       }
+    } else {
+      streamPosition = pos;
     }
 
-    streamPosition = pos;
     // close BlobInputStream after seek is invoked because BlobInputStream
     // does not support seek
     closeBlobInputStream();
@@ -189,8 +224,7 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
         streamBuffer = new byte[(int) Math.min(minimumReadSizeInBytes,
             streamLength)];
       }
-      streamBufferPosition = 0;
-      streamBufferLength = 0;
+      resetStreamBuffer();
       outputStream = new MemoryOutputStream(streamBuffer, streamBufferPosition,
           streamBuffer.length);
       needToCopy = true;
@@ -295,27 +329,44 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
    * @param n the number of bytes to be skipped.
    * @return the actual number of bytes skipped.
    * @throws IOException IO failure
+   * @throws IndexOutOfBoundsException if n is negative or if the sum of n
+   * and the current value of getPos() is greater than the length of the stream.
    */
   @Override
   public synchronized long skip(long n) throws IOException {
     checkState();
 
     if (blobInputStream != null) {
-      return blobInputStream.skip(n);
-    } else {
-      if (n < 0 || streamPosition + n > streamLength) {
-        throw new IndexOutOfBoundsException("skip range");
-      }
+      // blobInput stream is open; delegate the work to it
+      long skipped = blobInputStream.skip(n);
+      // update position to the actual skip value
+      streamPosition += skipped;
+      return skipped;
+    }
 
-      if (streamBuffer != null) {
-        streamBufferPosition = (n < streamBufferLength - streamBufferPosition)
-            ? streamBufferPosition + (int) n
-            : streamBufferLength;
-      }
+    // no blob stream; implement the skip logic directly
+    if (n < 0 || n > streamLength - getPos()) {
+      throw new IndexOutOfBoundsException("skip range");
+    }
 
+    if (streamBuffer != null) {
+      // there's a buffer, so seek with it
+      if (n < streamBufferLength - streamBufferPosition) {
+        // new range is in the buffer, so just update the buffer position
+        // skip within the buffer.
+        streamBufferPosition += (int) n;
+      } else {
+        // skip is out of range, so move position to new value and reset
+        // the buffer ready for the next read()
+        streamPosition = getPos() + n;
+        resetStreamBuffer();
+      }
+    } else {
+      // no stream buffer; increment the stream position ready for
+      // the next triggered connection & read
       streamPosition += n;
-      return n;
     }
+    return n;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91b7a84/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java
index 2453584..0ae4012 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java
@@ -155,7 +155,7 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
     }
 
     LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
-        TEST_FILE_SIZE );
+        TEST_FILE_SIZE);
     ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
 
     try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
@@ -198,7 +198,7 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
   }
 
   @Test
-  public void test_0200_BasicReadTestV2() throws Exception {
+  public void test_0200_BasicReadTest() throws Exception {
     assumeHugeFileExists();
 
     try (
@@ -214,12 +214,12 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
       // v1 forward seek and read a kilobyte into first kilobyte of bufferV1
       inputStreamV1.seek(5 * MEGABYTE);
       int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, KILOBYTE);
-      assertEquals(numBytesReadV1, KILOBYTE);
+      assertEquals(KILOBYTE, numBytesReadV1);
 
       // v2 forward seek and read a kilobyte into first kilobyte of bufferV2
       inputStreamV2.seek(5 * MEGABYTE);
       int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, KILOBYTE);
-      assertEquals(numBytesReadV2, KILOBYTE);
+      assertEquals(KILOBYTE, numBytesReadV2);
 
       assertArrayEquals(bufferV1, bufferV2);
 
@@ -229,17 +229,90 @@ public class TestBlockBlobInputStream extends AbstractWasbTestBase {
       // v1 reverse seek and read a megabyte into last megabyte of bufferV1
       inputStreamV1.seek(3 * MEGABYTE);
       numBytesReadV1 = inputStreamV1.read(bufferV1, offset, len);
-      assertEquals(numBytesReadV1, len);
+      assertEquals(len, numBytesReadV1);
 
       // v2 reverse seek and read a megabyte into last megabyte of bufferV2
       inputStreamV2.seek(3 * MEGABYTE);
       numBytesReadV2 = inputStreamV2.read(bufferV2, offset, len);
-      assertEquals(numBytesReadV2, len);
+      assertEquals(len, numBytesReadV2);
 
       assertArrayEquals(bufferV1, bufferV2);
     }
   }
 
+  @Test
+  public void test_0201_RandomReadTest() throws Exception {
+    assumeHugeFileExists();
+
+    try (
+        FSDataInputStream inputStreamV1
+            = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
+
+        FSDataInputStream inputStreamV2
+            = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
+    ) {
+      final int bufferSize = 4 * KILOBYTE;
+      byte[] bufferV1 = new byte[bufferSize];
+      byte[] bufferV2 = new byte[bufferV1.length];
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      inputStreamV1.seek(0);
+      inputStreamV2.seek(0);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      int seekPosition = 2 * KILOBYTE;
+      inputStreamV1.seek(seekPosition);
+      inputStreamV2.seek(seekPosition);
+
+      inputStreamV1.seek(0);
+      inputStreamV2.seek(0);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      seekPosition = 5 * KILOBYTE;
+      inputStreamV1.seek(seekPosition);
+      inputStreamV2.seek(seekPosition);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      seekPosition = 10 * KILOBYTE;
+      inputStreamV1.seek(seekPosition);
+      inputStreamV2.seek(seekPosition);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      seekPosition = 4100 * KILOBYTE;
+      inputStreamV1.seek(seekPosition);
+      inputStreamV2.seek(seekPosition);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+    }
+  }
+
+  private void verifyConsistentReads(FSDataInputStream inputStreamV1,
+      FSDataInputStream inputStreamV2,
+      byte[] bufferV1,
+      byte[] bufferV2) throws IOException {
+    int size = bufferV1.length;
+    final int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, size);
+    assertEquals("Bytes read from V1 stream", size, numBytesReadV1);
+
+    final int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, size);
+    assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
+
+    assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
+  }
+
   /**
    * Validates the implementation of InputStream.markSupported.
    * @throws IOException


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to