HADOOP-12073. Azure FileSystem PageBlobInputStream does not return -1 on EOF. 
Contributed by Ivan Mitic.

(cherry picked from commit c45784bc9031353b938f4756473937cca759b3dc)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f5b0cce7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f5b0cce7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f5b0cce7

Branch: refs/heads/branch-2
Commit: f5b0cce7faaee32fbb0f8f2cec233ff178f208ea
Parents: 116a720
Author: cnauroth <cnaur...@apache.org>
Authored: Mon Jun 8 22:42:14 2015 -0700
Committer: cnauroth <cnaur...@apache.org>
Committed: Mon Jun 8 22:42:24 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../fs/azure/AzureNativeFileSystemStore.java    |  2 +-
 .../hadoop/fs/azure/PageBlobInputStream.java    | 32 +++++--
 .../hadoop/fs/azure/PageBlobOutputStream.java   | 10 ++-
 .../fs/azure/NativeAzureFileSystemBaseTest.java | 79 ++++++++++++++++-
 ...tiveAzureFileSystemContractPageBlobLive.java | 90 ++++++++++++++++++++
 6 files changed, 204 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5b0cce7/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 7bfc5fa..d16262a 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -356,6 +356,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12054. RPC client should not retry for InvalidToken exceptions.
     (Varun Saxena via Arpit Agarwal)
 
+    HADOOP-12073. Azure FileSystem PageBlobInputStream does not return -1 on
+    EOF. (Ivan Mitic via cnauroth)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5b0cce7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 69bda06..7741f17 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -2301,7 +2301,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     throws AzureException {
     if (blob instanceof CloudPageBlobWrapper) {
       try {
-        return PageBlobInputStream.getPageBlobSize((CloudPageBlobWrapper) blob,
+        return PageBlobInputStream.getPageBlobDataSize((CloudPageBlobWrapper) 
blob,
             getInstrumentedContext(
                 isConcurrentOOBAppendAllowed()));
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5b0cce7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
index 468ac65..097201b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
@@ -80,7 +80,7 @@ final class PageBlobInputStream extends InputStream {
    * @throws IOException If the format is corrupt.
    * @throws StorageException If anything goes wrong in the requests.
    */
-  public static long getPageBlobSize(CloudPageBlobWrapper blob,
+  public static long getPageBlobDataSize(CloudPageBlobWrapper blob,
       OperationContext opContext) throws IOException, StorageException {
     // Get the page ranges for the blob. There should be one range starting
     // at byte 0, but we tolerate (and ignore) ranges after the first one.
@@ -156,7 +156,7 @@ final class PageBlobInputStream extends InputStream {
     }
     if (pageBlobSize == -1) {
       try {
-        pageBlobSize = getPageBlobSize(blob, opContext);
+        pageBlobSize = getPageBlobDataSize(blob, opContext);
       } catch (StorageException e) {
         throw new IOException("Unable to get page blob size.", e);
       }
@@ -179,7 +179,13 @@ final class PageBlobInputStream extends InputStream {
 
   /**
    * Check our buffer and download more from the server if needed.
-   * @return true if there's more data in the buffer, false if we're done.
+   * If data is not available in the buffer, the method downloads up to the
+   * maximum page blob download size (4MB) or, if less than 4MB is left,
+   * all remaining pages.
+   * If we are on the last page, the method will return true even if
+   * we reached the end of the stream.
+   * @return true if there's more data in the buffer, false if buffer is empty
+   *         and we reached the end of the blob.
    * @throws IOException
    */
   private synchronized boolean ensureDataInBuffer() throws IOException {
@@ -257,11 +263,15 @@ final class PageBlobInputStream extends InputStream {
   @Override
   public synchronized int read(byte[] outputBuffer, int offset, int len)
       throws IOException {
+    // If len is zero return 0 per the InputStream contract
+    if (len == 0) {
+      return 0;
+    }
+
     int numberOfBytesRead = 0;
     while (len > 0) {
       if (!ensureDataInBuffer()) {
-        filePosition += numberOfBytesRead;
-        return numberOfBytesRead;
+        break;
       }
       int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
       int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
@@ -277,6 +287,13 @@ final class PageBlobInputStream extends InputStream {
         currentOffsetInBuffer += numBytesToRead;
       }
     }
+
+    // If the requested len is > 0 and zero bytes were read, we have
+    // reached EOF.
+    if (numberOfBytesRead == 0) {
+      return -1;
+    }
+
     filePosition += numberOfBytesRead;
     return numberOfBytesRead;
   }
@@ -284,8 +301,9 @@ final class PageBlobInputStream extends InputStream {
   @Override
   public int read() throws IOException {
     byte[] oneByte = new byte[1];
-    if (read(oneByte) == 0) {
-      return -1;
+    int result = read(oneByte);
+    if (result < 0) {
+      return result;
     }
     return oneByte[0];
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5b0cce7/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
index 2b8846c..8689375 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java
@@ -117,6 +117,8 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
   // The last task given to the ioThreadPool to execute, to allow
   // waiting until it's done.
   private WriteRequest lastQueuedTask;
+  // Whether the stream has been closed.
+  private boolean closed = false;
 
   public static final Log LOG = 
LogFactory.getLog(AzureNativeFileSystemStore.class);
 
@@ -201,7 +203,11 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
    * service.
    */
   @Override
-  public void close() throws IOException {
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
     LOG.debug("Closing page blob output stream.");
     flush();
     checkStreamState();
@@ -221,7 +227,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
       Thread.currentThread().interrupt();
     }
 
-    this.lastError = new IOException("Stream is already closed.");
+    closed = true;
   }
 
   // Log the stacks of all threads.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5b0cce7/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
index 9ce6cc9..6989a70 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
@@ -41,7 +41,6 @@ import java.util.TimeZone;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,7 +53,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.apache.hadoop.fs.azure.AzureException;
 import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
 
@@ -473,6 +471,83 @@ public abstract class NativeAzureFileSystemBaseTest {
   }
 
   @Test
+  public void testInputStreamReadWithZeroSizeBuffer() throws Exception {
+    Path newFile = new Path("zeroSizeRead");
+    OutputStream output = fs.create(newFile);
+    output.write(10);
+    output.close();
+
+    InputStream input = fs.open(newFile);
+    int result = input.read(new byte[2], 0, 0);
+    assertEquals(0, result);
+  }
+
+  @Test
+  public void testInputStreamReadWithBufferReturnsMinusOneOnEof() throws 
Exception {
+    Path newFile = new Path("eofRead");
+    OutputStream output = fs.create(newFile);
+    output.write(10);
+    output.close();
+
+    // Read first byte back
+    InputStream input = fs.open(newFile);
+    byte[] buff = new byte[1];
+    int result = input.read(buff, 0, 1);
+    assertEquals(1, result);
+    assertEquals(10, buff[0]);
+
+    // Issue another read and make sure it returns -1
+    buff[0] = 2;
+    result = input.read(buff, 0, 1);
+    assertEquals(-1, result);
+    // Buffer is intact
+    assertEquals(2, buff[0]);
+  }
+
+  @Test
+  public void 
testInputStreamReadWithBufferReturnsMinusOneOnEofForLargeBuffer() throws 
Exception {
+    Path newFile = new Path("eofRead2");
+    OutputStream output = fs.create(newFile);
+    byte[] outputBuff = new byte[97331];
+    for(int i = 0; i < outputBuff.length; ++i) {
+      outputBuff[i] = (byte)(Math.random() * 255);
+    }
+    output.write(outputBuff);
+    output.close();
+
+    // Read the content of the file
+    InputStream input = fs.open(newFile);
+    byte[] buff = new byte[131072];
+    int result = input.read(buff, 0, buff.length);
+    assertEquals(outputBuff.length, result);
+    for(int i = 0; i < outputBuff.length; ++i) {
+      assertEquals(outputBuff[i], buff[i]);
+    }
+
+    // Issue another read and make sure it returns -1
+    buff = new byte[131072];
+    result = input.read(buff, 0, buff.length);
+    assertEquals(-1, result);
+  }
+
+  @Test
+  public void testInputStreamReadIntReturnsMinusOneOnEof() throws Exception {
+    Path newFile = new Path("eofRead3");
+    OutputStream output = fs.create(newFile);
+    output.write(10);
+    output.close();
+
+    // Read first byte back
+    InputStream input = fs.open(newFile);
+    int value = input.read();
+    assertEquals(10, value);
+
+    // Issue another read and make sure it returns -1
+    value = input.read();
+    assertEquals(-1, value);
+  }
+
+  @Test
   public void testSetPermissionOnFile() throws Exception {
     Path newFile = new Path("testPermission");
     OutputStream output = fs.create(newFile);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5b0cce7/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemContractPageBlobLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemContractPageBlobLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemContractPageBlobLive.java
new file mode 100644
index 0000000..3c3b782
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemContractPageBlobLive.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.junit.Ignore;
+
+public class TestNativeAzureFileSystemContractPageBlobLive extends
+    FileSystemContractBaseTest {
+  private AzureBlobStorageTestAccount testAccount;
+
+  private AzureBlobStorageTestAccount createTestAccount()
+      throws Exception {
+    Configuration conf = new Configuration();
+
+    // Configure the page blob directories key so every file created is a page blob.
+    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+    // Configure the atomic rename directories key so every folder will have
+    // atomic rename applied.
+    conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+    return AzureBlobStorageTestAccount.create(conf);
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    testAccount = createTestAccount();
+    if (testAccount != null) {
+      fs = testAccount.getFileSystem();
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    if (testAccount != null) {
+      testAccount.cleanup();
+      testAccount = null;
+      fs = null;
+    }
+  }
+
+  @Override
+  protected void runTest() throws Throwable {
+    if (testAccount != null) {
+      super.runTest();
+    }
+  }
+  
+  /**
+   * The following tests are failing on Azure and the Azure 
+   * file system code needs to be modified to make them pass.
+   * A separate work item has been opened for this.
+   */
+  @Ignore
+  public void testMoveFileUnderParent() throws Throwable {
+  }
+
+  @Ignore
+  public void testRenameFileToSelf() throws Throwable {
+  }
+  
+  @Ignore
+  public void testRenameChildDirForbidden() throws Exception {
+  }
+  
+  @Ignore
+  public void testMoveDirUnderParent() throws Throwable {
+  }
+  
+  @Ignore
+  public void testRenameDirToSelf() throws Throwable {
+  }
+}

Reply via email to