http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobDataValidation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobDataValidation.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobDataValidation.java
new file mode 100644
index 0000000..0aa9393
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobDataValidation.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_CHECK_BLOCK_MD5;
+import static 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_STORE_BLOB_MD5;
+import static org.junit.Assume.assumeNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.util.Arrays;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+
+import org.junit.After;
+import org.junit.Test;
+
+import com.microsoft.azure.storage.Constants;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.ResponseReceivedEvent;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
+import com.microsoft.azure.storage.StorageEvent;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlockEntry;
+import com.microsoft.azure.storage.blob.BlockSearchMode;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.core.Base64;
+
+/**
+ * Test that we do proper data integrity validation with MD5 checks as
+ * configured.
+ */
+public class ITestBlobDataValidation extends AbstractWasbTestWithTimeout {
+  private AzureBlobStorageTestAccount testAccount;
+
+  /**
+   * Cleans up the test account after each test and stores the value
+   * returned by the cleanup helper back into {@link #testAccount}.
+   */
+  @After
+  public void tearDown() throws Exception {
+    testAccount = AzureTestUtils.cleanupTestAccount(testAccount);
+  }
+
+  /**
+   * Test that by default we don't store the blob-level MD5.
+   * The account is created without any configuration overrides.
+   */
+  @Test
+  public void testBlobMd5StoreOffByDefault() throws Exception {
+    testAccount = AzureBlobStorageTestAccount.create();
+    testStoreBlobMd5(false);
+  }
+
+  /**
+   * Test that we get blob-level MD5 storage and validation if we specify that
+   * in the configuration, by setting {@code KEY_STORE_BLOB_MD5} to true.
+   */
+  @Test
+  public void testStoreBlobMd5() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(KEY_STORE_BLOB_MD5, true);
+    testAccount = AzureBlobStorageTestAccount.create(conf);
+    testStoreBlobMd5(true);
+  }
+
+  /**
+   * Strips {@code toTrim} from the start and from the end of {@code s}
+   * when it is present there. For example, with s = "/xy" and
+   * toTrim = "/" the result is "xy".
+   */
+  private static String trim(String s, String toTrim) {
+    String withoutPrefix = StringUtils.removeStart(s, toTrim);
+    return StringUtils.removeEnd(withoutPrefix, toTrim);
+  }
+
+  /**
+   * Writes a small file, checks whether the blob-level MD5 property was
+   * stored, then replaces the blob content directly through the storage
+   * client and verifies that reading the file back fails with an
+   * invalid-MD5 error only when the MD5 was stored.
+   *
+   * @param expectMd5Stored whether the blob-level MD5 is expected to have
+   *                        been stored (and therefore validated on read).
+   * @throws Exception on any unexpected failure.
+   */
+  private void testStoreBlobMd5(boolean expectMd5Stored) throws Exception {
+    assumeNotNull(testAccount);
+    // Write a test file.
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+    Path testFilePath = AzureTestUtils.pathForTests(fs,
+        methodName.getMethodName());
+    String testFileKey = trim(testFilePath.toUri().getPath(), "/");
+    // try-with-resources so the stream is released even if write() throws.
+    try (OutputStream outStream = fs.create(testFilePath)) {
+      outStream.write(new byte[] { 5, 15 });
+    }
+
+    // Check that we stored/didn't store the MD5 field as configured.
+    CloudBlockBlob blob = testAccount.getBlobReference(testFileKey);
+    blob.downloadAttributes();
+    String obtainedMd5 = blob.getProperties().getContentMD5();
+    if (expectMd5Stored) {
+      assertNotNull(obtainedMd5);
+    } else {
+      assertNull("Expected no MD5, found: " + obtainedMd5, obtainedMd5);
+    }
+
+    // Mess with the content so it doesn't match the MD5.
+    String newBlockId = Base64.encode(new byte[] { 55, 44, 33, 22 });
+    blob.uploadBlock(newBlockId,
+        new ByteArrayInputStream(new byte[] { 6, 45 }), 2);
+    blob.commitBlockList(Arrays.asList(new BlockEntry[] { new BlockEntry(
+        newBlockId, BlockSearchMode.UNCOMMITTED) }));
+
+    // Now read back the content. If we stored the MD5 for the blob content
+    // we should get a data corruption error.
+    try {
+      // The inner try closes the stream on every path; an IOException
+      // raised by close() is still handled by the outer catch.
+      try (InputStream inStream = fs.open(testFilePath)) {
+        byte[] inBuf = new byte[100];
+        while (inStream.read(inBuf) > 0) {
+          // Drain the stream; the content itself is not inspected.
+        }
+      }
+      if (expectMd5Stored) {
+        fail("Should've thrown because of data corruption.");
+      }
+    } catch (IOException ex) {
+      if (!expectMd5Stored) {
+        throw ex;
+      }
+      // Check the type before casting so that an unexpected cause is
+      // reported with the original exception, not as a ClassCastException.
+      Throwable cause = ex.getCause();
+      assertTrue("Expected a StorageException cause but got: " + cause
+          + " (from " + ex + ")", cause instanceof StorageException);
+      assertEquals("Unexpected cause: " + cause,
+          StorageErrorCodeStrings.INVALID_MD5,
+          ((StorageException) cause).getErrorCode());
+    }
+  }
+
+  /**
+   * Test that by default we check block-level MD5.
+   * The account is created without any configuration overrides.
+   */
+  @Test
+  public void testCheckBlockMd5() throws Exception {
+    testAccount = AzureBlobStorageTestAccount.create();
+    testCheckBlockMd5(true);
+  }
+
+  /**
+   * Test that we don't check block-level MD5 if we specify that in the
+   * configuration, by setting {@code KEY_CHECK_BLOCK_MD5} to false.
+   */
+  @Test
+  public void testDontCheckBlockMd5() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(KEY_CHECK_BLOCK_MD5, false);
+    testAccount = AzureBlobStorageTestAccount.create(conf);
+    testCheckBlockMd5(false);
+  }
+
+  /**
+   * Response listener that asserts the Content-MD5 header is present or
+   * absent, as expected, on GET responses carrying the storage range
+   * header field and on PUT requests whose query mentions a block id.
+   */
+  private static class ContentMD5Checker extends
+      StorageEvent<ResponseReceivedEvent> {
+    private final boolean expectMd5;
+
+    public ContentMD5Checker(boolean expectMd5) {
+      this.expectMd5 = expectMd5;
+    }
+
+    @Override
+    public void eventOccurred(ResponseReceivedEvent eventArg) {
+      HttpURLConnection connection =
+          (HttpURLConnection) eventArg.getConnectionObject();
+      final String md5Header = Constants.HeaderConstants.CONTENT_MD5;
+      if (isGetRange(connection)) {
+        // Reads: look at the header field of the response.
+        checkObtainedMd5(connection.getHeaderField(md5Header));
+        return;
+      }
+      if (isPutBlock(connection)) {
+        // Writes: look at the property that was set on the request.
+        checkObtainedMd5(connection.getRequestProperty(md5Header));
+      }
+    }
+
+    /** Asserts that the MD5 value is set or unset, as configured. */
+    private void checkObtainedMd5(String obtainedMd5) {
+      if (!expectMd5) {
+        assertNull("Expected no MD5, found: " + obtainedMd5, obtainedMd5);
+        return;
+      }
+      assertNotNull(obtainedMd5);
+    }
+
+    /** True for a PUT whose URL query contains "blockid". */
+    private static boolean isPutBlock(HttpURLConnection connection) {
+      if (!connection.getRequestMethod().equals("PUT")) {
+        return false;
+      }
+      String query = connection.getURL().getQuery();
+      return query != null && query.contains("blockid");
+    }
+
+    /** True for a GET that has the storage range header field. */
+    private static boolean isGetRange(HttpURLConnection connection) {
+      if (!connection.getRequestMethod().equals("GET")) {
+        return false;
+      }
+      String rangeHeader = connection.getHeaderField(
+          Constants.HeaderConstants.STORAGE_RANGE_HEADER);
+      return rangeHeader != null;
+    }
+  }
+
+  /**
+   * Writes and then reads back a small file while a
+   * {@link ContentMD5Checker} is added as a response listener to every
+   * operation context, so the relevant requests and responses are checked
+   * for the presence or absence of the Content-MD5 header.
+   *
+   * @param expectMd5Checked whether block-level MD5 headers are expected.
+   * @throws Exception on any unexpected failure.
+   */
+  private void testCheckBlockMd5(final boolean expectMd5Checked)
+      throws Exception {
+    assumeNotNull(testAccount);
+    Path testFilePath = new Path("/testFile");
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    // Add a hook to check that for GET/PUT requests we set/don't set
+    // the block-level MD5 field as configured. I tried to do clever
+    // testing by also messing with the raw data to see if we actually
+    // validate the data as expected, but the HttpURLConnection wasn't
+    // pluggable enough for me to do that.
+    fs.getStore().addTestHookToOperationContext(
+        new TestHookOperationContext() {
+          @Override
+          public OperationContext modifyOperationContext(
+              OperationContext original) {
+            original.getResponseReceivedEventHandler().addListener(
+                new ContentMD5Checker(expectMd5Checked));
+            return original;
+          }
+        });
+
+    // try-with-resources: both streams are closed even when the write or
+    // the read fails part-way through.
+    try (OutputStream outStream = fs.create(testFilePath)) {
+      outStream.write(new byte[] { 5, 15 });
+    }
+
+    try (InputStream inStream = fs.open(testFilePath)) {
+      byte[] inBuf = new byte[100];
+      while (inStream.read(inBuf) > 0) {
+        // Drain the stream; only the headers are of interest here.
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobTypeSpeedDifference.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobTypeSpeedDifference.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobTypeSpeedDifference.java
new file mode 100644
index 0000000..b46ad5b
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlobTypeSpeedDifference.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Date;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+
+
+/**
+ * A simple benchmark to find out the difference in speed between block
+ * and page blobs.
+ */
+public class ITestBlobTypeSpeedDifference extends AbstractWasbTestBase {
+
+  /**
+   * Supplies the account for this benchmark, created without any
+   * configuration overrides.
+   */
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+  /**
+   * Writes {@code size} bytes of filler data to the given stream, calling
+   * {@code flush()} each time at least {@code flushInterval} bytes have
+   * been written since the previous flush.
+   *
+   * @param writeStream the stream to write to; it is not closed here.
+   * @param size total number of bytes to write; nothing is written when
+   *             it is not positive.
+   * @param flushInterval number of bytes between flushes; must be positive.
+   * @throws IOException if writing or flushing fails.
+   * @throws IllegalArgumentException if flushInterval is not positive.
+   */
+  private static void writeTestFile(OutputStream writeStream,
+      long size, long flushInterval) throws IOException {
+    if (flushInterval <= 0) {
+      // A zero interval would give a zero-length buffer and a loop that
+      // never makes progress; a negative one cannot size the buffer at all.
+      throw new IllegalArgumentException(
+          "flushInterval must be positive: " + flushInterval);
+    }
+    int bufferSize = (int) Math.min(1000, flushInterval);
+    byte[] buffer = new byte[bufferSize];
+    Arrays.fill(buffer, (byte) 7);
+    // long counters: int ones would overflow for sizes above 2 GB, after
+    // which the loop condition could never become false.
+    long bytesWritten = 0;
+    long bytesUnflushed = 0;
+    while (bytesWritten < size) {
+      // The last chunk may be shorter than the buffer.
+      int numberToWrite = (int) Math.min(bufferSize, size - bytesWritten);
+      writeStream.write(buffer, 0, numberToWrite);
+      bytesWritten += numberToWrite;
+      bytesUnflushed += numberToWrite;
+      if (bytesUnflushed >= flushInterval) {
+        writeStream.flush();
+        bytesUnflushed = 0;
+      }
+    }
+  }
+
+  /**
+   * Immutable holder for one benchmark measurement.
+   */
+  private static class TestResult {
+    // Elapsed time of the upload, in milliseconds.
+    final long timeTakenInMs;
+    // Number of requests attributed to the upload.
+    final long totalNumberOfRequests;
+
+    TestResult(long timeTakenInMs, long totalNumberOfRequests) {
+      this.timeTakenInMs = timeTakenInMs;
+      this.totalNumberOfRequests = totalNumberOfRequests;
+    }
+  }
+
+  /**
+   * Writes data to the given file of the given size, flushing every
+   * x bytes. Measure performance of that and return it.
+   *
+   * @param fs the file system to write through.
+   * @param path the file to create.
+   * @param size total number of bytes to write.
+   * @param flushInterval number of bytes between flushes.
+   * @return the elapsed time and the growth of the instrumentation's
+   *         web-response counter over the create/write/close sequence.
+   * @throws IOException if creating, writing or closing the file fails.
+   */
+  private static TestResult writeTestFile(NativeAzureFileSystem fs, Path path,
+      long size, long flushInterval) throws IOException {
+    AzureFileSystemInstrumentation instrumentation =
+        fs.getInstrumentation();
+    long initialRequests = instrumentation.getCurrentWebResponses();
+    Date start = new Date();
+    // try-with-resources closes the stream, still inside the measured
+    // section, even if a write fails part-way through.
+    try (OutputStream output = fs.create(path)) {
+      writeTestFile(output, size, flushInterval);
+    }
+    long finalRequests = instrumentation.getCurrentWebResponses();
+    return new TestResult(new Date().getTime() - start.getTime(),
+        finalRequests - initialRequests);
+  }
+
+  /**
+   * Writes data to a block blob of the given size, flushing every
+   * x bytes. Measure performance of that and return it.
+   * The file is always written to the fixed path "/blockBlob".
+   */
+  private static TestResult writeBlockBlobTestFile(NativeAzureFileSystem fs,
+      long size, long flushInterval) throws IOException {
+    return writeTestFile(fs, new Path("/blockBlob"), size, flushInterval);
+  }
+
+  /**
+   * Writes a page blob of the requested size, flushing every
+   * {@code flushInterval} bytes, and returns the measured result. The
+   * target path is obtained from {@code AzureTestUtils.blobPathForTests}.
+   */
+  private static TestResult writePageBlobTestFile(NativeAzureFileSystem fs,
+      long size, long flushInterval) throws IOException {
+    return writeTestFile(fs,
+        AzureTestUtils.blobPathForTests(fs, "writePageBlobTestFile"),
+        size, flushInterval);
+  }
+
+  /**
+   * Runs the benchmark over a small 10 KB file, flushing every 500 bytes.
+   * Here 10 KB means 10 * 1000 bytes.
+   */
+  @Test
+  public void testTenKbFileFrequentFlush() throws Exception {
+    testForSizeAndFlushInterval(getFileSystem(), 10 * 1000, 500);
+  }
+
+  /**
+   * Runs five benchmark rounds for the given file size and flush
+   * frequency. Each round uploads a page blob and then a block blob and
+   * prints the time taken and request count of each to standard output.
+   */
+  private static void testForSizeAndFlushInterval(NativeAzureFileSystem fs,
+      final long size, final long flushInterval) throws IOException {
+    for (int round = 0; round < 5; round++) {
+      TestResult pageBlob =
+          writePageBlobTestFile(fs, size, flushInterval);
+      System.out.printf(
+          "Page blob upload took %d ms. Total number of requests: %d.\n",
+          pageBlob.timeTakenInMs,
+          pageBlob.totalNumberOfRequests);
+
+      TestResult blockBlob =
+          writeBlockBlobTestFile(fs, size, flushInterval);
+      System.out.printf(
+          "Block blob upload took %d ms. Total number of requests: %d.\n",
+          blockBlob.timeTakenInMs,
+          blockBlob.totalNumberOfRequests);
+    }
+  }
+
+  /**
+   * Command-line entry point for the benchmark.
+   *
+   * @param argv optional arguments: argv[0] is the file size in bytes
+   *             (default 10 * 1000 * 1000) and argv[1] is the flush
+   *             interval in bytes (default 2000).
+   */
+  public static void main(String[] argv) throws Exception {
+    Configuration conf = new Configuration();
+    long size = argv.length > 0
+        ? Long.parseLong(argv[0])
+        : 10 * 1000 * 1000;
+    long flushInterval = argv.length > 1
+        ? Long.parseLong(argv[1])
+        : 2000;
+    NativeAzureFileSystem fs = (NativeAzureFileSystem) FileSystem.get(conf);
+    testForSizeAndFlushInterval(fs, size, flushInterval);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
new file mode 100644
index 0000000..07a13df
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
@@ -0,0 +1,874 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
+
+import static org.junit.Assume.assumeNotNull;
+
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+
+/**
+ * Test semantics and performance of the original block blob input stream
+ * (KEY_INPUT_STREAM_VERSION=1) and the new
+ * <code>BlockBlobInputStream</code> (KEY_INPUT_STREAM_VERSION=2).
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+
+public class ITestBlockBlobInputStream extends AbstractAzureScaleTest {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ITestBlockBlobInputStream.class);
+  private static final int KILOBYTE = 1024;
+  private static final int MEGABYTE = KILOBYTE * KILOBYTE;
+  private static final int TEST_FILE_SIZE = 6 * MEGABYTE;
+  private static final Path TEST_FILE_PATH = new Path(
+      "TestBlockBlobInputStream.txt");
+
+  private AzureBlobStorageTestAccount accountUsingInputStreamV1;
+  private AzureBlobStorageTestAccount accountUsingInputStreamV2;
+  private long testFileLength;
+
+
+
+  private FileStatus testFileStatus;
+  private Path hugefile;
+
+  /**
+   * Prepares each test: runs the superclass setup, creates one account with
+   * KEY_INPUT_STREAM_VERSION set to 1 and one with a null configuration,
+   * then records the status and length of the shared test file, or a
+   * length of 0 if the file does not exist yet.
+   */
+  // NOTE(review): createTestAccount() creates the same two accounts; if
+  // super.setUp() calls it, they are created twice per test -- confirm.
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = new Configuration();
+    conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1);
+
+    accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create(
+        "testblockblobinputstream",
+        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+        conf,
+        true);
+
+    // No configuration is passed here; presumably this account then uses
+    // the default input stream version (2) -- TODO confirm.
+    accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create(
+        "testblockblobinputstream",
+        EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
+        null,
+        true);
+
+    assumeNotNull(accountUsingInputStreamV1);
+    assumeNotNull(accountUsingInputStreamV2);
+    hugefile = fs.makeQualified(TEST_FILE_PATH);
+    try {
+      testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+      testFileLength = testFileStatus.getLen();
+    } catch (FileNotFoundException e) {
+      // file doesn't exist
+      testFileLength = 0;
+    }
+  }
+
+  /**
+   * Creates both test accounts (one with KEY_INPUT_STREAM_VERSION set to 1,
+   * one with a null configuration), stores them in the instance fields and
+   * returns the version 1 account.
+   */
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1);
+
+    accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create(
+        "testblockblobinputstream",
+        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+        conf,
+        true);
+
+    accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create(
+        "testblockblobinputstream",
+        EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
+        null,
+        true);
+
+    // Skip, rather than fail, when either account could not be created.
+    assumeNotNull(accountUsingInputStreamV1);
+    assumeNotNull(accountUsingInputStreamV2);
+    return accountUsingInputStreamV1;
+  }
+
+  /**
+   * Create a test file by repeating the characters in the alphabet, or
+   * reuse the file if it already exists, and record its status and length
+   * in {@code testFileStatus} and {@code testFileLength}.
+   * @throws IOException if the file cannot be probed, created or written.
+   */
+  private void createTestFileAndSetLength() throws IOException {
+    FileSystem fs = accountUsingInputStreamV1.getFileSystem();
+
+    // To reduce test run time, the test file can be reused.
+    if (fs.exists(TEST_FILE_PATH)) {
+      testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+      testFileLength = testFileStatus.getLen();
+      LOG.info("Reusing test file: {}", testFileStatus);
+      return;
+    }
+
+    // Fill a 26 KB buffer with 'a'..'z' repeated end to end.
+    byte[] buffer = new byte[26 * KILOBYTE];
+    char character = 'a';
+    for (int i = 0; i < buffer.length; i++) {
+      buffer[i] = (byte) character;
+      character = (character == 'z') ? 'a' : (char) ((int) character + 1);
+    }
+
+    LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
+        TEST_FILE_SIZE);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+
+    try (FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
+      // Whole buffers are written, so the file may end up slightly larger
+      // than TEST_FILE_SIZE.
+      int bytesWritten = 0;
+      while (bytesWritten < TEST_FILE_SIZE) {
+        outputStream.write(buffer);
+        bytesWritten += buffer.length;
+      }
+      LOG.info("Closing stream {}", outputStream);
+      // Closed explicitly so that close() is timed on its own; the
+      // try-with-resources then closes the stream a second time.
+      ContractTestUtils.NanoTimer closeTimer
+          = new ContractTestUtils.NanoTimer();
+      outputStream.close();
+      closeTimer.end("time to close() output stream");
+    }
+    timer.end("time to write %d KB", TEST_FILE_SIZE / 1024);
+    // Refresh the status as well as the length, as the reuse branch does.
+    testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+    testFileLength = testFileStatus.getLen();
+  }
+
+  /**
+   * Checks that the test file exists, is a file and is not empty.
+   * Despite the name, these are assertions: a missing file fails the
+   * calling test rather than skipping it.
+   * @throws IOException if the file status cannot be retrieved.
+   */
+  void assumeHugeFileExists() throws IOException {
+    ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
+    FileStatus status = fs.getFileStatus(hugefile);
+    ContractTestUtils.assertIsFile(hugefile, status);
+    assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
+  }
+
+  /**
+   * Calculate megabits per second from the specified values for bytes and
+   * milliseconds.
+   * @param bytes The number of bytes.
+   * @param milliseconds The number of milliseconds.
+   * @return The number of megabits per second.
+   */
+  private static double toMbps(long bytes, long milliseconds) {
+    return bytes / 1000.0 * 8 / milliseconds;
+  }
+
+  /**
+   * Creates the shared test file, or reuses it if it already exists.
+   * The numeric prefix makes this run first under name-ascending ordering.
+   * @throws IOException if the file cannot be created.
+   */
+  @Test
+  public void test_0100_CreateHugeFile() throws IOException {
+    createTestFileAndSetLength();
+  }
+
+  /**
+   * Checks that the version 1 and version 2 streams return identical data
+   * after a forward seek followed by a backward seek to the same offsets.
+   * @throws Exception on any failure.
+   */
+  @Test
+  public void test_0200_BasicReadTest() throws Exception {
+    assumeHugeFileExists();
+
+    try (
+        FSDataInputStream inputStreamV1
+            = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
+
+        FSDataInputStream inputStreamV2
+            = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
+    ) {
+      byte[] bufferV1 = new byte[3 * MEGABYTE];
+      byte[] bufferV2 = new byte[bufferV1.length];
+
+      // v1 forward seek and read a kilobyte into first kilobyte of bufferV1
+      inputStreamV1.seek(5 * MEGABYTE);
+      int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, KILOBYTE);
+      assertEquals(KILOBYTE, numBytesReadV1);
+
+      // v2 forward seek and read a kilobyte into first kilobyte of bufferV2
+      inputStreamV2.seek(5 * MEGABYTE);
+      int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, KILOBYTE);
+      assertEquals(KILOBYTE, numBytesReadV2);
+
+      // Whole buffers are compared; the parts not yet written are still
+      // zero in both.
+      assertArrayEquals(bufferV1, bufferV2);
+
+      int len = MEGABYTE;
+      int offset = bufferV1.length - len;
+
+      // v1 reverse seek and read a megabyte into last megabyte of bufferV1
+      inputStreamV1.seek(3 * MEGABYTE);
+      numBytesReadV1 = inputStreamV1.read(bufferV1, offset, len);
+      assertEquals(len, numBytesReadV1);
+
+      // v2 reverse seek and read a megabyte into last megabyte of bufferV2
+      inputStreamV2.seek(3 * MEGABYTE);
+      numBytesReadV2 = inputStreamV2.read(bufferV2, offset, len);
+      assertEquals(len, numBytesReadV2);
+
+      assertArrayEquals(bufferV1, bufferV2);
+    }
+  }
+
+  /**
+   * Interleaves seeks and 4 KB reads on the version 1 and version 2
+   * streams and checks after every read that both returned the same data.
+   * @throws Exception on any failure.
+   */
+  @Test
+  public void test_0201_RandomReadTest() throws Exception {
+    assumeHugeFileExists();
+
+    try (
+        FSDataInputStream inputStreamV1
+            = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
+
+        FSDataInputStream inputStreamV2
+            = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
+    ) {
+      final int bufferSize = 4 * KILOBYTE;
+      byte[] bufferV1 = new byte[bufferSize];
+      byte[] bufferV2 = new byte[bufferV1.length];
+
+      // Read from the start without any seek.
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      // Seek back to the start, then two consecutive reads.
+      inputStreamV1.seek(0);
+      inputStreamV2.seek(0);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      // Two seeks in a row with no read in between.
+      int seekPosition = 2 * KILOBYTE;
+      inputStreamV1.seek(seekPosition);
+      inputStreamV2.seek(seekPosition);
+
+      inputStreamV1.seek(0);
+      inputStreamV2.seek(0);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      // Backward seek: the three reads above moved the position to 12 KB.
+      seekPosition = 5 * KILOBYTE;
+      inputStreamV1.seek(seekPosition);
+      inputStreamV2.seek(seekPosition);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      seekPosition = 10 * KILOBYTE;
+      inputStreamV1.seek(seekPosition);
+      inputStreamV2.seek(seekPosition);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+
+      // Long forward seek to 4100 KB.
+      seekPosition = 4100 * KILOBYTE;
+      inputStreamV1.seek(seekPosition);
+      inputStreamV2.seek(seekPosition);
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
+    }
+  }
+
+  /**
+   * Reads one full buffer from each stream at its current position and
+   * asserts that both reads filled the buffer and produced the same bytes.
+   * The expected length is that of {@code bufferV1}.
+   * @throws IOException if either read fails.
+   */
+  private void verifyConsistentReads(FSDataInputStream inputStreamV1,
+      FSDataInputStream inputStreamV2,
+      byte[] bufferV1,
+      byte[] bufferV2) throws IOException {
+    final int expected = bufferV1.length;
+
+    assertEquals("Bytes read from V1 stream", expected,
+        inputStreamV1.read(bufferV1, 0, expected));
+    assertEquals("Bytes read from V2 stream", expected,
+        inputStreamV2.read(bufferV2, 0, expected));
+
+    assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
+  }
+
+  /**
+   * Validates the implementation of InputStream.markSupported
+   * for version 1 of the block blob input stream.
+   * @throws IOException if the file system operations fail.
+   */
+  @Test
+  public void test_0301_MarkSupportedV1() throws IOException {
+    validateMarkSupported(accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of InputStream.markSupported
+   * for version 2 of the block blob input stream.
+   * @throws IOException if the file system operations fail.
+   */
+  @Test
+  public void test_0302_MarkSupportedV2() throws IOException {
+    // Must use the V2 account; with the V1 account this test only
+    // repeats test_0301.
+    validateMarkSupported(accountUsingInputStreamV2.getFileSystem());
+  }
+
+  /**
+   * Opens the test file on the given file system and asserts that the
+   * returned stream reports mark support.
+   * @param fs the file system whose input stream is checked.
+   * @throws IOException if the file system operations fail.
+   */
+  private void validateMarkSupported(FileSystem fs) throws IOException {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      assertTrue("mark is not supported", inputStream.markSupported());
+    }
+  }
+
+  /**
+   * Validates the implementation of InputStream.mark and reset
+   * for version 1 of the block blob input stream.
+   * @throws Exception on any failure.
+   */
+  @Test
+  public void test_0303_MarkAndResetV1() throws Exception {
+    validateMarkAndReset(accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of InputStream.mark and reset
+   * for version 2 of the block blob input stream.
+   * @throws Exception on any failure.
+   */
+  @Test
+  public void test_0304_MarkAndResetV2() throws Exception {
+    validateMarkAndReset(accountUsingInputStreamV2.getFileSystem());
+  }
+
+  /**
+   * Checks mark/reset on a stream opened from the given file system.
+   * After marking with a read limit of 1 KB - 1 and reading 1 KB, reset
+   * must return to position 0. After marking with a read limit of
+   * 8 KB - 1 and reading 8 KB, reset must fail with an IOException
+   * containing "Resetting to invalid mark".
+   * @param fs the file system whose input stream is checked.
+   * @throws Exception on any failure.
+   */
+  private void validateMarkAndReset(FileSystem fs) throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      inputStream.mark(KILOBYTE - 1);
+
+      byte[] buffer = new byte[KILOBYTE];
+      int bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+
+      inputStream.reset();
+      assertEquals("reset -> pos 0", 0, inputStream.getPos());
+
+      inputStream.mark(8 * KILOBYTE - 1);
+
+      buffer = new byte[8 * KILOBYTE];
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+
+      // This time the reset is expected to be rejected.
+      intercept(IOException.class,
+          "Resetting to invalid mark",
+          new Callable<FSDataInputStream>() {
+            @Override
+            public FSDataInputStream call() throws Exception {
+              inputStream.reset();
+              return inputStream;
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Validates the implementation of Seekable.seekToNewSource, which should
+   * return false for version 1 of the block blob input stream.
+   * @throws IOException if the file system operations fail.
+   */
+  @Test
+  public void test_0305_SeekToNewSourceV1() throws IOException {
+    validateSeekToNewSource(accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of Seekable.seekToNewSource, which should
+   * return false for version 2 of the block blob input stream.
+   * @throws IOException if the file system operations fail.
+   */
+  @Test
+  public void test_0306_SeekToNewSourceV2() throws IOException {
+    validateSeekToNewSource(accountUsingInputStreamV2.getFileSystem());
+  }
+
+  /**
+   * Opens the test file on the given file system and asserts that
+   * {@code seekToNewSource(0)} returns false.
+   * @param fs the file system whose input stream is checked.
+   * @throws IOException if the file system operations fail.
+   */
+  private void validateSeekToNewSource(FileSystem fs) throws IOException {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      assertFalse(inputStream.seekToNewSource(0));
+    }
+  }
+
+  /**
+   * Validates the implementation of InputStream.skip and ensures there is no
+   * network I/O for version 1 of the block blob input stream.
+   * @throws Exception on any failure.
+   */
+  @Test
+  public void test_0307_SkipBoundsV1() throws Exception {
+    validateSkipBounds(accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of InputStream.skip and ensures there is no
+   * network I/O for version 2 of the block blob input stream.
+   * @throws Exception on any failure.
+   */
+  @Test
+  public void test_0308_SkipBoundsV2() throws Exception {
+    validateSkipBounds(accountUsingInputStreamV2.getFileSystem());
+  }
+
+  /**
+   * Checks the boundary behaviour of skip on a stream opened from the
+   * given file system. Negative and zero skips must return 0, skipping
+   * the whole file must return the file length, and skipping past the end
+   * must raise an EOFException. The whole sequence must finish in under
+   * 20 ms, which the test treats as evidence that no network I/O occurred.
+   * @param fs the file system whose input stream is checked.
+   * @throws Exception on any failure.
+   */
+  private void validateSkipBounds(FileSystem fs) throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      // Started after open(), so opening the stream is not timed.
+      NanoTimer timer = new NanoTimer();
+
+      long skipped = inputStream.skip(-1);
+      assertEquals(0, skipped);
+
+      skipped = inputStream.skip(0);
+      assertEquals(0, skipped);
+
+      assertTrue(testFileLength > 0);
+
+      skipped = inputStream.skip(testFileLength);
+      assertEquals(testFileLength, skipped);
+
+      // The position is now at the end of the file.
+      intercept(EOFException.class,
+          new Callable<Long>() {
+            @Override
+            public Long call() throws Exception {
+              return inputStream.skip(1);
+            }
+          }
+      );
+      long elapsedTimeMs = timer.elapsedTimeMs();
+      assertTrue(
+          String.format(
+              "There should not be any network I/O (elapsedTimeMs=%1$d).",
+              elapsedTimeMs),
+          elapsedTimeMs < 20);
+    }
+  }
+
+  /**
+   * Validates the implementation of Seekable.seek and ensures there is no
+   * network I/O for forward seek.
+   * @throws Exception
+   */
+  @Test
+  public void test_0309_SeekBoundsV1() throws Exception {
+    validateSeekBounds(accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of Seekable.seek and ensures there is no
+   * network I/O for forward seek.
+   * @throws Exception
+   */
+  @Test
+  public void test_0310_SeekBoundsV2() throws Exception {
+    validateSeekBounds(accountUsingInputStreamV2.getFileSystem());
+  }
+
+  /**
+   * Checks seek() at its boundaries on the test file: position 0 and the
+   * file length are accepted, while -1 and length + 1 raise EOFException
+   * with the corresponding FSExceptionMessages text. The whole sequence
+   * must finish in under 20 ms, which is taken as evidence that no network
+   * I/O occurred.
+   * @param fs file system used to open the test file.
+   * @throws Exception on any failure.
+   */
+  private void validateSeekBounds(FileSystem fs) throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      NanoTimer timer = new NanoTimer();
+
+      inputStream.seek(0);
+      assertEquals(0, inputStream.getPos());
+
+      // A negative offset must be rejected.
+      intercept(EOFException.class,
+          FSExceptionMessages.NEGATIVE_SEEK,
+          new Callable<FSDataInputStream>() {
+            @Override
+            public FSDataInputStream call() throws Exception {
+              inputStream.seek(-1);
+              return inputStream;
+            }
+          }
+      );
+
+      assertTrue("Test file length only " + testFileLength,
+          testFileLength > 0);
+      // Seeking exactly to the end of the file is allowed.
+      inputStream.seek(testFileLength);
+      assertEquals(testFileLength, inputStream.getPos());
+
+      // One byte beyond the end must be rejected.
+      intercept(EOFException.class,
+          FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
+          new Callable<FSDataInputStream>() {
+            @Override
+            public FSDataInputStream call() throws Exception {
+              inputStream.seek(testFileLength + 1);
+              return inputStream;
+            }
+          }
+      );
+
+      long elapsedTimeMs = timer.elapsedTimeMs();
+      assertTrue(
+          String.format(
+              "There should not be any network I/O (elapsedTimeMs=%1$d).",
+              elapsedTimeMs),
+          elapsedTimeMs < 20);
+    }
+  }
+
+  /**
+   * Validates the implementation of Seekable.seek, Seekable.getPos,
+   * and InputStream.available.
+   * @throws Exception
+   */
+  @Test
+  public void test_0311_SeekAndAvailableAndPositionV1() throws Exception {
+    validateSeekAndAvailableAndPosition(
+        accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of Seekable.seek, Seekable.getPos,
+   * and InputStream.available.
+   * @throws Exception
+   */
+  @Test
+  public void test_0312_SeekAndAvailableAndPositionV2() throws Exception {
+    validateSeekAndAvailableAndPosition(
+        accountUsingInputStreamV2.getFileSystem());
+  }
+
+  private void validateSeekAndAvailableAndPosition(FileSystem fs)
+      throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
+      byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
+      byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
+      byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
+      byte[] buffer = new byte[3];
+
+      int bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected1, buffer);
+      assertEquals(buffer.length, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected2, buffer);
+      assertEquals(2 * buffer.length, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // reverse seek
+      int seekPos = 0;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected1, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // reverse seek
+      seekPos = 1;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected3, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // forward seek
+      seekPos = 6;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected4, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+    }
+  }
+
+  /**
+   * Validates the implementation of InputStream.skip, Seekable.getPos,
+   * and InputStream.available.
+   * @throws IOException
+   */
+  @Test
+  public void test_0313_SkipAndAvailableAndPositionV1() throws IOException {
+    validateSkipAndAvailableAndPosition(
+        accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of InputStream.skip, Seekable.getPos,
+   * and InputStream.available for version 2 of the block blob input stream.
+   * @throws IOException if the stream cannot be opened or read.
+   */
+  @Test
+  public void test_0314_SkipAndAvailableAndPositionV2() throws IOException {
+    // Must use the V2 account: passing the V1 account here would silently
+    // repeat test_0313 and leave version 2 untested.
+    validateSkipAndAvailableAndPosition(
+        accountUsingInputStreamV2.getFileSystem());
+  }
+
+  /**
+   * Interleaves skip, read and seek calls on the test file and, after each
+   * step, verifies the bytes read, the reported position and the reported
+   * available count.
+   * @param fs file system used to open the test file.
+   * @throws IOException if the stream cannot be opened or read.
+   */
+  private void validateSkipAndAvailableAndPosition(FileSystem fs)
+      throws IOException {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
+      byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
+      byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
+
+      // A freshly opened stream is at position 0 with the whole file left.
+      assertEquals(testFileLength, inputStream.available());
+      assertEquals(0, inputStream.getPos());
+
+      int n = 3;
+      long skipped = inputStream.skip(n);
+
+      assertEquals(skipped, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+      assertEquals(n, skipped);
+
+      // The read after the skip starts at offset 3.
+      byte[] buffer = new byte[3];
+      int bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected2, buffer);
+      assertEquals(buffer.length + skipped, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // does skip still work after seek?
+      int seekPos = 1;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected3, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      long currentPosition = inputStream.getPos();
+      n = 2;
+      skipped = inputStream.skip(n);
+
+      assertEquals(currentPosition + skipped, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+      assertEquals(n, skipped);
+
+      // Position is now 6, so the next chunk read is "ghi".
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected4, buffer);
+      assertEquals(buffer.length + skipped + currentPosition,
+          inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+    }
+  }
+
+  /**
+   * Compares the sequential read time of version 2 of the block blob input
+   * stream against version 1. The measurement is repeated up to 10 times
+   * and the test passes as soon as the v2/v1 elapsed-time ratio is below
+   * 1.01.
+   * @throws IOException if a read fails.
+   */
+  @Test
+  public void test_0315_SequentialReadPerformance() throws IOException {
+    assumeHugeFileExists();
+    final int maxAttempts = 10;
+    final double maxAcceptableRatio = 1.01;
+    double v1ElapsedMs = 0;
+    double v2ElapsedMs = 0;
+    double ratio = Double.MAX_VALUE;
+
+    int attempt = 0;
+    while (attempt < maxAttempts && ratio >= maxAcceptableRatio) {
+      v1ElapsedMs = sequentialRead(1,
+          accountUsingInputStreamV1.getFileSystem(), false);
+      v2ElapsedMs = sequentialRead(2,
+          accountUsingInputStreamV2.getFileSystem(), false);
+      ratio = v2ElapsedMs / v1ElapsedMs;
+      String progress = String.format(
+          "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
+          (long) v1ElapsedMs,
+          (long) v2ElapsedMs,
+          ratio);
+      LOG.info(progress);
+      attempt++;
+    }
+
+    String failureMessage = String.format(
+        "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
+            + " v2ElapsedMs=%2$d, ratio=%3$.2f",
+        (long) v1ElapsedMs,
+        (long) v2ElapsedMs,
+        ratio);
+    assertTrue(failureMessage, ratio < maxAcceptableRatio);
+  }
+
+  /**
+   * Ensures parity in the performance of sequential read after reverse seek
+   * for version 2 of the block blob input stream. The measurement is
+   * repeated up to 10 times and the test passes as soon as the
+   * after-seek/before-seek elapsed-time ratio is below 1.01.
+   * @throws IOException if a read fails.
+   */
+  @Test
+  public void test_0316_SequentialReadAfterReverseSeekPerformanceV2()
+      throws IOException {
+    assumeHugeFileExists();
+    final int maxAttempts = 10;
+    final double maxAcceptableRatio = 1.01;
+    double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0;
+    double ratio = Double.MAX_VALUE;
+    for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
+      beforeSeekElapsedMs = sequentialRead(2,
+          accountUsingInputStreamV2.getFileSystem(), false);
+      afterSeekElapsedMs = sequentialRead(2,
+          accountUsingInputStreamV2.getFileSystem(), true);
+      ratio = afterSeekElapsedMs / beforeSeekElapsedMs;
+      LOG.info(String.format(
+          "beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d, ratio=%3$.2f",
+          (long) beforeSeekElapsedMs,
+          (long) afterSeekElapsedMs,
+          ratio));
+    }
+    assertTrue(String.format(
+        "Performance of version 2 after reverse seek is not acceptable:"
+            + " beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d,"
+            + " ratio=%3$.2f",
+        (long) beforeSeekElapsedMs,
+        (long) afterSeekElapsedMs,
+        ratio),
+        ratio < maxAcceptableRatio);
+  }
+
+  /**
+   * Reads the whole test file sequentially in 16 KB chunks and returns the
+   * elapsed time of that read in milliseconds.
+   *
+   * @param version input stream version, used only for the log message.
+   * @param fs file system used to open the test file.
+   * @param afterReverseSeek when true, up to 4 MB is read first and the
+   *        stream is then sought back to offset 0 before the timed read
+   *        starts, so the timed read happens after a reverse seek.
+   * @return elapsed time of the timed read, in milliseconds.
+   * @throws IOException if opening, reading or seeking fails.
+   */
+  private long sequentialRead(int version,
+      FileSystem fs,
+      boolean afterReverseSeek) throws IOException {
+    byte[] buffer = new byte[16 * KILOBYTE];
+    long totalBytesRead = 0;
+    long bytesRead = 0;
+
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      if (afterReverseSeek) {
+        // Read before testing the byte count: checking bytesRead > 0 first
+        // (while it is still 0) would skip the loop entirely and make the
+        // following seek(0) a no-op rather than a reverse seek.
+        while (totalBytesRead < 4 * MEGABYTE
+            && (bytesRead = inputStream.read(buffer)) > 0) {
+          totalBytesRead += bytesRead;
+        }
+        totalBytesRead = 0;
+        inputStream.seek(0);
+      }
+
+      NanoTimer timer = new NanoTimer();
+      while ((bytesRead = inputStream.read(buffer)) > 0) {
+        totalBytesRead += bytesRead;
+      }
+      long elapsedTimeMs = timer.elapsedTimeMs();
+
+      LOG.info(String.format(
+          "v%1$d: bytesRead=%2$d, elapsedMs=%3$d, Mbps=%4$.2f,"
+              + " afterReverseSeek=%5$s",
+          version,
+          totalBytesRead,
+          elapsedTimeMs,
+          toMbps(totalBytesRead, elapsedTimeMs),
+          afterReverseSeek));
+
+      assertEquals(testFileLength, totalBytesRead);
+      // The stream is closed by try-with-resources.
+      return elapsedTimeMs;
+    }
+  }
+
+  /**
+   * Compares the random read time of version 2 of the block blob input
+   * stream against version 1. The measurement is repeated up to 10 times
+   * and the test passes as soon as the v2/v1 elapsed-time ratio is below
+   * 0.10.
+   * @throws IOException if a read fails.
+   */
+  @Test
+  public void test_0317_RandomReadPerformance() throws IOException {
+    assumeHugeFileExists();
+    final int maxAttempts = 10;
+    final double maxAcceptableRatio = 0.10;
+    double v1ElapsedMs = 0;
+    double v2ElapsedMs = 0;
+    double ratio = Double.MAX_VALUE;
+
+    int attempt = 0;
+    while (attempt < maxAttempts && ratio >= maxAcceptableRatio) {
+      v1ElapsedMs = randomRead(1,
+          accountUsingInputStreamV1.getFileSystem());
+      v2ElapsedMs = randomRead(2,
+          accountUsingInputStreamV2.getFileSystem());
+      ratio = v2ElapsedMs / v1ElapsedMs;
+      String progress = String.format(
+          "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
+          (long) v1ElapsedMs,
+          (long) v2ElapsedMs,
+          ratio);
+      LOG.info(progress);
+      attempt++;
+    }
+
+    String failureMessage = String.format(
+        "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
+            + " v2ElapsedMs=%2$d, ratio=%3$.2f",
+        (long) v1ElapsedMs,
+        (long) v2ElapsedMs,
+        ratio);
+    assertTrue(failureMessage, ratio < maxAcceptableRatio);
+  }
+
+  /**
+   * Repeatedly reads an 8 KB chunk and then seeks to a random offset until
+   * at least 2 MB has been read, and returns the elapsed time in
+   * milliseconds.
+   *
+   * @param version input stream version, used only for the log message.
+   * @param fs file system used to open the test file.
+   * @return elapsed time of the read/seek loop, in milliseconds.
+   * @throws IOException if opening, reading or seeking fails.
+   */
+  private long randomRead(int version, FileSystem fs) throws IOException {
+    assumeHugeFileExists();
+    final int minBytesToRead = 2 * MEGABYTE;
+    Random random = new Random();
+    byte[] buffer = new byte[8 * KILOBYTE];
+    long totalBytesRead = 0;
+    long bytesRead = 0;
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      NanoTimer timer = new NanoTimer();
+
+      do {
+        bytesRead = inputStream.read(buffer);
+        // read() reports end of stream as -1; do not let that sentinel
+        // reduce the running total.
+        if (bytesRead > 0) {
+          totalBytesRead += bytesRead;
+        }
+        // Seek targets leave room for one full buffer before end of file.
+        inputStream.seek(random.nextInt(
+            (int) (testFileLength - buffer.length)));
+      } while (bytesRead > 0 && totalBytesRead < minBytesToRead);
+
+      long elapsedTimeMs = timer.elapsedTimeMs();
+
+      LOG.info(String.format(
+          "v%1$d: totalBytesRead=%2$d, elapsedTimeMs=%3$d, Mbps=%4$.2f",
+          version,
+          totalBytesRead,
+          elapsedTimeMs,
+          toMbps(totalBytesRead, elapsedTimeMs)));
+
+      assertTrue(minBytesToRead <= totalBytesRead);
+
+      // The stream is closed by try-with-resources.
+      return elapsedTimeMs;
+    }
+  }
+
+  /**
+   * Deletes the test file, logging how long the delete took, and then
+   * cleans up the version 1 test account whether or not the delete
+   * succeeded.
+   * @throws IOException if the delete fails.
+   */
+  @Test
+  public void test_999_DeleteHugeFiles() throws IOException {
+    try {
+      final NanoTimer deleteTimer = new NanoTimer();
+      getFileSystem().delete(TEST_FILE_PATH, false);
+      deleteTimer.end("time to delete %s", TEST_FILE_PATH);
+    } finally {
+      // clean up the test account
+      AzureTestUtils.cleanupTestAccount(accountUsingInputStreamV1);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestContainerChecks.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestContainerChecks.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestContainerChecks.java
new file mode 100644
index 0000000..cc3baf5
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestContainerChecks.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.junit.Assume.assumeNotNull;
+
+import java.io.FileNotFoundException;
+import java.util.EnumSet;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.CreateOptions;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.microsoft.azure.storage.blob.BlobOutputStream;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+
+/**
+ * Tests that WASB creates containers only if needed.
+ */
+public class ITestContainerChecks extends AbstractWasbTestWithTimeout {
+  private AzureBlobStorageTestAccount testAccount;
+  private boolean runningInSASMode = false;
+
+  @After
+  public void tearDown() throws Exception {
+    testAccount = AzureTestUtils.cleanup(testAccount);
+  }
+
+  /**
+   * Records whether the test configuration enables secure (SAS) mode.
+   */
+  @Before
+  public void setMode() {
+    runningInSASMode = AzureBlobStorageTestAccount.createTestConfiguration().
+        getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE, false);
+  }
+
+  /**
+   * Lists the root directory and asserts that the call fails with a
+   * FileNotFoundException whose message contains "does not exist.".
+   * @param fs file system to list.
+   * @throws Exception on any unexpected failure.
+   */
+  private void assertListRootFailsWithoutContainer(FileSystem fs)
+      throws Exception {
+    try {
+      fs.listStatus(new Path("/"));
+      fail("Should've thrown.");
+    } catch (FileNotFoundException ex) {
+      assertTrue("Unexpected exception: " + ex,
+          ex.getMessage().contains("does not exist."));
+    }
+  }
+
+  /**
+   * A container created outside of WASB, after WASB has already seen it
+   * missing, must become visible together with its contents.
+   */
+  @Test
+  public void testContainerExistAfterDoesNotExist() throws Exception {
+    testAccount = blobStorageTestAccount();
+    assumeNotNull(testAccount);
+    CloudBlobContainer container = testAccount.getRealContainer();
+    FileSystem fs = testAccount.getFileSystem();
+
+    // Starting off with the container not there
+    assertFalse(container.exists());
+
+    // A list shouldn't create the container and will set file system store
+    // state to DoesNotExist
+    assertListRootFailsWithoutContainer(fs);
+    assertFalse(container.exists());
+
+    // Create a container outside of the WASB FileSystem
+    container.create();
+    // Add a file to the container outside of the WASB FileSystem
+    CloudBlockBlob blob = testAccount.getBlobReference("foo");
+    // try-with-resources so the stream is closed even if the write fails
+    try (BlobOutputStream outputStream = blob.openOutputStream()) {
+      outputStream.write(new byte[10]);
+    }
+
+    // Make sure the file is visible
+    assertTrue(fs.exists(new Path("/foo")));
+    assertTrue(container.exists());
+  }
+
+  /**
+   * Creates the test account used by the container tests: an empty account
+   * name prefix and no create options.
+   */
+  protected AzureBlobStorageTestAccount blobStorageTestAccount()
+      throws Exception {
+    return AzureBlobStorageTestAccount.create("",
+        EnumSet.noneOf(CreateOptions.class));
+  }
+
+  /**
+   * A container created outside of WASB, after WASB has already seen it
+   * missing, must be writable through WASB.
+   */
+  @Test
+  public void testContainerCreateAfterDoesNotExist() throws Exception {
+    testAccount = blobStorageTestAccount();
+    assumeNotNull(testAccount);
+    CloudBlobContainer container = testAccount.getRealContainer();
+    FileSystem fs = testAccount.getFileSystem();
+
+    // Starting off with the container not there
+    assertFalse(container.exists());
+
+    // A list shouldn't create the container and will set file system store
+    // state to DoesNotExist
+    assertListRootFailsWithoutContainer(fs);
+    assertFalse(container.exists());
+
+    // Create a container outside of the WASB FileSystem
+    container.create();
+
+    // Write should succeed
+    assertTrue(fs.createNewFile(new Path("/foo")));
+    assertTrue(container.exists());
+  }
+
+  /**
+   * Listing, reading and renaming must not create the container; the first
+   * write must.
+   */
+  @Test
+  public void testContainerCreateOnWrite() throws Exception {
+    testAccount = blobStorageTestAccount();
+    assumeNotNull(testAccount);
+    CloudBlobContainer container = testAccount.getRealContainer();
+    final FileSystem fs = testAccount.getFileSystem();
+
+    // Starting off with the container not there
+    assertFalse(container.exists());
+
+    // A list shouldn't create the container.
+    assertListRootFailsWithoutContainer(fs);
+    assertFalse(container.exists());
+
+    // Neither should a read.
+    final Path foo = new Path("/testContainerCreateOnWrite-foo");
+    Path bar = new Path("/testContainerCreateOnWrite-bar");
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        new Callable<String>() {
+          @Override
+          public String call() throws Exception {
+            fs.open(foo).close();
+            return "Stream to " + foo;
+          }
+        }
+    );
+    assertFalse(container.exists());
+
+    // Neither should a rename
+    assertFalse(fs.rename(foo, bar));
+    assertFalse(container.exists());
+
+    // But a write should.
+    assertTrue(fs.createNewFile(foo));
+    assertTrue(container.exists());
+  }
+
+  /**
+   * With a SAS-based account a write to a missing container must fail and
+   * must not create the container. Skipped when the test configuration
+   * already runs in secure mode.
+   */
+  @Test
+  public void testContainerChecksWithSas() throws Exception {
+
+    Assume.assumeFalse(runningInSASMode);
+    testAccount = AzureBlobStorageTestAccount.create("",
+        EnumSet.of(CreateOptions.UseSas));
+    assumeNotNull(testAccount);
+    CloudBlobContainer container = testAccount.getRealContainer();
+    FileSystem fs = testAccount.getFileSystem();
+
+    // The container shouldn't be there
+    assertFalse(container.exists());
+
+    // A write should just fail
+    try {
+      fs.createNewFile(new Path("/testContainerChecksWithSas-foo"));
+      fail("Should've thrown.");
+    } catch (AzureException expected) {
+      // Expected: the write is supposed to fail, nothing more to check.
+    }
+    assertFalse(container.exists());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
new file mode 100644
index 0000000..a45dae4
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionHandling.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.FileNotFoundException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.azure.ExceptionHandlingTestHelper.*;
+
+/**
+ * Single threaded exception handling.
+ */
+public class ITestFileSystemOperationExceptionHandling
+    extends AbstractWasbTestBase {
+
+  /** Stream opened by a test; closed in {@link #tearDown()} when set. */
+  private FSDataInputStream inputStream = null;
+
+  private Path testPath;
+  private Path testFolderPath;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    testPath = path("testfile.dat");
+    testFolderPath = path("testfolder");
+  }
+
+  /**
+   * Helper method that creates a InputStream to validate exceptions
+   * for various scenarios: a file is written, opened for reading and then
+   * renamed, so the open stream refers to a path that no longer exists.
+   */
+  private void setupInputStreamToTest(AzureBlobStorageTestAccount testAccount)
+      throws Exception {
+
+    FileSystem fs = testAccount.getFileSystem();
+
+    // Step 1: Create a file and write dummy data.
+    Path base = methodPath();
+    Path testFilePath1 = new Path(base, "test1.dat");
+    Path testFilePath2 = new Path(base, "test2.dat");
+    FSDataOutputStream outputStream = fs.create(testFilePath1);
+    String testString = "This is a test string";
+    outputStream.write(testString.getBytes());
+    outputStream.close();
+
+    // Step 2: Open a read stream on the file.
+    inputStream = fs.open(testFilePath1);
+
+    // Step 3: Rename the file
+    fs.rename(testFilePath1, testFilePath2);
+  }
+
+  /**
+   * Tests a basic single threaded read scenario for Page blobs.
+   */
+  @Test(expected=FileNotFoundException.class)
+  public void testSingleThreadedPageBlobReadScenario() throws Throwable {
+    AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
+    setupInputStreamToTest(testAccount);
+    byte[] readBuffer = new byte[512];
+    inputStream.read(readBuffer);
+  }
+
+  /**
+   * Tests a basic single threaded seek scenario for Page blobs.
+   */
+  @Test(expected=FileNotFoundException.class)
+  public void testSingleThreadedPageBlobSeekScenario() throws Throwable {
+    AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
+    setupInputStreamToTest(testAccount);
+    inputStream.seek(5);
+  }
+
+  /**
+   * Test a basic single thread seek scenario for Block blobs.
+   */
+  @Test(expected=FileNotFoundException.class)
+  public void testSingleThreadBlockBlobSeekScenario() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    setupInputStreamToTest(testAccount);
+    inputStream.seek(5);
+    inputStream.read();
+  }
+
+  /**
+   * Tests a basic single threaded read scenario for Block blobs.
+   */
+  @Test(expected=FileNotFoundException.class)
+  public void testSingledThreadBlockBlobReadScenario() throws Throwable{
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    setupInputStreamToTest(testAccount);
+    byte[] readBuffer = new byte[512];
+    inputStream.read(readBuffer);
+  }
+
+  /**
+   * Tests basic single threaded setPermission scenario for Block blobs:
+   * setPermission on a deleted file.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testSingleThreadedBlockBlobSetPermissionScenario()
+      throws Throwable {
+
+    createEmptyFile(createTestAccount(), testPath);
+    fs.delete(testPath, true);
+    fs.setPermission(testPath,
+        new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
+  }
+
+  /**
+   * Tests basic single threaded setPermission scenario for Page blobs:
+   * setPermission on a deleted file.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testSingleThreadedPageBlobSetPermissionScenario()
+      throws Throwable {
+    createEmptyFile(getPageBlobTestStorageAccount(), testPath);
+    fs.delete(testPath, true);
+    fs.setPermission(testPath,
+        new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
+  }
+
+  /**
+   * Tests basic single threaded setOwner scenario for Block blobs:
+   * setOwner on a deleted file.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testSingleThreadedBlockBlobSetOwnerScenario() throws Throwable {
+
+    createEmptyFile(createTestAccount(), testPath);
+    fs.delete(testPath, true);
+    fs.setOwner(testPath, "testowner", "testgroup");
+  }
+
+  /**
+   * Tests basic single threaded setOwner scenario for Page blobs:
+   * setOwner on a deleted file.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testSingleThreadedPageBlobSetOwnerScenario() throws Throwable {
+    createEmptyFile(getPageBlobTestStorageAccount(),
+        testPath);
+    fs.delete(testPath, true);
+    fs.setOwner(testPath, "testowner", "testgroup");
+  }
+
+  /**
+   * Test basic single threaded listStatus scenario for Block blobs:
+   * listStatus on a deleted folder.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testSingleThreadedBlockBlobListStatusScenario()
+      throws Throwable {
+    createTestFolder(createTestAccount(),
+        testFolderPath);
+    fs.delete(testFolderPath, true);
+    fs.listStatus(testFolderPath);
+  }
+
+  /**
+   * Test basic single threaded listStatus scenario for Page blobs:
+   * listStatus on a deleted folder.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testSingleThreadedPageBlobListStatusScenario() throws Throwable {
+    createTestFolder(getPageBlobTestStorageAccount(),
+        testFolderPath);
+    fs.delete(testFolderPath, true);
+    fs.listStatus(testFolderPath);
+  }
+
+  /**
+   * Test basic single threaded rename scenario for Block blobs:
+   * renaming a deleted file returns false.
+   */
+  @Test
+  public void testSingleThreadedBlockBlobRenameScenario() throws Throwable {
+
+    createEmptyFile(createTestAccount(),
+        testPath);
+    Path dstPath = new Path("dstFile.dat");
+    fs.delete(testPath, true);
+    boolean renameResult = fs.rename(testPath, dstPath);
+    assertFalse(renameResult);
+  }
+
+  /**
+   * Test basic single threaded rename scenario for Page blobs:
+   * renaming a deleted file returns false.
+   */
+  @Test
+  public void testSingleThreadedPageBlobRenameScenario() throws Throwable {
+
+    createEmptyFile(getPageBlobTestStorageAccount(),
+        testPath);
+    Path dstPath = new Path("dstFile.dat");
+    fs.delete(testPath, true);
+    boolean renameResult = fs.rename(testPath, dstPath);
+    assertFalse(renameResult);
+  }
+
+  /**
+   * Test basic single threaded delete scenario for Block blobs:
+   * deleting an already deleted file returns false.
+   */
+  @Test
+  public void testSingleThreadedBlockBlobDeleteScenario() throws Throwable {
+
+    createEmptyFile(createTestAccount(),
+        testPath);
+    fs.delete(testPath, true);
+    boolean deleteResult = fs.delete(testPath, true);
+    assertFalse(deleteResult);
+  }
+
+  /**
+   * Test basic single threaded delete scenario for Page blobs:
+   * deleting an already deleted file returns false.
+   */
+  @Test
+  public void testSingleThreadedPageBlobDeleteScenario() throws Throwable {
+
+    createEmptyFile(getPageBlobTestStorageAccount(),
+        testPath);
+    fs.delete(testPath, true);
+    boolean deleteResult = fs.delete(testPath, true);
+    assertFalse(deleteResult);
+  }
+
+  /**
+   * Test delete then open a file, for Block blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testSingleThreadedBlockBlobOpenScenario() throws Throwable {
+
+    createEmptyFile(createTestAccount(),
+        testPath);
+    fs.delete(testPath, true);
+    inputStream = fs.open(testPath);
+  }
+
+  /**
+   * Test delete then open a file, for Page blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testSingleThreadedPageBlobOpenScenario() throws Throwable {
+
+    createEmptyFile(getPageBlobTestStorageAccount(),
+        testPath);
+    fs.delete(testPath, true);
+    inputStream = fs.open(testPath);
+  }
+
+  /**
+   * Closes any stream a test left open, removes the test file, and always
+   * delegates to the superclass teardown, even when one of the earlier
+   * steps throws.
+   */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (inputStream != null) {
+        inputStream.close();
+        inputStream = null;
+      }
+
+      ContractTestUtils.rm(fs, testPath, true, true);
+    } finally {
+      super.tearDown();
+    }
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount()
+      throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionMessage.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionMessage.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionMessage.java
new file mode 100644
index 0000000..6d5e72e
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationExceptionMessage.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.net.URI;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import org.junit.Test;
+
+import static 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.NO_ACCESS_TO_CONTAINER_MSG;
+
+/**
+ * Verifies the error text reported when a WASB file system is initialized
+ * against a container it cannot access.
+ */
+public class ITestFileSystemOperationExceptionMessage
+    extends AbstractWasbTestWithTimeout {
+
+  /**
+   * Initializes a {@link NativeAzureFileSystem} against a randomly named
+   * container, using a fresh {@link Configuration} on which only the WASB
+   * AbstractFileSystem binding and the skip-metrics flag are set. The
+   * resulting exception must have an {@link AzureException} in its cause
+   * chain and must contain the formatted
+   * {@code NO_ACCESS_TO_CONTAINER_MSG} text.
+   */
+  @Test
+  public void testAnonymouseCredentialExceptionMessage() throws Throwable {
+    // The test configuration is only used to check that an account exists
+    // and to look up its name.
+    Configuration accountConf =
+        AzureBlobStorageTestAccount.createTestConfiguration();
+    CloudStorageAccount account =
+        AzureBlobStorageTestAccount.createTestAccount(accountConf);
+    AzureTestUtils.assume("No test account", account != null);
+    String testStorageAccount = accountConf.get("fs.azure.test.account.name");
+
+    // The file system itself is initialized from a separate configuration.
+    Configuration fsConf = new Configuration();
+    fsConf.set("fs.AbstractFileSystem.wasb.impl",
+        "org.apache.hadoop.fs.azure.Wasb");
+    fsConf.set("fs.azure.skip.metrics", "true");
+
+    String testContainer = UUID.randomUUID().toString();
+    String wasbUri = String.format("wasb://%s@%s",
+        testContainer, testStorageAccount);
+
+    try (NativeAzureFileSystem filesystem = new NativeAzureFileSystem()) {
+      filesystem.initialize(new URI(wasbUri), fsConf);
+      fail("Expected an exception, got " + filesystem);
+    } catch (Exception ex) {
+      // Walk the cause chain until an AzureException, or its end, is reached.
+      Throwable cause = ex.getCause();
+      while (cause != null && !(cause instanceof AzureException)) {
+        cause = cause.getCause();
+      }
+
+      if (cause == null) {
+        fail("No inner azure exception");
+      } else {
+        // The message is checked on the outermost exception.
+        String expected = String.format(
+            NO_ACCESS_TO_CONTAINER_MSG, testStorageAccount, testContainer);
+        GenericTestUtils.assertExceptionContains(expected, ex);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d2d97fa/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java
new file mode 100644
index 0000000..175a9ec
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.FileNotFoundException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.fs.azure.ExceptionHandlingTestHelper.*;
+
+/**
+ * Multithreaded operations on FS, verify failures are as expected.
+ *
+ * <p>Every test expects a {@link FileNotFoundException} once the file or
+ * folder it works on has been renamed or deleted by a helper thread.
+ */
+public class ITestFileSystemOperationsExceptionHandlingMultiThreaded
+    extends AbstractWasbTestBase {
+
+  /** Stream opened by the current test, if any; closed in tearDown(). */
+  FSDataInputStream inputStream = null;
+
+  /** File used by the delete-based scenarios. */
+  private Path testPath;
+  /** Folder used by the listStatus scenarios. */
+  private Path testFolderPath;
+
+  /**
+   * Runs the base-class setup, then resolves the file and folder paths used
+   * by the tests.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    testPath = path("testfile.dat");
+    testFolderPath = path("testfolder");
+  }
+
+  /**
+   * Supplies the test account for this suite by delegating to
+   * {@code AzureBlobStorageTestAccount.create()}.
+   */
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+  /**
+   * Closes any stream left open by a test, removes the test file and folder,
+   * then runs the base-class teardown.
+   */
+  @Override
+  public void tearDown() throws Exception {
+
+    IOUtils.closeStream(inputStream);
+    try {
+      ContractTestUtils.rm(fs, testPath, true, false);
+      ContractTestUtils.rm(fs, testFolderPath, true, false);
+    } finally {
+      // Run the base-class teardown even if removing the paths fails.
+      super.tearDown();
+    }
+  }
+
+  /**
+   * Helper method that writes a small file and opens it, storing the
+   * resulting stream in {@link #inputStream} for the calling test.
+   *
+   * @param fs file system to create and open the file on
+   * @param testPath path of the file to create
+   * @throws Throwable if the file cannot be written or opened
+   */
+  private void getInputStreamToTest(FileSystem fs, Path testPath)
+      throws Throwable {
+
+    String testString = "This is a test string";
+    // try-with-resources closes the stream even when the write fails.
+    try (FSDataOutputStream outputStream = fs.create(testPath)) {
+      outputStream.write(testString.getBytes());
+    }
+
+    inputStream = fs.open(testPath);
+  }
+
+  /**
+   * Test to validate correct exception is thrown for Multithreaded read
+   * scenario for block blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testMultiThreadedBlockBlobReadScenario() throws Throwable {
+
+    // Uses the inherited fs, as the block blob seek scenario does, instead
+    // of creating a second test account and shadowing the field.
+    Path base = methodPath();
+    Path testFilePath1 = new Path(base, "test1.dat");
+    Path renamePath = new Path(base, "test2.dat");
+    getInputStreamToTest(fs, testFilePath1);
+    Thread renameThread = new Thread(
+        new RenameThread(fs, testFilePath1, renamePath));
+    renameThread.start();
+
+    renameThread.join();
+
+    byte[] readBuffer = new byte[512];
+    inputStream.read(readBuffer);
+  }
+
+  /**
+   * Test to validate correct exception is thrown for Multithreaded seek
+   * scenario for block blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testMultiThreadBlockBlobSeekScenario() throws Throwable {
+
+    Path base = methodPath();
+    Path testFilePath1 = new Path(base, "test1.dat");
+    Path renamePath = new Path(base, "test2.dat");
+
+    getInputStreamToTest(fs, testFilePath1);
+    Thread renameThread = new Thread(
+        new RenameThread(fs, testFilePath1, renamePath));
+    renameThread.start();
+
+    renameThread.join();
+
+    inputStream.seek(5);
+    inputStream.read();
+  }
+
+  /**
+   * Tests basic multi threaded setPermission scenario for page blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testMultiThreadedPageBlobSetPermissionScenario()
+      throws Throwable {
+    createEmptyFile(
+        getPageBlobTestStorageAccount(),
+        testPath);
+    Thread t = new Thread(new DeleteThread(fs, testPath));
+    t.start();
+    while (t.isAlive()) {
+      fs.setPermission(testPath,
+          new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
+    }
+    // One more attempt after the delete thread has finished.
+    fs.setPermission(testPath,
+        new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
+  }
+
+  /**
+   * Tests basic multi threaded setPermission scenario for block blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testMultiThreadedBlockBlobSetPermissionScenario()
+      throws Throwable {
+    createEmptyFile(createTestAccount(),
+        testPath);
+    Thread t = new Thread(new DeleteThread(fs, testPath));
+    t.start();
+    while (t.isAlive()) {
+      fs.setPermission(testPath,
+          new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
+    }
+    // One more attempt after the delete thread has finished.
+    fs.setPermission(testPath,
+        new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
+  }
+
+  /**
+   * Tests basic multi threaded open scenario for page blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testMultiThreadedPageBlobOpenScenario() throws Throwable {
+
+    // Page blob account, matching the test name and the other page blob
+    // scenarios in this class.
+    createEmptyFile(getPageBlobTestStorageAccount(),
+        testPath);
+    Thread t = new Thread(new DeleteThread(fs, testPath));
+    t.start();
+    while (t.isAlive()) {
+      inputStream = fs.open(testPath);
+      inputStream.close();
+    }
+
+    // One more attempt after the delete thread has finished.
+    inputStream = fs.open(testPath);
+    inputStream.close();
+  }
+
+  /**
+   * Tests basic multi threaded open scenario for block blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testMultiThreadedBlockBlobOpenScenario() throws Throwable {
+
+    // Account from createTestAccount(), matching the test name and the
+    // other block blob scenarios in this class.
+    createEmptyFile(
+        createTestAccount(),
+        testPath);
+    Thread t = new Thread(new DeleteThread(fs, testPath));
+    t.start();
+
+    while (t.isAlive()) {
+      inputStream = fs.open(testPath);
+      inputStream.close();
+    }
+    // One more attempt after the delete thread has finished.
+    inputStream = fs.open(testPath);
+    inputStream.close();
+  }
+
+  /**
+   * Tests basic multi threaded setOwner scenario for block blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testMultiThreadedBlockBlobSetOwnerScenario() throws Throwable {
+
+    createEmptyFile(createTestAccount(), testPath);
+    Thread t = new Thread(new DeleteThread(fs, testPath));
+    t.start();
+    while (t.isAlive()) {
+      fs.setOwner(testPath, "testowner", "testgroup");
+    }
+    // One more attempt after the delete thread has finished.
+    fs.setOwner(testPath, "testowner", "testgroup");
+  }
+
+  /**
+   * Tests basic multi threaded setOwner scenario for page blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testMultiThreadedPageBlobSetOwnerScenario() throws Throwable {
+    createEmptyFile(
+        getPageBlobTestStorageAccount(),
+        testPath);
+    Thread t = new Thread(new DeleteThread(fs, testPath));
+    t.start();
+    while (t.isAlive()) {
+      fs.setOwner(testPath, "testowner", "testgroup");
+    }
+    // One more attempt after the delete thread has finished.
+    fs.setOwner(testPath, "testowner", "testgroup");
+  }
+
+  /**
+   * Tests basic multi threaded listStatus scenario for block blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testMultiThreadedBlockBlobListStatusScenario() throws Throwable {
+
+    createTestFolder(createTestAccount(),
+        testFolderPath);
+    Thread t = new Thread(new DeleteThread(fs, testFolderPath));
+    t.start();
+    while (t.isAlive()) {
+      fs.listStatus(testFolderPath);
+    }
+    // One more attempt after the delete thread has finished.
+    fs.listStatus(testFolderPath);
+  }
+
+  /**
+   * Tests basic multi threaded listStatus scenario for page blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testMultiThreadedPageBlobListStatusScenario() throws Throwable {
+
+    createTestFolder(
+        getPageBlobTestStorageAccount(),
+        testFolderPath);
+    Thread t = new Thread(new DeleteThread(fs, testFolderPath));
+    t.start();
+    while (t.isAlive()) {
+      fs.listStatus(testFolderPath);
+    }
+    // One more attempt after the delete thread has finished.
+    fs.listStatus(testFolderPath);
+  }
+
+  /**
+   * Test to validate correct exception is thrown for Multithreaded read
+   * scenario for page blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testMultiThreadedPageBlobReadScenario() throws Throwable {
+
+    bindToTestAccount(getPageBlobTestStorageAccount());
+    Path base = methodPath();
+    Path testFilePath1 = new Path(base, "test1.dat");
+    Path renamePath = new Path(base, "test2.dat");
+
+    getInputStreamToTest(fs, testFilePath1);
+    Thread renameThread = new Thread(
+        new RenameThread(fs, testFilePath1, renamePath));
+    renameThread.start();
+
+    renameThread.join();
+    byte[] readBuffer = new byte[512];
+    inputStream.read(readBuffer);
+  }
+
+  /**
+   * Test to validate correct exception is thrown for Multithreaded seek
+   * scenario for page blobs.
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testMultiThreadedPageBlobSeekScenario() throws Throwable {
+
+    bindToTestAccount(getPageBlobTestStorageAccount());
+
+    Path base = methodPath();
+    Path testFilePath1 = new Path(base, "test1.dat");
+    Path renamePath = new Path(base, "test2.dat");
+
+    getInputStreamToTest(fs, testFilePath1);
+    Thread renameThread = new Thread(
+        new RenameThread(fs, testFilePath1, renamePath));
+    renameThread.start();
+
+    renameThread.join();
+    inputStream.seek(5);
+  }
+
+
+  /**
+   * Helper thread that just renames the test file.
+   */
+  private static class RenameThread implements Runnable {
+
+    private final FileSystem fs;
+    private final Path testPath;
+    private final Path renamePath;
+
+    RenameThread(FileSystem fs,
+        Path testPath,
+        Path renamePath) {
+      this.fs = fs;
+      this.testPath = testPath;
+      this.renamePath = renamePath;
+    }
+
+    @Override
+    public void run() {
+      try {
+        fs.rename(testPath, renamePath);
+      } catch (Exception ignored) {
+        // Swallowing the exception as the
+        // correctness of the test is controlled
+        // by the other thread
+      }
+    }
+  }
+
+  /**
+   * Helper thread that just recursively deletes the given path.
+   */
+  private static class DeleteThread implements Runnable {
+    private final FileSystem fs;
+    private final Path testPath;
+
+    DeleteThread(FileSystem fs, Path testPath) {
+      this.fs = fs;
+      this.testPath = testPath;
+    }
+
+    @Override
+    public void run() {
+      try {
+        fs.delete(testPath, true);
+      } catch (Exception ignored) {
+        // Swallowing the exception as the
+        // correctness of the test is controlled
+        // by the other thread
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to