HADOOP-13930. Azure: Add Authorization support to WASB. Contributed by Sivaguru 
Sankaridurg and Dushyanth


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/68682352
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/68682352
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/68682352

Branch: refs/heads/YARN-5734
Commit: 686823529be09bea2a6cecb3503ef722017475bc
Parents: 52d7d5a
Author: Mingliang Liu <lium...@apache.org>
Authored: Mon Mar 6 17:16:36 2017 -0800
Committer: Mingliang Liu <lium...@apache.org>
Committed: Mon Mar 6 17:16:36 2017 -0800

----------------------------------------------------------------------
 .../src/main/resources/core-default.xml         |  10 +
 .../conf/TestCommonConfigurationFields.java     |   1 +
 .../fs/azure/AzureNativeFileSystemStore.java    |   5 +-
 .../hadoop/fs/azure/NativeAzureFileSystem.java  | 116 ++++++-
 .../fs/azure/RemoteWasbAuthorizerImpl.java      | 190 ++++++++++
 .../fs/azure/WasbAuthorizationException.java    |  40 +++
 .../fs/azure/WasbAuthorizationOperations.java   |  44 +++
 .../fs/azure/WasbAuthorizerInterface.java       |  53 +++
 .../hadoop/fs/azure/WasbRemoteCallHelper.java   |  71 +++-
 .../hadoop-azure/src/site/markdown/index.md     |  34 ++
 .../fs/azure/AzureBlobStorageTestAccount.java   |  61 ++--
 .../hadoop/fs/azure/MockWasbAuthorizerImpl.java | 102 ++++++
 .../TestNativeAzureFileSystemAuthorization.java | 344 +++++++++++++++++++
 .../fs/azure/TestWasbRemoteCallHelper.java      | 344 +++++++++++++++++++
 .../src/test/resources/azure-test.xml           |  28 +-
 15 files changed, 1373 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml 
b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 35be56b..52b58ed 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1292,6 +1292,16 @@
     to specify the time (such as 2s, 2m, 1h, etc.).
   </description>
 </property>
+<property>
+  <name>fs.azure.authorization</name>
+  <value>false</value>
+  <description>
+    Config flag to enable authorization support in WASB. Setting it to "true" 
enables
+    authorization support in WASB. Currently, WASB authorization requires a 
remote service
+    to perform the authorization checks; its URL must be specified via the 
fs.azure.authorization.remote.service.url
+    configuration property.
+  </description>
+</property>
 
 
 <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
index 966a8ac..cbfb6d1 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
@@ -114,6 +114,7 @@ public class TestCommonConfigurationFields extends 
TestConfigurationFieldsBase {
     xmlPropsToSkipCompare.add("fs.azure.sas.expiry.period");
     xmlPropsToSkipCompare.add("fs.azure.local.sas.key.mode");
     xmlPropsToSkipCompare.add("fs.azure.secure.mode");
+    xmlPropsToSkipCompare.add("fs.azure.authorization");
 
     // Deprecated properties.  These should eventually be removed from the
     // class.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 07c389c..a8708ec 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -249,7 +249,7 @@ public class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
    * Default values to control SAS Key mode.
    * By default we set the values to false.
    */
-  private static final boolean DEFAULT_USE_SECURE_MODE = false;
+  public static final boolean DEFAULT_USE_SECURE_MODE = false;
   private static final boolean DEFAULT_USE_LOCAL_SAS_KEY_MODE = false;
 
   /**
@@ -849,6 +849,9 @@ public class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
     rootDirectory = container.getDirectoryReference("");
 
     canCreateOrModifyContainer = true;
+
+    configureAzureStorageSession();
+    tolerateOobAppends = false;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 4184a53..6de0a28 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -1106,7 +1106,31 @@ public class NativeAzureFileSystem extends FileSystem {
   // A counter to create unique (within-process) names for my metrics sources.
   private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
   private boolean appendSupportEnabled = false;
-  
+
+  /**
+   * Configuration key to enable authorization support in WASB.
+   */
+  public static final String KEY_AZURE_AUTHORIZATION =
+      "fs.azure.authorization";
+
+  /**
+   * Default value for the authorization support in WASB.
+   */
+  private static final boolean DEFAULT_AZURE_AUTHORIZATION = false;
+
+  /**
+   * Flag controlling authorization support in WASB.
+   */
+  private boolean azureAuthorization = false;
+
+  /**
+   * Authorizer to use when authorization support is enabled in
+   * WASB.
+   */
+  private WasbAuthorizerInterface authorizer = null;
+
+  private String delegationToken = null;
+
   public NativeAzureFileSystem() {
     // set store in initialize()
   }
@@ -1146,11 +1170,11 @@ public class NativeAzureFileSystem extends FileSystem {
       return baseName + number;
     }
   }
-  
+
   /**
    * Checks if the given URI scheme is a scheme that's affiliated with the 
Azure
    * File System.
-   * 
+   *
    * @param scheme
    *          The URI scheme.
    * @return true iff it's an Azure File System URI scheme.
@@ -1167,7 +1191,7 @@ public class NativeAzureFileSystem extends FileSystem {
   /**
    * Puts in the authority of the default file system if it is a WASB file
    * system and the given URI's authority is null.
-   * 
+   *
    * @return The URI with reconstructed authority if necessary and possible.
    */
   private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) 
{
@@ -1237,6 +1261,24 @@ public class NativeAzureFileSystem extends FileSystem {
     // Initialize thread counts from user configuration
     deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, 
DEFAULT_AZURE_DELETE_THREADS);
     renameThreadCount = conf.getInt(AZURE_RENAME_THREADS, 
DEFAULT_AZURE_RENAME_THREADS);
+
+    boolean useSecureMode = 
conf.getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE,
+        AzureNativeFileSystemStore.DEFAULT_USE_SECURE_MODE);
+
+    this.azureAuthorization = useSecureMode &&
+        conf.getBoolean(KEY_AZURE_AUTHORIZATION, DEFAULT_AZURE_AUTHORIZATION);
+
+    if (this.azureAuthorization) {
+
+      this.authorizer =
+          new RemoteWasbAuthorizerImpl();
+      authorizer.init(conf);
+    }
+  }
+
+  @VisibleForTesting
+  public void updateWasbAuthorizer(WasbAuthorizerInterface authorizer) {
+    this.authorizer = authorizer;
   }
 
   private NativeFileSystemStore createDefaultStore(Configuration conf) {
@@ -1338,18 +1380,28 @@ public class NativeAzureFileSystem extends FileSystem {
   /**
    * For unit test purposes, retrieves the AzureNativeFileSystemStore store
    * backing this file system.
-   * 
+   *
    * @return The store object.
    */
   @VisibleForTesting
   public AzureNativeFileSystemStore getStore() {
     return actualStore;
   }
-  
+
   NativeFileSystemStore getStoreInterface() {
     return store;
   }
 
+  private void performAuthCheck(String path, String accessType,
+      String operation) throws WasbAuthorizationException, IOException {
+
+    if (azureAuthorization && this.authorizer != null &&
+        !this.authorizer.authorize(path, accessType, delegationToken)) {
+      throw new WasbAuthorizationException(operation
+          + " operation for Path : " + path + " not allowed");
+    }
+  }
+
   /**
    * Gets the metrics source for this file system.
    * This is mainly here for unit testing purposes.
@@ -1372,6 +1424,10 @@ public class NativeAzureFileSystem extends FileSystem {
     LOG.debug("Opening file: {} for append", f);
 
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.WRITE.toString(), "append");
+
     String key = pathToKey(absolutePath);
     FileMetadata meta = null;
     try {
@@ -1434,6 +1490,7 @@ public class NativeAzureFileSystem extends FileSystem {
    * Get a self-renewing lease on the specified file.
    * @param path path whose lease to be renewed.
    * @return Lease
+   * @throws AzureException if a lease cannot be acquired on the path
    */
   public SelfRenewingLease acquireLease(Path path) throws AzureException {
     String fullKey = pathToKey(makeAbsolute(path));
@@ -1572,6 +1629,10 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.WRITE.toString(), "create");
+
     String key = pathToKey(absolutePath);
 
     FileMetadata existingMetadata = store.retrieveMetadata(key);
@@ -1652,10 +1713,10 @@ public class NativeAzureFileSystem extends FileSystem {
     // Construct the data output stream from the buffered output stream.
     FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, 
statistics);
 
-    
+
     // Increment the counter
     instrumentation.fileCreated();
-    
+
     // Return data output stream to caller.
     return fsOut;
   }
@@ -1694,6 +1755,10 @@ public class NativeAzureFileSystem extends FileSystem {
     LOG.debug("Deleting file: {}", f.toString());
 
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "delete");
+
     String key = pathToKey(absolutePath);
 
     // Capture the metadata for the path.
@@ -1964,6 +2029,10 @@ public class NativeAzureFileSystem extends FileSystem {
 
     // Capture the absolute path and the path to key.
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "getFileStatus");
+
     String key = pathToKey(absolutePath);
     if (key.length() == 0) { // root always exists
       return newDirectory(null, absolutePath);
@@ -2062,6 +2131,10 @@ public class NativeAzureFileSystem extends FileSystem {
     LOG.debug("Listing status for {}", f.toString());
 
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "list");
+
     String key = pathToKey(absolutePath);
     Set<FileStatus> status = new TreeSet<FileStatus>();
     FileMetadata meta = null;
@@ -2228,7 +2301,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
   /**
    * Applies the applicable UMASK's on the given permission.
-   * 
+   *
    * @param permission
    *          The permission to mask.
    * @param applyMode
@@ -2250,7 +2323,7 @@ public class NativeAzureFileSystem extends FileSystem {
   /**
    * Creates the PermissionStatus object to use for the given permission, based
    * on the current user in context.
-   * 
+   *
    * @param permission
    *          The permission for the file.
    * @return The permission status object to use.
@@ -2284,6 +2357,10 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "mkdirs");
+
     PermissionStatus permissionStatus = null;
     if(noUmask) {
       // ensure owner still has wx permissions at the minimum
@@ -2337,6 +2414,10 @@ public class NativeAzureFileSystem extends FileSystem {
     LOG.debug("Opening file: {}", f.toString());
 
     Path absolutePath = makeAbsolute(f);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.READ.toString(), "read");
+
     String key = pathToKey(absolutePath);
     FileMetadata meta = null;
     try {
@@ -2393,7 +2474,12 @@ public class NativeAzureFileSystem extends FileSystem {
           + " through WASB that has colons in the name");
     }
 
-    String srcKey = pathToKey(makeAbsolute(src));
+    Path absolutePath = makeAbsolute(src);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "rename");
+
+    String srcKey = pathToKey(absolutePath);
 
     if (srcKey.length() == 0) {
       // Cannot rename root of file system
@@ -2695,6 +2781,10 @@ public class NativeAzureFileSystem extends FileSystem {
   @Override
   public void setPermission(Path p, FsPermission permission) throws 
FileNotFoundException, IOException {
     Path absolutePath = makeAbsolute(p);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "setPermission");
+
     String key = pathToKey(absolutePath);
     FileMetadata metadata = null;
     try {
@@ -2733,6 +2823,10 @@ public class NativeAzureFileSystem extends FileSystem {
   public void setOwner(Path p, String username, String groupname)
       throws IOException {
     Path absolutePath = makeAbsolute(p);
+
+    performAuthCheck(absolutePath.toString(),
+        WasbAuthorizationOperations.EXECUTE.toString(), "setOwner");
+
     String key = pathToKey(absolutePath);
     FileMetadata metadata = null;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
new file mode 100644
index 0000000..5f2265b
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/RemoteWasbAuthorizerImpl.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import static 
org.apache.hadoop.fs.azure.WasbRemoteCallHelper.REMOTE_CALL_SUCCESS_CODE;
+
+/**
+ * Class implementing WasbAuthorizerInterface using a remote
+ * service that implements the authorization operation. This
+ * class expects the url of the remote service to be passed
+ * via config.
+ */
+public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
+
+  private String remoteAuthorizerServiceUrl = "";
+
+  /**
+   * Configuration parameter name expected in the Configuration object to
+   * provide the url of the remote service. {@value}
+   */
+  public static final String KEY_REMOTE_AUTH_SERVICE_URL =
+      "fs.azure.authorization.remote.service.url";
+
+  /**
+   * Authorization operation OP name in the remote service {@value}
+   */
+  private static final String CHECK_AUTHORIZATION_OP =
+      "CHECK_AUTHORIZATION";
+
+  /**
+   * Query parameter specifying the access operation type. {@value}
+   */
+  private static final String ACCESS_OPERATION_QUERY_PARAM_NAME =
+      "operation_type";
+
+  /**
+   * Query parameter specifying the wasb absolute path. {@value}
+   */
+  private static final String WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME =
+      "wasb_absolute_path";
+
+  /**
+   * Query parameter name for user info {@value}
+   */
+  private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
+      "delegation_token";
+
+  private WasbRemoteCallHelper remoteCallHelper = null;
+
+  @VisibleForTesting
+  public void updateWasbRemoteCallHelper(WasbRemoteCallHelper helper) {
+    this.remoteCallHelper = helper;
+  }
+
+  @Override
+  public void init(Configuration conf)
+      throws WasbAuthorizationException, IOException {
+
+    remoteAuthorizerServiceUrl = conf.get(KEY_REMOTE_AUTH_SERVICE_URL);
+
+    if (remoteAuthorizerServiceUrl == null
+          || remoteAuthorizerServiceUrl.isEmpty()) {
+      throw new WasbAuthorizationException(
+          "fs.azure.authorization.remote.service.url config not set"
+          + " in configuration.");
+    }
+
+    this.remoteCallHelper = new WasbRemoteCallHelper();
+  }
+
+  @Override
+  public boolean authorize(String wasbAbsolutePath, String accessType,
+      String delegationToken) throws WasbAuthorizationException, IOException {
+
+    try {
+      URIBuilder uriBuilder = new URIBuilder(remoteAuthorizerServiceUrl);
+      uriBuilder.setPath("/" + CHECK_AUTHORIZATION_OP);
+      uriBuilder.addParameter(WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME,
+          wasbAbsolutePath);
+      uriBuilder.addParameter(ACCESS_OPERATION_QUERY_PARAM_NAME,
+          accessType);
+      uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
+          delegationToken);
+
+      String responseBody = remoteCallHelper.makeRemoteGetRequest(
+          new HttpGet(uriBuilder.build()));
+
+      ObjectMapper objectMapper = new ObjectMapper();
+      RemoteAuthorizerResponse authorizerResponse =
+          objectMapper.readValue(responseBody, RemoteAuthorizerResponse.class);
+
+      if (authorizerResponse == null) {
+        throw new WasbAuthorizationException(
+            "RemoteAuthorizerResponse object null from remote call");
+      } else if (authorizerResponse.getResponseCode()
+          == REMOTE_CALL_SUCCESS_CODE) {
+        return authorizerResponse.getAuthorizationResult();
+      } else {
+        throw new WasbAuthorizationException("Remote authorization"
+            + " service encountered an error "
+            + authorizerResponse.getResponseMessage());
+      }
+    } catch (URISyntaxException | WasbRemoteCallException
+        | JsonParseException | JsonMappingException ex) {
+      throw new WasbAuthorizationException(ex);
+    }
+  }
+}
+
+/**
+ * POJO representing the response expected from a remote
+ * authorization service.
+ * The remote service is expected to return the authorization
+ * response in the following JSON format
+ * {
+ *    "responseCode" : 0 or non-zero <int>,
+ *    "responseMessage" : relevant message on failure <String>,
+ *    "authorizationResult" : authorization result <boolean>
+ *                            true - if authorization allowed
+ *                            false - otherwise.
+ *
+ * }
+ */
+class RemoteAuthorizerResponse {
+
+  private int responseCode;
+  private boolean authorizationResult;
+  private String responseMessage;
+
+  public RemoteAuthorizerResponse(int responseCode,
+      boolean authorizationResult, String message) {
+    this.responseCode = responseCode;
+    this.authorizationResult = authorizationResult;
+    this.responseMessage = message;
+  }
+
+  public RemoteAuthorizerResponse() {
+  }
+
+  public int getResponseCode() {
+    return responseCode;
+  }
+
+  public void setResponseCode(int responseCode) {
+    this.responseCode = responseCode;
+  }
+
+  public boolean getAuthorizationResult() {
+    return authorizationResult;
+  }
+
+  public void setAuthorizationResult(boolean authorizationResult) {
+    this.authorizationResult = authorizationResult;
+  }
+
+  public String getResponseMessage() {
+    return responseMessage;
+  }
+
+  public void setResponseMessage(String message) {
+    this.responseMessage = message;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationException.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationException.java
new file mode 100644
index 0000000..eff9248
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+/**
+ *  Exception thrown when an authorization check fails or cannot be
+ *  completed in WASB.
+ */
+public class WasbAuthorizationException extends AzureException {
+
+  private static final long serialVersionUID = 1L;
+
+  public WasbAuthorizationException(String message) {
+    super(message);
+  }
+
+  public WasbAuthorizationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public WasbAuthorizationException(Throwable t) {
+    super(t);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationOperations.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationOperations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationOperations.java
new file mode 100644
index 0000000..41ca2b3
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizationOperations.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+/**
+ * Different authorization operations supported
+ * in WASB.
+ */
+
+public enum WasbAuthorizationOperations {
+
+  READ, WRITE, EXECUTE;
+
+  @Override
+  public String toString() {
+    switch(this) {
+      case READ:
+        return "read";
+      case WRITE:
+        return "write";
+      case EXECUTE:
+        return "execute";
+      default:
+        throw new IllegalArgumentException(
+            "Invalid Authorization Operation");
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizerInterface.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizerInterface.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizerInterface.java
new file mode 100644
index 0000000..f391851
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbAuthorizerInterface.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ *  Interface to implement authorization support in WASB.
+ *  APIs of this interface are invoked by the WASB file
+ *  system layer before making calls to Azure
+ *  Storage.
+ */
+public interface WasbAuthorizerInterface {
+  /**
+   * Initializer method
+   * @param conf - Configuration object
+   * @throws WasbAuthorizationException - On authorization exceptions
+   * @throws IOException - When not able to reach the authorizer
+   */
+  public void init(Configuration conf)
+      throws WasbAuthorizationException, IOException;
+
+  /**
+   * Authorizer API to authorize access in WASB.
+   *
+   * @param wasbAbolutePath : Absolute WASB Path used for access.
+   * @param accessType : Type of access
+   * @param delegationToken : The user information.
+   * @return : true - If access is allowed; false - If access is not allowed.
+   * @throws WasbAuthorizationException - On authorization exceptions
+   * @throws IOException - When not able to reach the authorizer
+   */
+  public boolean authorize(String wasbAbolutePath, String accessType,
+      String delegationToken) throws WasbAuthorizationException, IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
index 543c899..09ea084 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/WasbRemoteCallHelper.java
@@ -18,18 +18,21 @@
 
 package org.apache.hadoop.fs.azure;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.http.Header;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.HttpClientBuilder;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
 /**
  * Helper class the has constants and helper methods
  * used in WASB when integrating with a remote http cred
@@ -48,6 +51,11 @@ class WasbRemoteCallHelper {
    */
   private HttpClient client = null;
 
+  @VisibleForTesting
+  public void updateHttpClient(HttpClient client) {
+    this.client = client;
+  }
+
   public WasbRemoteCallHelper() {
     this.client = HttpClientBuilder.create().build();
   }
@@ -58,17 +66,54 @@ class WasbRemoteCallHelper {
    * @return Http Response body returned as a string. The caller
    *  is expected to semantically understand the response.
    * @throws WasbRemoteCallException
+   * @throws IOException
    */
   public String makeRemoteGetRequest(HttpGet getRequest)
-      throws WasbRemoteCallException {
+      throws WasbRemoteCallException, IOException {
 
     try {
 
+      final String APPLICATION_JSON = "application/json";
+      final int MAX_CONTENT_LENGTH = 1024;
+
+      getRequest.setHeader("Accept", APPLICATION_JSON);
+
       HttpResponse response = client.execute(getRequest);
 
-      if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-        throw new WasbRemoteCallException(
-            response.getStatusLine().toString());
+      StatusLine statusLine = response.getStatusLine();
+      if (statusLine == null || statusLine.getStatusCode() != 
HttpStatus.SC_OK) {
+        throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" 
+
+            ((statusLine!=null) ? statusLine.toString() : "NULL")
+        );
+      }
+
+      Header contentTypeHeader = response.getFirstHeader("Content-Type");
+      if (contentTypeHeader == null || contentTypeHeader.getValue() != 
APPLICATION_JSON) {
+        throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" 
+
+            "Content-Type mismatch: expected: " + APPLICATION_JSON +
+            ", got " + ((contentTypeHeader!=null) ? 
contentTypeHeader.getValue() : "NULL")
+        );
+      }
+
+      Header contentLengthHeader = response.getFirstHeader("Content-Length");
+      if (contentLengthHeader == null) {
+        throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" 
+
+            "Content-Length header missing"
+        );
+      }
+
+      try {
+        if (Integer.parseInt(contentLengthHeader.getValue()) > 
MAX_CONTENT_LENGTH) {
+          throw new WasbRemoteCallException(getRequest.getURI().toString() + 
":" +
+              "Content-Length:" + contentLengthHeader.getValue() +
+              "exceeded max:" + MAX_CONTENT_LENGTH
+          );
+        }
+      }
+      catch (NumberFormatException nfe) {
+        throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" 
+
+            "Invalid Content-Length value :" + contentLengthHeader.getValue()
+        );
       }
 
       BufferedReader rd = new BufferedReader(
@@ -83,11 +128,11 @@ class WasbRemoteCallHelper {
       return responseBody.toString();
 
     } catch (ClientProtocolException clientProtocolEx) {
-      throw new WasbRemoteCallException("Encountered ClientProtocolException"
-          + " while making remote call", clientProtocolEx);
+      throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
+          "Encountered ClientProtocolException while making remote call", 
clientProtocolEx);
     } catch (IOException ioEx) {
-      throw new WasbRemoteCallException("Encountered IOException while making"
-          + " remote call", ioEx);
+      throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
+          "Encountered IOException while making remote call", ioEx);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md 
b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 2865223..1d1274b 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -333,6 +333,40 @@ The service is expected to return a response in JSON 
format:
   "sasKey" : Requested SAS Key <String>
 }
 ```
+
+## <a name="WASB Authorization" />Authorization Support in WASB.
+
+Authorization support can be enabled in WASB using the following configuration:
+
+```
+    <property>
+      <name>fs.azure.authorization</name>
+      <value>true</value>
+    </property>
+```
+  The current implementation of authorization relies on the presence of an 
external service that can enforce
+  the authorization. The service is expected to be running on a URL provided 
by the following config.
+
+```
+    <property>
+      <name>fs.azure.authorization.remote.service.url</name>
+      <value>{URL}</value>
+    </property>
+```
+
+  The remote service is expected to provide support for the following REST call: ```{URL}/CHECK_AUTHORIZATION```
+  An example request:
+  ```{URL}/CHECK_AUTHORIZATION?wasb_absolute_path=<absolute_path>&operation_type=<operation type>&delegation_token=<delegation token>```
+
+  The service is expected to return a response in JSON format:
+  ```
+  {
+    "responseCode" : 0 or non-zero <int>,
+    "responseMessage" : relevant message on failure <String>,
+    "authorizationResult" : true/false <boolean>
+  }
+  ```
+
 ## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
 
 The hadoop-azure module includes a full suite of unit tests.  Most of the tests

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
index 5353663..5f66fd2f 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java
@@ -18,17 +18,9 @@
 
 package org.apache.hadoop.fs.azure;
 
-import static 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.GregorianCalendar;
-import java.util.TimeZone;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
+import com.microsoft.azure.storage.*;
+import com.microsoft.azure.storage.blob.*;
+import com.microsoft.azure.storage.core.Base64;
 import org.apache.commons.configuration2.SubsetConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -39,22 +31,14 @@ import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
 import org.apache.hadoop.metrics2.impl.TestMetricsConfig;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-
-import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.StorageCredentials;
-import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
-import com.microsoft.azure.storage.StorageCredentialsAnonymous;
-import com.microsoft.azure.storage.blob.BlobContainerPermissions;
-import com.microsoft.azure.storage.blob.BlobContainerPublicAccessType;
-import com.microsoft.azure.storage.blob.BlobOutputStream;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import com.microsoft.azure.storage.blob.CloudBlockBlob;
-import com.microsoft.azure.storage.blob.SharedAccessBlobPermissions;
-import com.microsoft.azure.storage.blob.SharedAccessBlobPolicy;
-import com.microsoft.azure.storage.core.Base64;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME;
+import static 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_LOCAL_SAS_KEY_MODE;
 
 /**
  * Helper class to create WASB file systems backed by either a mock in-memory
@@ -92,7 +76,7 @@ public final class AzureBlobStorageTestAccount {
   private static final ConcurrentLinkedQueue<MetricsRecord> allMetrics =
       new ConcurrentLinkedQueue<MetricsRecord>();
   private static boolean metricsConfigSaved = false;
-  
+
   private AzureBlobStorageTestAccount(NativeAzureFileSystem fs,
       CloudStorageAccount account,
       CloudBlobContainer container) {
@@ -272,6 +256,7 @@ public final class AzureBlobStorageTestAccount {
     store.setAzureStorageInteractionLayer(mockStorage);
     NativeAzureFileSystem fs = new NativeAzureFileSystem(store);
     setMockAccountKey(conf);
+    configureSecureModeTestSettings(conf);
     // register the fs provider.
 
     fs.initialize(new URI(MOCK_WASB_URI), conf);
@@ -332,6 +317,8 @@ public final class AzureBlobStorageTestAccount {
     // Set account URI and initialize Azure file system.
     URI accountUri = createAccountUri(DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME,
         containerName);
+    configureSecureModeTestSettings(conf);
+
     fs.initialize(accountUri, conf);
 
     // Create test account initializing the appropriate member variables.
@@ -368,6 +355,7 @@ public final class AzureBlobStorageTestAccount {
     // out-of-band appends.
     conf.setBoolean(KEY_DISABLE_THROTTLING, true);
     conf.setBoolean(KEY_READ_TOLERATE_CONCURRENT_APPEND, true);
+    configureSecureModeTestSettings(conf);
 
     // Set account URI and initialize Azure file system.
     URI accountUri = createAccountUri(accountName, containerName);
@@ -409,6 +397,17 @@ public final class AzureBlobStorageTestAccount {
   }
 
   /**
+   * Configure default values for Secure Mode testing.
+   * These values are relevant only when testing in Secure Mode.
+   *
+   * @param conf
+   *          The configuration.
+   */
+  public static void configureSecureModeTestSettings(Configuration conf) {
+    // Always use local sas-key mode for testing.
+    conf.set(KEY_USE_LOCAL_SAS_KEY_MODE, "true");
+  }
+
+  /**
    * Sets the mock account key in the given configuration.
    * 
    * @param conf
@@ -556,6 +555,8 @@ public final class AzureBlobStorageTestAccount {
       conf.setBoolean(KEY_DISABLE_THROTTLING, true);
     }
 
+    configureSecureModeTestSettings(conf);
+
     // Set account URI and initialize Azure file system.
     URI accountUri = createAccountUri(accountName, containerName);
     fs.initialize(accountUri, conf);
@@ -693,6 +694,8 @@ public final class AzureBlobStorageTestAccount {
     // Capture the account URL and the account name.
     String accountName = conf.get(TEST_ACCOUNT_NAME_PROPERTY_NAME);
 
+    configureSecureModeTestSettings(conf);
+
     // Generate a container name and create a shared access signature string 
for
     // it.
     //
@@ -764,6 +767,8 @@ public final class AzureBlobStorageTestAccount {
     // Capture the account URL and the account name.
     String accountName = conf.get(TEST_ACCOUNT_NAME_PROPERTY_NAME);
 
+    configureSecureModeTestSettings(conf);
+
     // Set up public container with the specified blob name.
     CloudBlockBlob blobRoot = primeRootContainer(blobClient, accountName,
         blobName, fileSize);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
new file mode 100644
index 0000000..8f7cb2a
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A mock wasb authorizer implementation.
+ */
+
+public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
+
+  /** Rules keyed by (path, access type); the value is the configured result. */
+  private Map<AuthorizationComponent, Boolean> authRules;
+
+  /**
+   * Starts from an empty rule set, discarding any rules added earlier.
+   *
+   * @param conf not read by this mock.
+   */
+  @Override
+  public void init(Configuration conf) {
+    authRules = new HashMap<AuthorizationComponent, Boolean>();
+  }
+
+  /**
+   * Registers, or overwrites, the result returned for a path/access-type pair.
+   *
+   * @param wasbAbsolutePath path the rule applies to.
+   * @param accessType access type the rule applies to.
+   * @param access result {@link #authorize} returns for this pair.
+   */
+  public void addAuthRule(String wasbAbsolutePath,
+      String accessType, boolean access) {
+    this.authRules.put(
+        new AuthorizationComponent(wasbAbsolutePath, accessType), access);
+  }
+
+  /**
+   * Looks up the rule registered for the given path and access type.
+   *
+   * @param wasbAbsolutePath path being checked.
+   * @param accessType access type being checked.
+   * @param delegationToken ignored by this mock.
+   * @return the registered result, or {@code false} when no rule exists.
+   */
+  @Override
+  public boolean authorize(String wasbAbsolutePath, String accessType,
+      String delegationToken) throws WasbAuthorizationException {
+
+    Boolean configured = authRules.get(
+        new AuthorizationComponent(wasbAbsolutePath, accessType));
+
+    // Rules are stored from a primitive boolean, so a present entry is never
+    // null; a null here therefore means "no rule", which is treated as deny.
+    return configured != null && configured;
+  }
+}
+
+/**
+ * Immutable (path, access type) pair used as the lookup key for the
+ * authorization rules held by {@link MockWasbAuthorizerImpl}.
+ */
+class AuthorizationComponent {
+
+  // Final so an instance cannot change while it is in use as a map key.
+  private final String wasbAbsolutePath;
+  private final String accessType;
+
+  /**
+   * @param wasbAbsolutePath path this key refers to; may be null.
+   * @param accessType access type this key refers to; may be null.
+   */
+  public AuthorizationComponent(String wasbAbsolutePath,
+      String accessType) {
+    this.wasbAbsolutePath = wasbAbsolutePath;
+    this.accessType = accessType;
+  }
+
+  /**
+   * Null-safe, order-sensitive hash that is consistent with
+   * {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    return java.util.Objects.hash(wasbAbsolutePath, accessType);
+  }
+
+  /**
+   * Two components are equal when both the path and the access type match;
+   * null fields compare equal only to null.
+   */
+  @Override
+  public boolean equals(Object obj) {
+
+    if (obj == this) {
+      return true;
+    }
+
+    // instanceof is false for null, so no separate null check is needed.
+    if (!(obj instanceof AuthorizationComponent)) {
+      return false;
+    }
+
+    AuthorizationComponent other = (AuthorizationComponent) obj;
+    return java.util.Objects.equals(wasbAbsolutePath, other.wasbAbsolutePath)
+        && java.util.Objects.equals(accessType, other.accessType);
+  }
+
+  public String getWasbAbsolutePath() {
+    return this.wasbAbsolutePath;
+  }
+
+  public String getAccessType() {
+    return accessType;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java
new file mode 100644
index 0000000..a2bbeb1
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemAuthorization.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.sun.tools.javac.util.Assert;
+import org.junit.rules.ExpectedException;
+
+import java.io.Console;
+
+import static 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
+
+/**
+ * Test class to hold all WASB authorization tests.
+ */
+public class TestNativeAzureFileSystemAuthorization
+  extends AbstractWasbTestBase {
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
+    conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL, 
"http://localhost/";);
+    return AzureBlobStorageTestAccount.create(conf);
+  }
+
+
+  /**
+   * Skips the test unless both secure mode and authorization are enabled in
+   * the file system's configuration.
+   */
+  @Before
+  public void beforeMethod() {
+    boolean useSecureMode =
+        fs.getConf().getBoolean(KEY_USE_SECURE_MODE, false);
+    boolean useAuthorization = fs.getConf().getBoolean(
+        NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, false);
+    // A single assumption is enough; a second identical check could never
+    // fail once this one has passed.
+    Assume.assumeTrue(
+        "Test valid when both SecureMode and Authorization are enabled .. skipping",
+        useSecureMode && useAuthorization);
+  }
+
+
+  @Rule
+  public ExpectedException expectedEx = ExpectedException.none();
+
+  /**
+   * Positive test to verify Create and delete access check
+   * @throws Throwable
+   */
+  @Test
+  public void testCreateAccessCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path parentDir = new Path("/testCreateAccessCheckPositive");
+    Path testPath = new Path(parentDir, "test.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule(parentDir.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    fs.create(testPath);
+    ContractTestUtils.assertPathExists(fs, "testPath was not created", 
testPath);
+    fs.delete(parentDir, true);
+  }
+
+  /**
+   * Negative test to verify Create access check
+   * @throws Throwable
+   */
+
+  @Test // (expected=WasbAuthorizationException.class)
+  public void testCreateAccessCheckNegative() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage("create operation for Path : 
/testCreateAccessCheckNegative/test.dat not allowed");
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path parentDir = new Path("/testCreateAccessCheckNegative");
+    Path testPath = new Path(parentDir, "test.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    
authorizer.addAuthRule(testPath.toString(),WasbAuthorizationOperations.WRITE.toString(),
 false);
+    
authorizer.addAuthRule(parentDir.toString(),WasbAuthorizationOperations.EXECUTE.toString(),
 true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.create(testPath);
+    }
+    finally {
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Positive test to verify listStatus access check.
+   * @throws Throwable
+   */
+  @Test
+  public void testListAccessCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path parentDir = new Path("/testListAccessCheckPositive");
+    Path testPath = new Path(parentDir, "test.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule(parentDir.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    fs.create(testPath);
+    ContractTestUtils.assertPathExists(fs, "testPath does not exist", 
testPath);
+
+    try {
+      fs.listStatus(testPath);
+    }
+    finally {
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Negative test to verify listStatus access check.
+   * @throws Throwable
+   */
+
+  @Test //(expected=WasbAuthorizationException.class)
+  public void testListAccessCheckNegative() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage("getFileStatus operation for Path : 
/testListAccessCheckNegative/test.dat not allowed");
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path parentDir = new Path("/testListAccessCheckNegative");
+    Path testPath = new Path(parentDir, "test.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), false);
+    authorizer.addAuthRule(parentDir.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    fs.create(testPath);
+    ContractTestUtils.assertPathExists(fs, "testPath does not exist", 
testPath);
+
+    try {
+      fs.listStatus(testPath);
+    }
+    finally {
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Positive test to verify rename access check.
+   * @throws Throwable
+   */
+  @Test
+  public void testRenameAccessCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path parentDir = new Path("/testRenameAccessCheckPositive");
+    Path testPath = new Path(parentDir, "test.dat");
+    Path renamePath = new Path(parentDir, "test2.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule(renamePath.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule(parentDir.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    fs.create(testPath);
+    ContractTestUtils.assertPathExists(fs, "sourcePath does not exist", 
testPath);
+
+    try {
+      fs.rename(testPath, renamePath);
+      ContractTestUtils.assertPathExists(fs, "destPath does not exist", 
renamePath);
+    }
+    finally {
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Negative test to verify rename access check.
+   * @throws Throwable
+   */
+  @Test //(expected=WasbAuthorizationException.class)
+  public void testRenameAccessCheckNegative() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage("rename operation for Path : 
/testRenameAccessCheckNegative/test.dat not allowed");
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+    Path parentDir = new Path("/testRenameAccessCheckNegative");
+    Path testPath = new Path(parentDir, "test.dat");
+    Path renamePath = new Path(parentDir, "test2.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.WRITE.toString(), true);
+    // set EXECUTE to true for initial assert right after creation.
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule(parentDir.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    fs.create(testPath);
+    ContractTestUtils.assertPathExists(fs, "sourcePath does not exist", 
testPath);
+
+    // Set EXECUTE to false for actual rename-failure test
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), false);
+    fs.updateWasbAuthorizer(authorizer);
+
+    try {
+      fs.rename(testPath, renamePath);
+      ContractTestUtils.assertPathExists(fs, "destPath does not exist", 
renamePath);
+    } finally {
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Positive test for read access check.
+   * @throws Throwable
+   */
+  @Test
+  public void testReadAccessCheckPositive() throws Throwable {
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+    Path parentDir = new Path("/testReadAccessCheckPositive");
+    Path testPath = new Path(parentDir, "test.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.READ.toString(), true);
+    authorizer.addAuthRule(parentDir.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    FSDataOutputStream fso = fs.create(testPath);
+    String data = "Hello World";
+    fso.writeBytes(data);
+    fso.close();
+    ContractTestUtils.assertPathExists(fs, "testPath does not exist", 
testPath);
+
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream = fs.open(testPath);
+      ContractTestUtils.verifyRead(inputStream, data.getBytes(), 0, 
data.length());
+    }
+    finally {
+      if(inputStream != null) {
+        inputStream.close();
+      }
+      fs.delete(parentDir, true);
+    }
+  }
+
+  /**
+   * Negative test to verify read access check.
+   * @throws Throwable
+   */
+
+  @Test //(expected=WasbAuthorizationException.class)
+  public void testReadAccessCheckNegative() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage("read operation for Path : 
/testReadAccessCheckNegative/test.dat not allowed");
+
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+    Path parentDir = new Path("/testReadAccessCheckNegative");
+    Path testPath = new Path(parentDir, "test.dat");
+
+    MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
+    authorizer.init(null);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.WRITE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    authorizer.addAuthRule(testPath.toString(), 
WasbAuthorizationOperations.READ.toString(), false);
+    authorizer.addAuthRule(parentDir.toString(), 
WasbAuthorizationOperations.EXECUTE.toString(), true);
+    fs.updateWasbAuthorizer(authorizer);
+
+    FSDataOutputStream fso = fs.create(testPath);
+    String data = "Hello World";
+    fso.writeBytes(data);
+    fso.close();
+    ContractTestUtils.assertPathExists(fs, "testPath does not exist", 
testPath);
+
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream = fs.open(testPath);
+      ContractTestUtils.verifyRead(inputStream, data.getBytes(), 0, 
data.length());
+    } finally {
+      if (inputStream != null) {
+        inputStream.close();
+      }
+      fs.delete(parentDir, true);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbRemoteCallHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbRemoteCallHelper.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbRemoteCallHelper.java
new file mode 100644
index 0000000..b13e5e9
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestWasbRemoteCallHelper.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.http.*;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+
+import static 
org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
+
+/**
+ * Test class to hold all WasbRemoteCallHelper tests
+ */
+public class TestWasbRemoteCallHelper
+    extends AbstractWasbTestBase {
+
+  /**
+   * Creates a test account whose configuration turns on WASB authorization
+   * and sets the remote authorization service URL to http://localhost/.
+   *
+   * @return the test account created from that configuration
+   * @throws Exception if the test account cannot be created
+   */
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
+    conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL,
+        "http://localhost/");
+    return AzureBlobStorageTestAccount.create(conf);
+  }
+
+  /**
+   * Skips the current test unless the file system configuration has both
+   * secure mode and authorization enabled.
+   */
+  @Before
+  public void beforeMethod() {
+    boolean useSecureMode =
+        fs.getConf().getBoolean(KEY_USE_SECURE_MODE, false);
+    boolean useAuthorization =
+        fs.getConf().getBoolean(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, false);
+    // One assumption is sufficient; the message explains why a run is skipped.
+    Assume.assumeTrue(
+        "Test valid when both SecureMode and Authorization are enabled .. skipping",
+        useSecureMode && useAuthorization);
+  }
+
+  @Rule
+  public ExpectedException expectedEx = ExpectedException.none();
+
+  /**
+   * Verifies that a remote response carrying status code 999 makes the file
+   * system operation fail with the exception registered by
+   * setupExpectations().
+   * @throws Throwable the expected authorization failure
+   */
+  @Test // (expected = WasbAuthorizationException.class)
+  public void testInvalidStatusCode() throws Throwable {
+    setupExpectations();
+
+    // Stub the response first, then the client that hands it out.
+    HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(newStatusLine(999));
+
+    HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Verifies that a 200 response whose Content-Type is text/plain makes the
+   * file system operation fail with the exception registered by
+   * setupExpectations().
+   * @throws Throwable the expected authorization failure
+   */
+  @Test // (expected = WasbAuthorizationException.class)
+  public void testInvalidContentType() throws Throwable {
+    setupExpectations();
+
+    // Stub the response first, then the client that hands it out.
+    HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(newStatusLine(200));
+    Mockito.when(response.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "text/plain"));
+
+    HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Verifies that a 200 application/json response for which no
+   * Content-Length header is stubbed makes the file system operation fail
+   * with the exception registered by setupExpectations().
+   * @throws Throwable the expected authorization failure
+   */
+  @Test // (expected = WasbAuthorizationException.class)
+  public void testMissingContentLength() throws Throwable {
+    setupExpectations();
+
+    // Only status and Content-Type are stubbed; Content-Length is left out.
+    HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(newStatusLine(200));
+    Mockito.when(response.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "application/json"));
+
+    HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Verifies that a 200 application/json response advertising a
+   * Content-Length of 2048 (treated by this test as above the permitted
+   * maximum) fails with the exception registered by setupExpectations().
+   * @throws Throwable the expected authorization failure
+   */
+  @Test // (expected = WasbAuthorizationException.class)
+  public void testContentLengthExceedsMax() throws Throwable {
+    setupExpectations();
+
+    // Stub the response first, then the client that hands it out.
+    HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(newStatusLine(200));
+    Mockito.when(response.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "application/json"));
+    Mockito.when(response.getFirstHeader("Content-Length"))
+        .thenReturn(newHeader("Content-Length", "2048"));
+
+    HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Verifies that a 200 application/json response whose Content-Length is
+   * the non-numeric value "20abc48" fails with the exception registered by
+   * setupExpectations().
+   * @throws Throwable the expected authorization failure
+   */
+  @Test // (expected = WasbAuthorizationException.class)
+  public void testInvalidContentLengthValue() throws Throwable {
+    setupExpectations();
+
+    // Stub the response first, then the client that hands it out.
+    HttpResponse response = Mockito.mock(HttpResponse.class);
+    Mockito.when(response.getStatusLine()).thenReturn(newStatusLine(200));
+    Mockito.when(response.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "application/json"));
+    Mockito.when(response.getFirstHeader("Content-Length"))
+        .thenReturn(newHeader("Content-Length", "20abc48"));
+
+    HttpClient client = Mockito.mock(HttpClient.class);
+    Mockito.when(client.execute(Mockito.<HttpGet>any())).thenReturn(response);
+
+    performop(client);
+  }
+
+  /**
+   * Test valid JSON response
+   * @throws Throwable
+   */
+  @Test
+  public void testValidJSONResponse() throws Throwable {
+
+    // set up mocks
+    HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+
+    HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
+    HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
+
+    
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
+    
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
+    Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "application/json"));
+    Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
+        .thenReturn(newHeader("Content-Length", "1024"));
+    Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
+    Mockito.when(mockHttpEntity.getContent())
+        .thenReturn(new 
ByteArrayInputStream(validJsonResponse().getBytes(StandardCharsets.UTF_8)))
+        .thenReturn(new 
ByteArrayInputStream(validJsonResponse().getBytes(StandardCharsets.UTF_8)))
+        .thenReturn(new 
ByteArrayInputStream(validJsonResponse().getBytes(StandardCharsets.UTF_8)));
+    // finished setting up mocks
+
+    performop(mockHttpClient);
+  }
+
+  /**
+   * Verifies that a truncated JSON body makes the file system operation fail
+   * with a WasbAuthorizationException whose message contains the JSON parse
+   * error text.
+   * @throws Throwable the expected authorization failure
+   */
+  @Test // (expected = WasbAuthorizationException.class)
+  public void testMalFormedJSONResponse() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage(
+        "com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input in FIELD_NAME");
+
+    // set up mocks
+    HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+
+    HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
+    HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
+
+    Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any()))
+        .thenReturn(mockHttpResponse);
+    Mockito.when(mockHttpResponse.getStatusLine())
+        .thenReturn(newStatusLine(200));
+    Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "application/json"));
+    Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
+        .thenReturn(newHeader("Content-Length", "1024"));
+    Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
+    Mockito.when(mockHttpEntity.getContent())
+        .thenReturn(new ByteArrayInputStream(
+            malformedJsonResponse().getBytes(StandardCharsets.UTF_8)));
+    // finished setting up mocks
+
+    performop(mockHttpClient);
+  }
+
+  /**
+   * Verifies that a well-formed JSON body carrying responseCode 1 makes the
+   * file system operation fail with a WasbAuthorizationException whose
+   * message contains the service's "Unauthorized" response message.
+   * @throws Throwable the expected authorization failure
+   */
+  @Test // (expected = WasbAuthorizationException.class)
+  public void testFailureCodeJSONResponse() throws Throwable {
+
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage(
+        "Remote authorization service encountered an error Unauthorized");
+
+    // set up mocks
+    HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
+
+    HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
+    HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
+
+    Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any()))
+        .thenReturn(mockHttpResponse);
+    Mockito.when(mockHttpResponse.getStatusLine())
+        .thenReturn(newStatusLine(200));
+    Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
+        .thenReturn(newHeader("Content-Type", "application/json"));
+    Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
+        .thenReturn(newHeader("Content-Length", "1024"));
+    Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
+    Mockito.when(mockHttpEntity.getContent())
+        .thenReturn(new ByteArrayInputStream(
+            failureCodeJsonResponse().getBytes(StandardCharsets.UTF_8)));
+    // finished setting up mocks
+
+    performop(mockHttpClient);
+  }
+
+  /**
+   * Registers the failure shared by the negative HTTP-response tests: a
+   * WasbAuthorizationException whose message contains the
+   * WasbRemoteCallException text for the write check on /test.dat.
+   */
+  private void setupExpectations() {
+    expectedEx.expect(WasbAuthorizationException.class);
+    expectedEx.expectMessage(
+        "org.apache.hadoop.fs.azure.WasbRemoteCallException: "
+        + "http://localhost/CHECK_AUTHORIZATION?wasb_absolute_path=%2Ftest.dat&"
+        + "operation_type=write&delegation_token:Encountered IOException while making remote call");
+  }
+
+  /**
+   * Runs create / exists-check / delete on /test.dat against a file system
+   * whose authorizer sends its remote calls through the given HTTP client.
+   *
+   * @param mockHttpClient HTTP client handed to the WasbRemoteCallHelper
+   * @throws Throwable if any file system operation or assertion fails
+   */
+  private void performop(HttpClient mockHttpClient) throws Throwable {
+    AzureBlobStorageTestAccount testAccount = createTestAccount();
+    NativeAzureFileSystem fs = testAccount.getFileSystem();
+
+    Path testPath = new Path("/", "test.dat");
+
+    RemoteWasbAuthorizerImpl authorizer = new RemoteWasbAuthorizerImpl();
+    authorizer.init(fs.getConf());
+    WasbRemoteCallHelper mockWasbRemoteCallHelper = new WasbRemoteCallHelper();
+    mockWasbRemoteCallHelper.updateHttpClient(mockHttpClient);
+    authorizer.updateWasbRemoteCallHelper(mockWasbRemoteCallHelper);
+    fs.updateWasbAuthorizer(authorizer);
+
+    // Close the stream returned by create() right away so it is not leaked.
+    fs.create(testPath).close();
+    ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
+    fs.delete(testPath, false);
+  }
+
+  /**
+   * @return a well-formed JSON body with responseCode 0, authorizationResult
+   *         true and responseMessage "Authorized"
+   */
+  private String validJsonResponse() {
+    return "{\"responseCode\": 0, \"authorizationResult\": true, "
+        + "\"responseMessage\": \"Authorized\"}";
+  }
+
+  /**
+   * @return a JSON body that is cut off right after the responseMessage
+   *         field name, so it cannot be parsed
+   */
+  private String malformedJsonResponse() {
+    return "{\"responseCode\": 0, \"authorizationResult\": true, "
+        + "\"responseMessage\":";
+  }
+
+  /**
+   * @return a well-formed JSON body with responseCode 1, authorizationResult
+   *         false and responseMessage "Unauthorized"
+   */
+  private String failureCodeJsonResponse() {
+    return "{\"responseCode\": 1, \"authorizationResult\": false, "
+        + "\"responseMessage\": \"Unauthorized\"}";
+  }
+
+  /**
+   * Builds an HTTP/1.1 status line that reports the given status code and a
+   * fixed reason phrase.
+   *
+   * @param statusCode status code the returned status line reports
+   * @return a StatusLine stub for use in mocked responses
+   */
+  private StatusLine newStatusLine(final int statusCode) {
+    return new StatusLine() {
+      @Override
+      public int getStatusCode() {
+        return statusCode;
+      }
+
+      @Override
+      public String getReasonPhrase() {
+        return "Reason Phrase";
+      }
+
+      @Override
+      public ProtocolVersion getProtocolVersion() {
+        return new ProtocolVersion("HTTP", 1, 1);
+      }
+    };
+  }
+
+  /**
+   * Builds a header stub that reports the given name and value and exposes
+   * no header elements.
+   *
+   * @param name header name to report
+   * @param value header value to report
+   * @return a Header stub for use in mocked responses
+   */
+  private Header newHeader(final String name, final String value) {
+    return new Header() {
+      @Override
+      public String getValue() {
+        return value;
+      }
+
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public HeaderElement[] getElements() throws ParseException {
+        return new HeaderElement[0];
+      }
+    };
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68682352/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml 
b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
index e898aa6..c73d6d8 100644
--- a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
+++ b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
@@ -19,27 +19,21 @@
   <!-- For tests against live azure, provide the following account information 
-->
 
   <!--
-  <property>
-    <name>fs.azure.test.account.name</name>
-    <value>{ACCOUNTNAME}.blob.core.windows.net</value>
-  </property>
-  <property>
-    <name>fs.azure.account.key.{ACCOUNTNAME}.blob.core.windows.net</name>
-    <value>{ACCOUNTKEY}</value>
-  </property>
+    <property>
+      <name>fs.azure.test.account.name</name>
+      <value>{ACCOUNTNAME}.blob.core.windows.net</value>
+    </property>
+    <property>
+      <name>fs.azure.account.key.{ACCOUNTNAME}.blob.core.windows.net</name>
+      <value>{ACCOUNTKEY}</value>
+    </property>
+  -->
+
   <property>
     <name>fs.azure.secure.mode</name>
     <value>false</value>
   </property>
-  <property>
-    <name>fs.azure.local.sas.key.mode</name>
-    <value>false</value>
-  </property>
-  <property>
-    <name>fs.azure.cred.service.url</name>
-    <value>{CRED_SERIVCE_URL}</value>
-  </property>
-  -->
+
 
   <!-- Save the above configuration properties in a separate file named -->
   <!-- azure-auth-keys.xml in the same directory as this file. -->


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to