Repository: hadoop
Updated Branches:
refs/heads/branch-2 d65ca63c6 -> c260d7ec7
HADOOP-12533. Introduce FileNotFoundException in WASB for read and seek API.
Contributed by Dushyanth.
(cherry picked from commit 28790692624177d89fb1e4f59e2f83a659fc3089)
Conflicts:
hadoop-common-project/hadoop-common/CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c260d7ec
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c260d7ec
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c260d7ec
Branch: refs/heads/branch-2
Commit: c260d7ec782d92318b69a51f22671c3de58a6fed
Parents: d65ca63
Author: cnauroth <[email protected]>
Authored: Mon Nov 2 09:38:37 2015 -0800
Committer: cnauroth <[email protected]>
Committed: Mon Nov 2 09:39:27 2015 -0800
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../hadoop/fs/azure/NativeAzureFileSystem.java | 125 ++++++++++---
...estFileSystemOperationExceptionHandling.java | 131 +++++++++++++
...perationsExceptionHandlingMultiThreaded.java | 185 +++++++++++++++++++
4 files changed, 422 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c260d7ec/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt
b/hadoop-common-project/hadoop-common/CHANGES.txt
index 43ba78a..48aa782 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -699,6 +699,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-9242. Duplicate surefire plugin config in hadoop-common.
(Andrey Klochkov via suresh)
+ HADOOP-12533. Introduce FileNotFoundException in WASB for read and seek API.
+ (Dushyanth via cnauroth)
+
OPTIMIZATIONS
HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c260d7ec/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 7c5a504..73bc6b3 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azure;
import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -49,6 +50,7 @@ import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -62,7 +64,6 @@ import
org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.codehaus.jackson.JsonNode;
@@ -74,9 +75,11 @@ import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageErrorCode;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.core.*;
+import com.microsoft.azure.storage.StorageErrorCodeStrings;
+import org.apache.hadoop.io.IOUtils;
/**
* A {@link FileSystem} for reading and writing files stored on <a
@@ -88,7 +91,6 @@ import com.microsoft.azure.storage.core.*;
@InterfaceStability.Stable
public class NativeAzureFileSystem extends FileSystem {
private static final int USER_WX_PERMISION = 0300;
-
/**
* A description of a folder rename operation, including the source and
* destination keys, and descriptions of the files in the source folder.
@@ -712,7 +714,7 @@ public class NativeAzureFileSystem extends FileSystem {
* @returns int An integer corresponding to the byte read.
*/
@Override
- public synchronized int read() throws IOException {
+ public synchronized int read() throws FileNotFoundException, IOException {
try {
int result = 0;
result = in.read();
@@ -726,13 +728,21 @@ public class NativeAzureFileSystem extends FileSystem {
//
return result;
} catch(IOException e) {
- if (e.getCause() instanceof StorageException) {
- StorageException storageExcp = (StorageException) e.getCause();
+
+ Throwable innerException = checkForAzureStorageException(e);
+
+ if (innerException instanceof StorageException) {
+
LOG.error("Encountered Storage Exception for read on Blob : {}"
+ " Exception details: {} Error Code : {}",
- key, e.getMessage(), storageExcp.getErrorCode());
+ key, e, ((StorageException) innerException).getErrorCode());
+
+ if (isFileNotFoundException((StorageException) innerException)) {
+ throw new FileNotFoundException(String.format("%s is not found",
key));
+ }
}
- throw e;
+
+ throw e;
}
}
@@ -757,7 +767,7 @@ public class NativeAzureFileSystem extends FileSystem {
* there is no more data because the end of stream is reached.
*/
@Override
- public synchronized int read(byte[] b, int off, int len) throws
IOException {
+ public synchronized int read(byte[] b, int off, int len) throws
FileNotFoundException, IOException {
try {
int result = 0;
result = in.read(b, off, len);
@@ -772,29 +782,56 @@ public class NativeAzureFileSystem extends FileSystem {
// Return to the caller with the result.
return result;
} catch(IOException e) {
- if (e.getCause() instanceof StorageException) {
- StorageException storageExcp = (StorageException) e.getCause();
+
+ Throwable innerException = checkForAzureStorageException(e);
+
+ if (innerException instanceof StorageException) {
+
LOG.error("Encountered Storage Exception for read on Blob : {}"
+ " Exception details: {} Error Code : {}",
- key, e.getMessage(), storageExcp.getErrorCode());
+ key, e, ((StorageException) innerException).getErrorCode());
+
+ if (isFileNotFoundException((StorageException) innerException)) {
+ throw new FileNotFoundException(String.format("%s is not found",
key));
+ }
}
- throw e;
+
+ throw e;
}
}
@Override
- public void close() throws IOException {
- in.close();
- closed = true;
+ public synchronized void close() throws IOException {
+ if (!closed) {
+ closed = true;
+ IOUtils.closeStream(in);
+ in = null;
+ }
}
@Override
- public synchronized void seek(long pos) throws IOException {
- in.close();
- in = store.retrieve(key);
- this.pos = in.skip(pos);
- LOG.debug("Seek to position {}. Bytes skipped {}", pos,
- this.pos);
+ public synchronized void seek(long pos) throws FileNotFoundException,
EOFException, IOException {
+ try {
+ checkNotClosed();
+ if (pos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+ }
+ IOUtils.closeStream(in);
+ in = store.retrieve(key);
+ this.pos = in.skip(pos);
+ LOG.debug("Seek to position {}. Bytes skipped {}", pos,
+ this.pos);
+ } catch(IOException e) {
+
+ Throwable innerException = checkForAzureStorageException(e);
+
+ if (innerException instanceof StorageException
+ && isFileNotFoundException((StorageException) innerException)) {
+ throw new FileNotFoundException(String.format("%s is not found",
key));
+ }
+
+ throw e;
+ }
}
@Override
@@ -806,6 +843,50 @@ public class NativeAzureFileSystem extends FileSystem {
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
+
+ /*
+ * Helper method to recursively check if the cause of the exception is
+ * an Azure storage exception.
+ */
+ private Throwable checkForAzureStorageException(IOException e) {
+
+ Throwable innerException = e.getCause();
+
+ while (innerException != null
+ && !(innerException instanceof StorageException)) {
+ innerException = innerException.getCause();
+ }
+
+ return innerException;
+ }
+
+ /*
+ * Helper method to check if the AzureStorageException is
+ * because the backing blob was not found.
+ */
+ private boolean isFileNotFoundException(StorageException e) {
+
+ String errorCode = ((StorageException) e).getErrorCode();
+ if (errorCode != null
+ && (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
+ || errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
+ || errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
+ ||
errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /*
+ * Helper method to check if a stream is closed.
+ */
+ private void checkNotClosed() throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ }
}
private class NativeAzureFsOutputStream extends OutputStream {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c260d7ec/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationExceptionHandling.java
----------------------------------------------------------------------
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationExceptionHandling.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationExceptionHandling.java
new file mode 100644
index 0000000..35a1f50
--- /dev/null
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationExceptionHandling.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.FileNotFoundException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Test;
+
+
+public class TestFileSystemOperationExceptionHandling extends
+ NativeAzureFileSystemBaseTest {
+
+ FSDataInputStream inputStream = null;
+ /*
+ * Helper method to create a PageBlob test storage account.
+ */
+ private AzureBlobStorageTestAccount getPageBlobTestStorageAccount()
+ throws Exception {
+
+ Configuration conf = new Configuration();
+
+ // Configure the page blob directories key so every file created is a page blob.
+ conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+ // Configure the atomic rename directories key so every folder will have
+ // atomic rename applied.
+ conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+ return AzureBlobStorageTestAccount.create(conf);
+ }
+
+
+ /*
+ * Helper method that creates an InputStream to validate exceptions
+ * for various scenarios
+ */
+ private void setupInputStreamToTest(AzureBlobStorageTestAccount testAccount)
+ throws Exception {
+
+ fs = testAccount.getFileSystem();
+
+ // Step 1: Create a file and write dummy data.
+ Path testFilePath1 = new Path("test1.dat");
+ Path testFilePath2 = new Path("test2.dat");
+ FSDataOutputStream outputStream = fs.create(testFilePath1);
+ String testString = "This is a test string";
+ outputStream.write(testString.getBytes());
+ outputStream.close();
+
+ // Step 2: Open a read stream on the file.
+ inputStream = fs.open(testFilePath1);
+
+ // Step 3: Rename the file
+ fs.rename(testFilePath1, testFilePath2);
+ }
+
+ /*
+ * Tests a basic single threaded read scenario for Page blobs.
+ */
+ @Test(expected=FileNotFoundException.class)
+ public void testSingleThreadedPageBlobReadScenario() throws Throwable {
+ AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
+ setupInputStreamToTest(testAccount);
+ byte[] readBuffer = new byte[512];
+ inputStream.read(readBuffer);
+ }
+
+ /*
+ * Tests a basic single threaded seek scenario for Page blobs.
+ */
+ @Test(expected=FileNotFoundException.class)
+ public void testSingleThreadedPageBlobSeekScenario() throws Throwable {
+ AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
+ setupInputStreamToTest(testAccount);
+ inputStream.seek(5);
+ }
+
+ /*
+ * Test a basic single thread seek scenario for Block blobs.
+ */
+ @Test(expected=FileNotFoundException.class)
+ public void testSingleThreadBlockBlobSeekScenario() throws Throwable {
+
+ AzureBlobStorageTestAccount testAccount = createTestAccount();
+ setupInputStreamToTest(testAccount);
+ inputStream.seek(5);
+ }
+
+ /*
+ * Tests a basic single threaded read scenario for Block blobs.
+ */
+ @Test(expected=FileNotFoundException.class)
+ public void testSingledThreadBlockBlobReadScenario() throws Throwable{
+ AzureBlobStorageTestAccount testAccount = createTestAccount();
+ setupInputStreamToTest(testAccount);
+ byte[] readBuffer = new byte[512];
+ inputStream.read(readBuffer);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c260d7ec/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsExceptionHandlingMultiThreaded.java
----------------------------------------------------------------------
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsExceptionHandlingMultiThreaded.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsExceptionHandlingMultiThreaded.java
new file mode 100644
index 0000000..0f91500
--- /dev/null
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsExceptionHandlingMultiThreaded.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.FileNotFoundException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestFileSystemOperationsExceptionHandlingMultiThreaded extends
+ NativeAzureFileSystemBaseTest {
+
+ FSDataInputStream inputStream = null;
+ /*
+ * Helper method that creates an input stream to test various scenarios.
+ */
+ private void getInputStreamToTest(FileSystem fs, Path testPath) throws
Throwable {
+
+ FSDataOutputStream outputStream = fs.create(testPath);
+ String testString = "This is a test string";
+ outputStream.write(testString.getBytes());
+ outputStream.close();
+
+ inputStream = fs.open(testPath);
+ }
+
+ /*
+ * Test to validate correct exception is thrown for Multithreaded read
+ * scenario for block blobs
+ */
+ @Test(expected=FileNotFoundException.class)
+ public void testMultiThreadedBlockBlobReadScenario() throws Throwable {
+
+ AzureBlobStorageTestAccount testAccount = createTestAccount();
+ fs = testAccount.getFileSystem();
+ Path testFilePath1 = new Path("test1.dat");
+
+ getInputStreamToTest(fs, testFilePath1);
+ Thread renameThread = new Thread(new RenameThread(fs, testFilePath1));
+ renameThread.start();
+
+ renameThread.join();
+
+ byte[] readBuffer = new byte[512];
+ inputStream.read(readBuffer);
+ }
+
+ /*
+ * Test to validate correct exception is thrown for Multithreaded seek
+ * scenario for block blobs
+ */
+
+ @Test(expected=FileNotFoundException.class)
+ public void testMultiThreadBlockBlobSeekScenario() throws Throwable {
+
+ AzureBlobStorageTestAccount testAccount = createTestAccount();
+ fs = testAccount.getFileSystem();
+ Path testFilePath1 = new Path("test1.dat");
+
+ getInputStreamToTest(fs, testFilePath1);
+ Thread renameThread = new Thread(new RenameThread(fs, testFilePath1));
+ renameThread.start();
+
+ renameThread.join();
+
+ inputStream.seek(5);
+ }
+
+ /*
+ * Test to validate correct exception is thrown for Multithreaded read
+ * scenario for page blobs
+ */
+
+ @Test(expected=FileNotFoundException.class)
+ public void testMultiThreadedPageBlobReadScenario() throws Throwable {
+
+ AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
+ fs = testAccount.getFileSystem();
+ Path testFilePath1 = new Path("test1.dat");
+
+ getInputStreamToTest(fs, testFilePath1);
+ Thread renameThread = new Thread(new RenameThread(fs, testFilePath1));
+ renameThread.start();
+
+ renameThread.join();
+ byte[] readBuffer = new byte[512];
+ inputStream.read(readBuffer);
+ }
+
+ /*
+ * Test to validate correct exception is thrown for Multithreaded seek
+ * scenario for page blobs
+ */
+
+ @Test(expected=FileNotFoundException.class)
+ public void testMultiThreadedPageBlobSeekScenario() throws Throwable {
+
+ AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount();
+ fs = testAccount.getFileSystem();
+ Path testFilePath1 = new Path("test1.dat");
+
+ getInputStreamToTest(fs, testFilePath1);
+ Thread renameThread = new Thread(new RenameThread(fs, testFilePath1));
+ renameThread.start();
+
+ renameThread.join();
+ inputStream.seek(5);
+ }
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create();
+ }
+
+ /*
+ * Helper method to create a PageBlob test storage account.
+ */
+ private AzureBlobStorageTestAccount getPageBlobTestStorageAccount()
+ throws Exception {
+
+ Configuration conf = new Configuration();
+
+ // Configure the page blob directories key so every file created is a page blob.
+ conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+ // Configure the atomic rename directories key so every folder will have
+ // atomic rename applied.
+ conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+ return AzureBlobStorageTestAccount.create(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
+}
+
+/*
+ * Helper thread that just renames the test file.
+ */
+class RenameThread implements Runnable {
+
+ private FileSystem fs;
+ private Path testPath;
+ private Path renamePath = new Path("test2.dat");
+
+ public RenameThread(FileSystem fs, Path testPath) {
+ this.fs = fs;
+ this.testPath = testPath;
+ }
+
+ @Override
+ public void run(){
+ try {
+ fs.rename(testPath, renamePath);
+ }catch (Exception e) {
+ // Swallowing the exception as the
+ // correctness of the test is controlled
+ // by the other thread
+ }
+ }
+}