HADOOP-12666. Support Microsoft Azure Data Lake - as a file system in Hadoop. 
Contributed by Vishwajeet Dusane.

(cherry picked from commit 9581fb715cbc8a6ad28566e83c6d0242a7306688)

Conflicts:
        hadoop-tools/hadoop-tools-dist/pom.xml
        hadoop-tools/pom.xml


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a8f03ef7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a8f03ef7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a8f03ef7

Branch: refs/heads/branch-2
Commit: a8f03ef7ea8163c00ce5d72a4e1c77284befe5aa
Parents: 6350e4b
Author: Chris Nauroth <cnaur...@apache.org>
Authored: Thu Jun 9 14:49:05 2016 -0700
Committer: Chris Nauroth <cnaur...@apache.org>
Committed: Thu Jun 9 14:49:05 2016 -0700

----------------------------------------------------------------------
 .../src/main/resources/core-default.xml         |   60 +
 .../conf/TestCommonConfigurationFields.java     |    6 +
 hadoop-project/src/site/site.xml                |    2 +
 .../dev-support/findbugs-exclude.xml            |   24 +
 hadoop-tools/hadoop-azure-datalake/pom.xml      |  180 +++
 .../main/java/org/apache/hadoop/fs/adl/Adl.java |   52 +
 .../org/apache/hadoop/fs/adl/AdlFileSystem.java |   41 +
 ...hedRefreshTokenBasedAccessTokenProvider.java |  135 +++
 .../hadoop/fs/adl/oauth2/package-info.java      |   23 +
 .../org/apache/hadoop/fs/adl/package-info.java  |   23 +
 .../org/apache/hadoop/hdfs/web/ADLConfKeys.java |   61 +
 .../apache/hadoop/hdfs/web/BufferManager.java   |  180 +++
 .../web/PrivateAzureDataLakeFileSystem.java     | 1108 ++++++++++++++++++
 ...hedRefreshTokenBasedAccessTokenProvider.java |   37 +
 .../hadoop/hdfs/web/oauth2/package-info.java    |   24 +
 .../apache/hadoop/hdfs/web/package-info.java    |   25 +
 .../hadoop/hdfs/web/resources/ADLFlush.java     |   49 +
 .../hdfs/web/resources/ADLGetOpParam.java       |   96 ++
 .../hdfs/web/resources/ADLPostOpParam.java      |   97 ++
 .../hdfs/web/resources/ADLPutOpParam.java       |   94 ++
 .../hdfs/web/resources/ADLVersionInfo.java      |   51 +
 .../web/resources/AppendADLNoRedirectParam.java |   45 +
 .../web/resources/CreateADLNoRedirectParam.java |   44 +
 .../hadoop/hdfs/web/resources/LeaseParam.java   |   53 +
 .../web/resources/ReadADLNoRedirectParam.java   |   44 +
 .../hadoop/hdfs/web/resources/package-info.java |   27 +
 .../src/site/markdown/index.md                  |  219 ++++
 ...hedRefreshTokenBasedAccessTokenProvider.java |  147 +++
 hadoop-tools/hadoop-tools-dist/pom.xml          |    6 +
 hadoop-tools/pom.xml                            |    1 +
 30 files changed, 2954 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml 
b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 490f1de..41bf6d8 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2213,4 +2213,64 @@
       needs to be specified in net.topology.script.file.name.
     </description>
   </property>
+
+
+  <!-- Azure Data Lake File System Configurations -->
+
+  <property>
+    <name>adl.feature.override.readahead</name>
+    <value>true</value>
+    <description>
+      Enables read aheads in the ADL client, the feature is used to
+      improve read throughput.
+      This works in conjunction with the value set in
+      adl.feature.override.readahead.max.buffersize.
+      When set to false the read ahead feature is turned off.
+      Default : True if not configured.
+    </description>
+  </property>
+
+  <property>
+    <name>adl.feature.override.readahead.max.buffersize</name>
+    <value>8388608</value>
+    <description>
+      Defines the maximum buffer size used to cache read ahead data. The
+      buffer is allocated per process.
+      Applicable only when
+      adl.feature.override.readahead is set to true.
+      Default : 8388608 Byte i.e. 8MB if not configured.
+    </description>
+  </property>
+
+  <property>
+    <name>adl.feature.override.readahead.max.concurrent.connection</name>
+    <value>2</value>
+    <description>
+      Defines the maximum number of concurrent connections that can be
+      established to read ahead. If the data size is less than 4MB then only
+      1 read n/w connection
+      is set. If the data size is greater than 4MB but less than 8MB then 2
+      read n/w connections
+      are set. For data greater than 8MB the value set under this property
+      takes
+      effect. Applicable only when adl.feature.override.readahead is set
+      to true and buffer size is greater than 8MB.
+      It is recommended to reset this property if the
+      adl.feature.override.readahead.max.buffersize
+      is less than 8MB to gain performance. Application has to consider
+      throttling limit for the account as well before configuring large
+      buffer size.
+    </description>
+  </property>
+
+  <property>
+    <name>fs.adl.impl</name>
+    <value>org.apache.hadoop.fs.adl.AdlFileSystem</value>
+  </property>
+
+  <property>
+    <name>fs.AbstractFileSystem.adl.impl</name>
+    <value>org.apache.hadoop.fs.adl.Adl</value>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
index 90f7514..020474f 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
@@ -102,6 +102,12 @@ public class TestCommonConfigurationFields extends 
TestConfigurationFieldsBase {
     xmlPrefixToSkipCompare.add("s3.");
     xmlPrefixToSkipCompare.add("s3native.");
 
+    // ADL properties are in a different subtree
+    // - org.apache.hadoop.hdfs.web.ADLConfKeys
+    xmlPrefixToSkipCompare.add("adl.");
+    xmlPropsToSkipCompare.add("fs.adl.impl");
+    xmlPropsToSkipCompare.add("fs.AbstractFileSystem.adl.impl");
+
     // Deprecated properties.  These should eventually be removed from the
     // class.
     configurationPropsToSkipCompare

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-project/src/site/site.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml
index 0167f0c..dd9e3e9 100644
--- a/hadoop-project/src/site/site.xml
+++ b/hadoop-project/src/site/site.xml
@@ -144,6 +144,8 @@
     <menu name="Hadoop Compatible File Systems" inherit="top">
       <item name="Amazon S3" href="hadoop-aws/tools/hadoop-aws/index.html"/>
       <item name="Azure Blob Storage" href="hadoop-azure/index.html"/>
+      <item name="Azure Data Lake Storage"
+            href="hadoop-azure-datalake/index.html"/>
       <item name="OpenStack Swift" href="hadoop-openstack/index.html"/>
     </menu>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml 
b/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
new file mode 100644
index 0000000..4fd36ef
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
@@ -0,0 +1,24 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<FindBugsFilter>
+    <!-- Buffer object is accessed within trusted code and intentionally 
assigned instead of array copy -->
+    <Match>
+        <Class 
name="org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem$BatchAppendOutputStream$CommitTask"/>
+        <Bug pattern="EI_EXPOSE_REP2"/>
+        <Priority value="2"/>
+    </Match>
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/pom.xml 
b/hadoop-tools/hadoop-azure-datalake/pom.xml
new file mode 100644
index 0000000..66c874c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/pom.xml
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project</artifactId>
+    <version>2.9.0-SNAPSHOT</version>
+    <relativePath>../../hadoop-project</relativePath>
+  </parent>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-azure-datalake</artifactId>
+  <name>Apache Hadoop Azure Data Lake support</name>
+  <description>
+    This module contains code to support integration with Azure Data Lake.
+  </description>
+  <packaging>jar</packaging>
+  <properties>
+    <okHttpVersion>2.4.0</okHttpVersion>
+    <minimalJsonVersion>0.9.1</minimalJsonVersion>
+    <file.encoding>UTF-8</file.encoding>
+    <downloadSources>true</downloadSources>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <findbugsXmlOutput>true</findbugsXmlOutput>
+          <xmlOutput>true</xmlOutput>
+          <excludeFilterFile>
+            ${basedir}/dev-support/findbugs-exclude.xml
+          </excludeFilterFile>
+          <effort>Max</effort>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-project-info-reports-plugin</artifactId>
+
+        <configuration>
+          <dependencyDetailsEnabled>false</dependencyDetailsEnabled>
+          <dependencyLocationsEnabled>false
+          </dependencyLocationsEnabled>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>deplist</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>list</goal>
+            </goals>
+            <configuration>
+              <!-- build a shellprofile -->
+              
<outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+
+
+    <!--
+       The following is to suppress a m2e warning in eclipse
+       (m2e doesn't know how to handle maven-enforcer:enforce, so we have to 
tell m2e to ignore it)
+       see: 
http://stackoverflow.com/questions/13040788/how-to-elimate-the-maven-enforcer-plugin-goal-enforce-is-ignored-by-m2e-wa
+    -->
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins
+                    </groupId>
+                    <artifactId>maven-enforcer-plugin
+                    </artifactId>
+                    <versionRange>[1.0.0,)</versionRange>
+                    <goals>
+                      <goal>enforce</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore/>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <artifactId>servlet-api</artifactId>
+          <groupId>javax.servlet</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+    </dependency>
+    <dependency>
+    <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+  </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.eclipsesource.minimal-json</groupId>
+      <artifactId>minimal-json</artifactId>
+      <version>0.9.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>com.squareup.okhttp</groupId>
+      <artifactId>mockwebserver</artifactId>
+      <version>2.4.0</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
new file mode 100644
index 0000000..4642d6b
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.adl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Expose adl:// scheme to access ADL file system.
+ */
+public class Adl extends DelegateToFileSystem {
+
+  Adl(URI theUri, Configuration conf) throws IOException, URISyntaxException {
+    super(theUri, createDataLakeFileSystem(conf), conf, AdlFileSystem.SCHEME,
+        false);
+  }
+
+  private static AdlFileSystem createDataLakeFileSystem(Configuration conf) {
+    AdlFileSystem fs = new AdlFileSystem();
+    fs.setConf(conf);
+    return fs;
+  }
+
+  /**
+   * @return Default port for ADL File system to communicate
+   */
+  @Override
+  public final int getUriDefaultPort() {
+    return AdlFileSystem.DEFAULT_PORT;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
new file mode 100644
index 0000000..11e1e0b
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.adl;
+
+import org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem;
+
+/**
+ * Expose adl:// scheme to access ADL file system.
+ */
+public class AdlFileSystem extends PrivateAzureDataLakeFileSystem {
+
+  public static final String SCHEME = "adl";
+  public static final int DEFAULT_PORT = 443;
+
+  @Override
+  public String getScheme() {
+    return SCHEME;
+  }
+
+  @Override
+  public int getDefaultPort() {
+    return DEFAULT_PORT;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
new file mode 100644
index 0000000..b7f3b00
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.adl.oauth2;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.LinkedHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.web.oauth2.AccessTokenProvider;
+import 
org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider;
+import 
org.apache.hadoop.hdfs.web.oauth2.PrivateCachedRefreshTokenBasedAccessTokenProvider;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
+import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
+import static 
org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_KEY;
+
+/**
+ * Share refresh tokens across all ADLS instances with a common client ID. The
+ * {@link AccessTokenProvider} can be shared across multiple instances,
+ * amortizing the cost of refreshing tokens.
+ */
+public class CachedRefreshTokenBasedAccessTokenProvider
+    extends PrivateCachedRefreshTokenBasedAccessTokenProvider {
+
+  public static final String FORCE_REFRESH = "adl.force.token.refresh";
+
+  private static final Logger LOG =
+      
LoggerFactory.getLogger(CachedRefreshTokenBasedAccessTokenProvider.class);
+
+  /** Limit size of provider cache. */
+  static final int MAX_PROVIDERS = 10;
+  @SuppressWarnings("serial")
+  private static final Map<String, AccessTokenProvider> CACHE =
+      new LinkedHashMap<String, AccessTokenProvider>() {
+        @Override
+        public boolean removeEldestEntry(
+            Map.Entry<String, AccessTokenProvider> e) {
+          return size() > MAX_PROVIDERS;
+        }
+      };
+
+  private AccessTokenProvider instance = null;
+
+  /**
+   * Create handle for cached instance.
+   */
+  public CachedRefreshTokenBasedAccessTokenProvider() {
+  }
+
+  /**
+   * Gets the access token from internally cached
+   * ConfRefreshTokenBasedAccessTokenProvider instance.
+   *
+   * @return Valid OAuth2 access token for the user.
+   * @throws IOException when system error, internal server error or user error
+   */
+  @Override
+  public synchronized String getAccessToken() throws IOException {
+    return instance.getAccessToken();
+  }
+
+  /**
+   * @return A cached Configuration consistent with the parameters of this
+   * instance.
+   */
+  @Override
+  public synchronized Configuration getConf() {
+    return instance.getConf();
+  }
+
+  /**
+   * Configure cached instance. Note that the Configuration instance returned
+   * from subsequent calls to {@link #getConf() getConf} may be from a
+   * previous, cached entry.
+   * @param conf Configuration instance
+   */
+  @Override
+  public synchronized void setConf(Configuration conf) {
+    String id = conf.get(OAUTH_CLIENT_ID_KEY);
+    if (null == id) {
+      throw new IllegalArgumentException("Missing client ID");
+    }
+    synchronized (CACHE) {
+      instance = CACHE.get(id);
+      if (null == instance
+          || conf.getBoolean(FORCE_REFRESH, false)
+          || replace(instance, conf)) {
+        instance = newInstance();
+        // clone configuration
+        instance.setConf(new Configuration(conf));
+        CACHE.put(id, instance);
+        LOG.debug("Created new client {}", id);
+      }
+    }
+  }
+
+  AccessTokenProvider newInstance() {
+    return new ConfRefreshTokenBasedAccessTokenProvider();
+  }
+
+  private static boolean replace(AccessTokenProvider cached, Configuration c2) 
{
+    // ConfRefreshTokenBasedAccessTokenProvider::setConf asserts !null
+    final Configuration c1 = cached.getConf();
+    for (String key : new String[] {
+        OAUTH_REFRESH_TOKEN_KEY, OAUTH_REFRESH_URL_KEY }) {
+      if (!c1.get(key).equals(c2.get(key))) {
+        // replace cached instance for this clientID
+        return true;
+      }
+    }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
new file mode 100644
index 0000000..b444984
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * public interface to expose OAuth2 authentication related features.
+ */
+package org.apache.hadoop.fs.adl.oauth2;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
new file mode 100644
index 0000000..98e6a77
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Public classes that expose the adl:// scheme to access ADL file system.
+ */
+package org.apache.hadoop.fs.adl;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
new file mode 100644
index 0000000..a7f932f
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+/**
+ * Constants.
+ */
+public final class ADLConfKeys {
+  public static final String
+      ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN =
+      "adl.feature.override.readahead.max.concurrent.connection";
+  public static final int
+      ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT = 2;
+  public static final String ADL_WEBSDK_VERSION_KEY = "ADLFeatureSet";
+  static final String ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER =
+      "adl.debug.override.localuserasfileowner";
+  static final boolean ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT = false;
+  static final String ADL_FEATURE_REDIRECT_OFF =
+      "adl.feature.override.redirection.off";
+  static final boolean ADL_FEATURE_REDIRECT_OFF_DEFAULT = true;
+  static final String ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED =
+      "adl.feature.override.getblocklocation.locally.bundled";
+  static final boolean ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT
+      = true;
+  static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD =
+      "adl.feature.override.readahead";
+  static final boolean ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT =
+      true;
+  static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE =
+      "adl.feature.override.readahead.max.buffersize";
+
+  static final int KB = 1024;
+  static final int MB = KB * KB;
+  static final int DEFAULT_BLOCK_SIZE = 4 * MB;
+  static final int DEFAULT_EXTENT_SIZE = 256 * MB;
+  static final int DEFAULT_TIMEOUT_IN_SECONDS = 120;
+  static final int
+      ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT =
+      8 * MB;
+
+  private ADLConfKeys() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
new file mode 100644
index 0000000..350c6e7
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+/**
+ * Process-wide holder for a single block of buffered file data. Exactly one
+ * buffer block is kept in memory at a time, tagged with the path of the
+ * stream it belongs to and the stream offset from which it was fetched.
+ * When several webhdfs instances ask for the same block, the block held here
+ * can be reused and a trip to the backend is avoided. This matters because
+ * ADL fetches a large amount of data per request (4MB by default,
+ * configurable through core-site.xml).
+ * <p>
+ * Observation: for compressed formats such as ORC/Avro the buffer block does
+ * not avoid a few of the backend calls across webhdfs instances.
+ * <p>
+ * This class performs no locking of its own; callers are expected to
+ * synchronize on {@link #getLock()} around a check-then-use sequence.
+ */
+final class BufferManager {
+  private static final BufferManager BUFFER_MANAGER_INSTANCE =
+      new BufferManager();
+  // Must be final: a monitor object that could be reassigned would let two
+  // threads synchronize on different objects.
+  private static final Object LOCK = new Object();
+  private Buffer buffer = null;
+  private String fileName;
+
+  /**
+   * Constructor. Private because the class is a singleton.
+   */
+  private BufferManager() {
+  }
+
+  /**
+   * @return The shared monitor callers synchronize on while inspecting or
+   * replacing the buffer block.
+   */
+  public static Object getLock() {
+    return LOCK;
+  }
+
+  /**
+   * @return The single process-wide instance.
+   */
+  public static BufferManager getInstance() {
+    return BUFFER_MANAGER_INSTANCE;
+  }
+
+  /**
+   * Validate if the current buffer block belongs to the given stream and
+   * covers the given offset.
+   *
+   * @param path   ADL stream path
+   * @param offset Stream offset that caller is interested in
+   * @return True if the buffer block is available otherwise false
+   */
+  boolean hasValidDataForOffset(String path, long offset) {
+    if (buffer == null || fileName == null || !fileName.equals(path)) {
+      return false;
+    }
+
+    final long blockEnd = buffer.offset + buffer.data.length;
+    return offset >= buffer.offset && offset < blockEnd;
+  }
+
+  /**
+   * Clean buffer block.
+   */
+  void clear() {
+    buffer = null;
+    // Drop the path as well so no stale reference outlives the block.
+    fileName = null;
+  }
+
+  /**
+   * Validate if the current buffer block belongs to the given stream and
+   * fully contains the requested range. Partially available data is not
+   * supported: the whole range starting at offset and spanning size bytes
+   * must lie inside the buffer block.
+   *
+   * @param path   Stream path
+   * @param offset Offset of the stream
+   * @param size   Size of the data from the offset of the stream caller
+   *               interested in
+   * @return True if the data is available from the given offset and of the
+   * size caller is interested in. A negative size is never available.
+   */
+  boolean hasData(String path, long offset, int size) {
+    if (size < 0 || !hasValidDataForOffset(path, offset)) {
+      return false;
+    }
+
+    return (size + offset) <= (buffer.data.length + buffer.offset);
+  }
+
+  /**
+   * Copy data from the buffer block, starting at the requested stream
+   * offset, until the destination array is full. It is the caller's
+   * responsibility to check beforehand (for example with
+   * {@link #hasData(String, long, int)}) that the buffer block is of their
+   * interest and that the offset and destination length are valid.
+   *
+   * @param data   Byte array to be filled from the buffer block
+   * @param offset Stream offset from which the data is to be fetched
+   */
+  void get(byte[] data, long offset) {
+    System.arraycopy(buffer.data, (int) (offset - buffer.offset), data, 0,
+        data.length);
+  }
+
+  /**
+   * Create new empty buffer block of the given size.
+   *
+   * @param len Size of the buffer block.
+   * @return Empty byte array.
+   */
+  byte[] getEmpty(int len) {
+    return new byte[len];
+  }
+
+  /**
+   * This function allows caller to specify new buffer block for the stream
+   * which is pulled from the backend. The previously held block, if any, is
+   * replaced. A null data array is ignored and leaves the current block
+   * untouched.
+   *
+   * @param data   Buffer
+   * @param path   Stream path to which buffer belongs to
+   * @param offset Stream offset where buffer start with
+   */
+  void add(byte[] data, String path, long offset) {
+    if (data == null) {
+      return;
+    }
+
+    final Buffer replacement = new Buffer();
+    replacement.data = data;
+    replacement.offset = offset;
+    buffer = replacement;
+    this.fileName = path;
+  }
+
+  /**
+   * Must only be called while a buffer block is held.
+   *
+   * @return Size of the buffer.
+   */
+  int getBufferSize() {
+    return buffer.data.length;
+  }
+
+  /**
+   * Must only be called while a buffer block is held.
+   *
+   * @return Stream offset where buffer start with
+   */
+  long getBufferOffset() {
+    return buffer.offset;
+  }
+
+  /**
+   * Buffer container: the bytes of the block and the stream offset of its
+   * first byte.
+   */
+  static class Buffer {
+    private byte[] data;
+    private long offset;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
new file mode 100644
index 0000000..89011d2
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
@@ -0,0 +1,1108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.web.resources.ADLFlush;
+import org.apache.hadoop.hdfs.web.resources.ADLGetOpParam;
+import org.apache.hadoop.hdfs.web.resources.ADLPostOpParam;
+import org.apache.hadoop.hdfs.web.resources.ADLPutOpParam;
+import org.apache.hadoop.hdfs.web.resources.ADLVersionInfo;
+import org.apache.hadoop.hdfs.web.resources.AppendADLNoRedirectParam;
+import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
+import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
+import org.apache.hadoop.hdfs.web.resources.CreateADLNoRedirectParam;
+import org.apache.hadoop.hdfs.web.resources.CreateFlagParam;
+import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
+import org.apache.hadoop.hdfs.web.resources.GetOpParam;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
+import org.apache.hadoop.hdfs.web.resources.LeaseParam;
+import org.apache.hadoop.hdfs.web.resources.LengthParam;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
+import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
+import org.apache.hadoop.hdfs.web.resources.Param;
+import org.apache.hadoop.hdfs.web.resources.PermissionParam;
+import org.apache.hadoop.hdfs.web.resources.PutOpParam;
+import org.apache.hadoop.hdfs.web.resources.ReadADLNoRedirectParam;
+import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.VersionInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.SocketException;
+import java.net.URI;
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Extends {@link SWebHdfsFileSystem} with Azure Data Lake specific
+ * stability, reliability and performance improvements.
+ * <p>
+ * The motivation behind PrivateAzureDataLakeFileSystem is to encapsulate the
+ * implementation that depends on the org.apache.hadoop.hdfs.web package,
+ * such as configuring query parameters and the configuration of the HTTP
+ * requests sent to the backend. This class should be refactored and moved
+ * under package org.apache.hadoop.fs.adl once the required dependent changes
+ * are made into ASF code.
+ */
+public class PrivateAzureDataLakeFileSystem extends SWebHdfsFileSystem {
+
+  public static final String SCHEME = "adl";
+
+  // Feature configuration
+  private boolean featureGetBlockLocationLocallyBundled = true;
+  private boolean featureConcurrentReadWithReadAhead = true;
+  private boolean featureRedirectOff = true;
+  private boolean featureFlushWhenEOF = true;
+  private boolean overrideOwner = false;
+  private int maxConcurrentConnection;
+  private int maxBufferSize;
+  private String userName;
+
+  /**
+   * Creates the file system and records the short name of the current
+   * Hadoop user. If the current user cannot be determined the name
+   * "hadoop" is used instead.
+   */
+  public PrivateAzureDataLakeFileSystem() {
+    String resolvedUser;
+    try {
+      resolvedUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (IOException ignored) {
+      // Best effort only: fall back to a fixed user name.
+      resolvedUser = "hadoop";
+    }
+    userName = resolvedUser;
+  }
+
+  /**
+   * Initializes the parent file system and then loads every ADL feature
+   * switch and read-ahead tuning value from the effective configuration,
+   * falling back to the defaults declared in {@code ADLConfKeys}.
+   *
+   * @param uri  File system URI
+   * @param conf Configuration to initialize from
+   * @throws IOException when the parent initialization fails
+   */
+  @Override
+  public synchronized void initialize(URI uri, Configuration conf)
+      throws IOException {
+    super.initialize(uri, conf);
+
+    final Configuration settings = getConf();
+
+    overrideOwner = settings.getBoolean(
+        ADLConfKeys.ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER,
+        ADLConfKeys.ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
+
+    featureRedirectOff = settings.getBoolean(
+        ADLConfKeys.ADL_FEATURE_REDIRECT_OFF,
+        ADLConfKeys.ADL_FEATURE_REDIRECT_OFF_DEFAULT);
+
+    featureGetBlockLocationLocallyBundled = settings.getBoolean(
+        ADLConfKeys.ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED,
+        ADLConfKeys.ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT);
+
+    featureConcurrentReadWithReadAhead = settings.getBoolean(
+        ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD,
+        ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT);
+
+    maxBufferSize = settings.getInt(
+        ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE,
+        ADLConfKeys
+            .ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT);
+
+    maxConcurrentConnection = settings.getInt(
+        ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN,
+        ADLConfKeys
+            .ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT);
+  }
+
+  /**
+   * @return Current value of the switch that makes
+   * getFileBlockLocations build its answer locally.
+   */
+  @VisibleForTesting
+  protected boolean isFeatureGetBlockLocationLocallyBundled() {
+    return featureGetBlockLocationLocallyBundled;
+  }
+
+  /**
+   * @return Current value of the switch that makes open return the
+   * read-ahead based input stream.
+   */
+  @VisibleForTesting
+  protected boolean isFeatureConcurrentReadWithReadAhead() {
+    return featureConcurrentReadWithReadAhead;
+  }
+
+  /**
+   * @return Current value of the redirect-off switch, which selects the
+   * ADL no-redirect request parameters.
+   */
+  @VisibleForTesting
+  protected boolean isFeatureRedirectOff() {
+    return featureRedirectOff;
+  }
+
+  /**
+   * @return True when file statuses are rewritten to report the local user
+   * as the owner.
+   */
+  @VisibleForTesting
+  protected boolean isOverrideOwnerFeatureOn() {
+    return overrideOwner;
+  }
+
+  /**
+   * @return Read-ahead buffer size loaded by initialize; it is handed to
+   * the read-ahead input stream created by open.
+   */
+  @VisibleForTesting
+  protected int getMaxBufferSize() {
+    return maxBufferSize;
+  }
+
+  /**
+   * @return Maximum concurrent connection count loaded by initialize; it is
+   * handed to the read-ahead input stream created by open.
+   */
+  @VisibleForTesting
+  protected int getMaxConcurrentConnection() {
+    return maxConcurrentConnection;
+  }
+
+  /**
+   * @return The URI scheme of this file system, {@link #SCHEME}.
+   */
+  @Override
+  public String getScheme() {
+    return SCHEME;
+  }
+
+  /**
+   * Constructing home directory locally is fine as long as Hadoop
+   * local user name and ADL user name relationship story is not fully baked
+   * yet.
+   *
+   * @return Hadoop local user home directory.
+   */
+  @Override
+  public final Path getHomeDirectory() {
+    try {
+      return makeQualified(new Path(
+          "/user/" + 
UserGroupInformation.getCurrentUser().getShortUserName()));
+    } catch (IOException e) {
+    }
+
+    return new Path("/user/" + userName);
+  }
+
+  /**
+   * Azure data lake does not support user configuration of data
+   * replication, hence the request is not forwarded to azure data lake.
+   *
+   * Stub implementation.
+   *
+   * @param p           Not honoured
+   * @param replication Not honoured
+   * @return True, hard coded, since the ADL file system does not support
+   * replication configuration
+   * @throws IOException Never thrown by this implementation; declared only
+   *                     to align with the parent api definition.
+   */
+  @Override
+  public final boolean setReplication(final Path p, final short replication)
+      throws IOException {
+    return true;
+  }
+
+  /**
+   * Fetches the status of a path from the parent implementation. When the
+   * owner override is switched on, a copy of the status is returned whose
+   * owner is the local user and whose group is "hdfs".
+   *
+   * @param f File/Folder path
+   * @return FileStatus instance containing metadata information of f
+   * @throws IOException For any system error
+   */
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    statistics.incrementReadOps(1);
+    final FileStatus actual = super.getFileStatus(f);
+
+    if (!overrideOwner) {
+      return actual;
+    }
+
+    return new FileStatus(actual.getLen(), actual.isDirectory(),
+        actual.getReplication(), actual.getBlockSize(),
+        actual.getModificationTime(), actual.getAccessTime(),
+        actual.getPermission(), userName, "hdfs", actual.getPath());
+  }
+
+  /**
+   * Create is handled differently for ADL: the create semantics are
+   * translated to Create/Append semantics.
+   * 1. No dedicated connection to the server is held.
+   * 2. Data is buffered locally. Once the buffer is full, or flush is
+   * invoked by the caller, all the pending data is pushed to ADL with the
+   * APPEND operation code.
+   * 3. On close an additional call is sent to the server to close the
+   * stream and release the lock on the stream.
+   *
+   * Necessity of Create/Append semantics:
+   * 1. The ADL backend server does not allow an idle connection for a long
+   * duration. With slow writers, connection timeouts/connection resets were
+   * observed, causing occasional job failures.
+   * 2. Performance boost for jobs which are slow writers, by avoiding
+   * network latency.
+   * 3. ADL performs equally well with append calls in multiples of 4MB
+   * chunks.
+   *
+   * @param f           File path
+   * @param permission  Access permission for the newly created file
+   * @param overwrite   Remove existing file and recreate new one if true
+   *                    otherwise throw error if file exist
+   * @param bufferSize  Buffer size, ADL backend does not honour
+   * @param replication Replication count, ADL backend does not honour
+   * @param blockSize   Block size, ADL backend does not honour
+   * @param progress    Progress indicator
+   * @return FSDataOutputStream OutputStream on which application can push
+   * stream of bytes
+   * @throws IOException when system error, internal server error or user error
+   */
+  @Override
+  public FSDataOutputStream create(final Path f, final FsPermission permission,
+      final boolean overwrite, final int bufferSize, final short replication,
+      final long blockSize, final Progressable progress) throws IOException {
+    statistics.incrementWriteOps(1);
+
+    final OutputStream batchedAppends = new BatchAppendOutputStream(f,
+        bufferSize,
+        new PermissionParam(applyUMask(permission)),
+        new OverwriteParam(overwrite),
+        new BufferSizeParam(bufferSize),
+        new ReplicationParam(replication),
+        new BlockSizeParam(blockSize),
+        new ADLVersionInfo(VersionInfo.getVersion()));
+
+    return new FSDataOutputStream(batchedAppends, statistics) {
+    };
+  }
+
+  /**
+   * Creates a file without creating missing parent directories. Data is
+   * written through the same batched append stream used by create, and a
+   * freshly generated random UUID is sent as the lease id.
+   *
+   * @param f           File path
+   * @param permission  Access permission for the newly created file
+   * @param flag        Create flags passed on to the backend
+   * @param bufferSize  Buffer size
+   * @param replication Replication count
+   * @param blockSize   Block size
+   * @param progress    Progress indicator, not used
+   * @return FSDataOutputStream OutputStream on which application can push
+   * stream of bytes
+   * @throws IOException when the stream cannot be created
+   */
+  @Override
+  public FSDataOutputStream createNonRecursive(final Path f,
+      final FsPermission permission, final EnumSet<CreateFlag> flag,
+      final int bufferSize, final short replication, final long blockSize,
+      final Progressable progress) throws IOException {
+    statistics.incrementWriteOps(1);
+
+    final String leaseId = java.util.UUID.randomUUID().toString();
+    final OutputStream batchedAppends = new BatchAppendOutputStream(f,
+        bufferSize,
+        new PermissionParam(applyUMask(permission)),
+        new CreateFlagParam(flag),
+        new CreateParentParam(false),
+        new BufferSizeParam(bufferSize),
+        new ReplicationParam(replication),
+        new LeaseParam(leaseId),
+        new BlockSizeParam(blockSize),
+        new ADLVersionInfo(VersionInfo.getVersion()));
+
+    return new FSDataOutputStream(batchedAppends, statistics) {
+    };
+  }
+
+  /**
+   * Since defined as private in parent class, redefined to pass through
+   * Create api implementation.
+   *
+   * @param permission
+   * @return FsPermission list
+   */
+  private FsPermission applyUMask(FsPermission permission) {
+    FsPermission fsPermission = permission;
+    if (fsPermission == null) {
+      fsPermission = FsPermission.getDefault();
+    }
+    return fsPermission.applyUMask(FsPermission.getUMask(getConf()));
+  }
+
+  /**
+   * Open is handled differently for ADL. Instead of returning a network
+   * stream to the user, an overridden FSInputStream is returned.
+   *
+   * 1. No dedicated connection to the server is held.
+   * 2. Process level concurrent read ahead buffering is done, which allows
+   * data to be available to the caller quickly.
+   * 3. The number of bytes to read ahead is configurable.
+   *
+   * Advantages of process level concurrent read ahead buffering:
+   * 1. The ADL backend server does not allow an idle connection for a long
+   * duration. With slow readers, connection timeouts/connection resets were
+   * observed, causing occasional job failures.
+   * 2. Performance boost for jobs which are slow readers, by avoiding
+   * network latency.
+   * 3. Compressed formats like ORC, and large data files, gain the most out
+   * of this implementation.
+   *
+   * The read ahead feature is configurable; when it is switched off a
+   * connection based stream is returned instead.
+   *
+   * @param f          File path
+   * @param buffersize Buffer size
+   * @return FSDataInputStream InputStream on which application can read
+   * stream of bytes
+   * @throws IOException when system error, internal server error or user error
+   */
+  @Override
+  public FSDataInputStream open(final Path f, final int buffersize)
+      throws IOException {
+    statistics.incrementReadOps(1);
+
+    final HttpOpParam.Op openOp = GetOpParam.Op.OPEN;
+
+    if (featureConcurrentReadWithReadAhead) {
+      final URL readUrl = this.toUrl(openOp, f,
+          new BufferSizeParam(buffersize),
+          new ReadADLNoRedirectParam(true),
+          new ADLVersionInfo(VersionInfo.getVersion()));
+
+      return new FSDataInputStream(new BatchByteArrayInputStream(readUrl, f,
+          maxBufferSize, maxConcurrentConnection));
+    }
+
+    // use a runner so the open can recover from an invalid token
+    final FsPathConnectionRunner connectionRunner;
+    if (featureRedirectOff) {
+      connectionRunner = new FsPathConnectionRunner(ADLGetOpParam.Op.OPEN, f,
+          new BufferSizeParam(buffersize), new ReadADLNoRedirectParam(true),
+          new ADLVersionInfo(VersionInfo.getVersion()));
+    } else {
+      connectionRunner = new FsPathConnectionRunner(openOp, f,
+          new BufferSizeParam(buffersize));
+    }
+
+    return new FSDataInputStream(
+        new OffsetUrlInputStream(new UnresolvedUrlOpener(connectionRunner),
+            new OffsetUrlOpener(null)));
+  }
+
+  /**
+   * Lists the children of a path through the parent implementation. When
+   * the owner override is switched on, every entry is replaced in place by
+   * a copy whose owner is the local user and whose group is "hdfs".
+   *
+   * @param f File/Folder path
+   * @return FileStatus array list
+   * @throws IOException For system error
+   */
+  @Override
+  public FileStatus[] listStatus(final Path f) throws IOException {
+    final FileStatus[] listing = super.listStatus(f);
+    if (!overrideOwner) {
+      return listing;
+    }
+
+    for (int idx = 0; idx < listing.length; idx++) {
+      final FileStatus entry = listing[idx];
+      listing[idx] = new FileStatus(entry.getLen(), entry.isDirectory(),
+          entry.getReplication(), entry.getBlockSize(),
+          entry.getModificationTime(), entry.getAccessTime(),
+          entry.getPermission(), userName, "hdfs", entry.getPath());
+    }
+    return listing;
+  }
+
+  /**
+   * Returns block locations for a byte range of a file. When the locally
+   * bundled feature is on, the answer is fabricated locally: the range is
+   * cut into extents of {@code ADLConfKeys.DEFAULT_EXTENT_SIZE} bytes, each
+   * reported on "localhost". Otherwise the request is delegated to the path
+   * based overload.
+   * <p>
+   * The requested range is clamped to the end of the file. Without the
+   * clamp a caller passing a very large length (for example
+   * Long.MAX_VALUE) would overflow both the location count and the
+   * offset arithmetic, and locations beyond the end of the file would be
+   * reported.
+   *
+   * @param status File status of the file, may be null
+   * @param offset Start of the range, must not be negative
+   * @param length Length of the range, must not be negative
+   * @return null when status is null, an empty array when the range starts
+   * at or after the end of the file or is empty, otherwise one location per
+   * extent
+   * @throws IllegalArgumentException when offset or length is negative and
+   *                                  the locally bundled feature is on
+   * @throws IOException              when the delegated lookup fails
+   */
+  @Override
+  public BlockLocation[] getFileBlockLocations(final FileStatus status,
+      final long offset, final long length) throws IOException {
+    if (status == null) {
+      return null;
+    }
+
+    if (!featureGetBlockLocationLocallyBundled) {
+      return getFileBlockLocations(status.getPath(), offset, length);
+    }
+
+    if ((offset < 0) || (length < 0)) {
+      throw new IllegalArgumentException("Invalid start or len parameter");
+    }
+
+    final long fileLength = status.getLen();
+    if (fileLength < offset) {
+      return new BlockLocation[0];
+    }
+
+    // Never describe bytes past the end of the file. This also keeps the
+    // arithmetic below within range for arbitrarily large length values.
+    final long rangeLength = Math.min(length, fileLength - offset);
+
+    final String[] name = {"localhost"};
+    final String[] host = {"localhost"};
+    // Block size must be non zero
+    final long blockSize = ADLConfKeys.DEFAULT_EXTENT_SIZE;
+    final int numberOfLocations = (int) (rangeLength / blockSize)
+        + ((rangeLength % blockSize == 0) ? 0 : 1);
+
+    final BlockLocation[] locations = new BlockLocation[numberOfLocations];
+    final long rangeEnd = offset + rangeLength;
+    for (int i = 0; i < locations.length; i++) {
+      final long currentOffset = offset + (i * blockSize);
+      final long currentLength = Math.min(blockSize, rangeEnd - currentOffset);
+      locations[i] = new BlockLocation(name, host, currentOffset,
+          currentLength);
+    }
+
+    return locations;
+  }
+
+  /**
+   * Path based block location lookup. With the locally bundled feature on,
+   * the file status is fetched and the status based overload produces the
+   * answer; otherwise the parent implementation is used.
+   *
+   * @param p      File path
+   * @param offset Start of the range
+   * @param length Length of the range
+   * @return Block locations for the range
+   * @throws IOException For any system error
+   */
+  @Override
+  public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
+      final long length) throws IOException {
+    statistics.incrementReadOps(1);
+
+    if (!featureGetBlockLocationLocallyBundled) {
+      return super.getFileBlockLocations(p, offset, length);
+    }
+
+    final FileStatus resolved = getFileStatus(p);
+    return getFileBlockLocations(resolved, offset, length);
+  }
+
+  /**
+   * Read state tracked by BatchByteArrayInputStream.
+   */
+  enum StreamState {
+    // State a stream starts in; the first availability check fills the
+    // buffer while the stream is in this state.
+    Initial,
+    // NOTE(review): presumably set once data has been loaded into the local
+    // buffer - confirm against doBufferAvailabilityCheck.
+    DataCachedInLocalBuffer,
+    // Set when the availability check finds no more data to read.
+    StreamEnd
+  }
+
+  /**
+   * Output stream that buffers written bytes locally and pushes them to the
+   * backend in batches as APPEND requests, instead of holding one
+   * connection open for the lifetime of the stream. The file itself is
+   * created by a CREATE request issued from the constructor.
+   */
+  class BatchAppendOutputStream extends OutputStream {
+    private Path fsPath;
+    // Request parameters supplied by the caller; sent with every request.
+    private Param<?, ?>[] parameters;
+    // Local buffer holding bytes that have not been committed yet.
+    private byte[] data = null;
+    // Number of pending bytes currently held in data.
+    private int offset = 0;
+    // Total number of bytes committed so far; sent as the append offset.
+    private long length = 0;
+    // Set by close() (redirect-off mode) so the last commit carries the
+    // end-of-file flush flag.
+    private boolean eof = false;
+    // Set once a commit has failed; close() then skips the final flush.
+    private boolean hadError = false;
+    private byte[] dataBuffers = null;
+    private int bufSize = 0;
+    private boolean streamClosed = false;
+
+    /**
+     * Sizes the local buffer (never smaller than
+     * ADLConfKeys.DEFAULT_BLOCK_SIZE) and issues the CREATE request. The
+     * stream returned by the CREATE request is closed straight away; all
+     * data is sent later through APPEND requests.
+     *
+     * @param path       File to create
+     * @param bufferSize Requested buffer size
+     * @param param      Request parameters to send with every request
+     * @throws IOException when the CREATE request fails
+     */
+    public BatchAppendOutputStream(Path path, int bufferSize,
+        Param<?, ?>... param) throws IOException {
+      if (bufferSize < (ADLConfKeys.DEFAULT_BLOCK_SIZE)) {
+        bufSize = ADLConfKeys.DEFAULT_BLOCK_SIZE;
+      } else {
+        bufSize = bufferSize;
+      }
+
+      this.fsPath = path;
+      this.parameters = param;
+      this.data = getBuffer();
+      FSDataOutputStream createStream = null;
+      try {
+        if (featureRedirectOff) {
+          // Extend the caller's parameters with the no-redirect marker and,
+          // when flush-on-EOF is enabled, an explicit flush=false.
+          CreateADLNoRedirectParam skipRedirect = new CreateADLNoRedirectParam(
+              true);
+          Param<?, ?>[] tmpParam = featureFlushWhenEOF ?
+              new Param<?, ?>[param.length + 2] :
+              new Param<?, ?>[param.length + 1];
+          System.arraycopy(param, 0, tmpParam, 0, param.length);
+          tmpParam[param.length] = skipRedirect;
+          if (featureFlushWhenEOF) {
+            tmpParam[param.length + 1] = new ADLFlush(false);
+          }
+          createStream = new FsPathOutputStreamRunner(ADLPutOpParam.Op.CREATE,
+              fsPath, 1, tmpParam).run();
+        } else {
+          createStream = new FsPathOutputStreamRunner(PutOpParam.Op.CREATE,
+              fsPath, 1, param).run();
+        }
+      } finally {
+        if (createStream != null) {
+          createStream.close();
+        }
+      }
+    }
+
+    /**
+     * Buffers a single byte, committing the buffer first when it is full.
+     *
+     * @param b Byte to write
+     * @throws IOException when the stream is closed or a commit fails
+     */
+    @Override
+    public final synchronized void write(int b) throws IOException {
+      if (streamClosed) {
+        throw new IOException(fsPath + " stream object is closed.");
+      }
+
+      if (offset == (data.length)) {
+        flush();
+      }
+
+      data[offset] = (byte) b;
+      offset++;
+
+      // Statistics will get incremented again as part of the batch updates,
+      // decrement here to avoid double value
+      if (statistics != null) {
+        statistics.incrementBytesWritten(-1);
+      }
+    }
+
+    /**
+     * Writes a range of bytes. Input at least as large as the local buffer
+     * is committed directly from the caller's array in buffer-sized chunks;
+     * whatever remains is copied into the local buffer, which is committed
+     * whenever it becomes full.
+     *
+     * @param buf Source array
+     * @param off Start offset in buf
+     * @param len Number of bytes to write
+     * @throws IOException when the stream is closed or a commit fails
+     */
+    @Override
+    public final synchronized void write(byte[] buf, int off, int len)
+        throws IOException {
+      if (streamClosed) {
+        throw new IOException(fsPath + " stream object is closed.");
+      }
+
+      int bytesToWrite = len;
+      int localOff = off;
+      int localLen = len;
+      if (localLen >= data.length) {
+        // Flush data that is already in our internal buffer
+        flush();
+
+        // Keep committing data until we have less than our internal buffers
+        // length left
+        do {
+          try {
+            commit(buf, localOff, data.length, eof);
+          } catch (IOException e) {
+            hadError = true;
+            throw e;
+          }
+          localOff += data.length;
+          localLen -= data.length;
+        } while (localLen >= data.length);
+      }
+
+      // At this point, we have less than data.length left to copy from users
+      // buffer
+      if (offset + localLen >= data.length) {
+        // Users buffer has enough data left to fill our internal buffer
+        int bytesToCopy = data.length - offset;
+        System.arraycopy(buf, localOff, data, offset, bytesToCopy);
+        offset += bytesToCopy;
+
+        // Flush our internal buffer
+        flush();
+        localOff += bytesToCopy;
+        localLen -= bytesToCopy;
+      }
+
+      if (localLen > 0) {
+        // Simply copy the remainder from the users buffer into our internal
+        // buffer
+        System.arraycopy(buf, localOff, data, offset, localLen);
+        offset += localLen;
+      }
+
+      // Statistics will get incremented again as part of the batch updates,
+      // decrement here to avoid double value
+      if (statistics != null) {
+        statistics.incrementBytesWritten(-bytesToWrite);
+      }
+    }
+
+    /**
+     * Commits the pending bytes of the local buffer, if any, and marks the
+     * buffer as empty. A failed commit is remembered in hadError.
+     *
+     * @throws IOException when the stream is closed or the commit fails
+     */
+    @Override
+    public final synchronized void flush() throws IOException {
+      if (streamClosed) {
+        throw new IOException(fsPath + " stream object is closed.");
+      }
+
+      if (offset > 0) {
+        try {
+          commit(data, 0, offset, eof);
+        } catch (IOException e) {
+          hadError = true;
+          throw e;
+        }
+      }
+
+      offset = 0;
+    }
+
+    /**
+     * Closes the stream. Pending bytes are committed with the end-of-file
+     * flag (redirect-off mode). If nothing was pending, redirect-off mode
+     * sends one more request with no payload and the end-of-file flag set.
+     * After an earlier commit failure the stream is only marked closed and
+     * nothing more is sent. Calling close on a closed stream does nothing.
+     *
+     * @throws IOException when the final commit fails
+     */
+    @Override
+    public final synchronized void close() throws IOException {
+      // Stream is closed earlier, return quietly.
+      if(streamClosed) {
+        return;
+      }
+
+      if (featureRedirectOff) {
+        eof = true;
+      }
+
+      boolean flushedSomething = false;
+      if (hadError) {
+        // No point proceeding further since the error has occurred and
+        // stream would be required to upload again.
+        streamClosed = true;
+        return;
+      } else {
+        // Must be sampled before flush(), which resets offset to zero.
+        flushedSomething = offset > 0;
+        try {
+          flush();
+        } finally {
+          streamClosed = true;
+        }
+      }
+
+      if (featureRedirectOff) {
+        // If we didn't flush anything from our internal buffer, we have to
+        // call the service again
+        // with an empty payload and flush=true in the url
+        if (!flushedSomething) {
+          try {
+            commit(null, 0, ADLConfKeys.KB, true);
+          } finally {
+            streamClosed = true;
+          }
+        }
+      }
+    }
+
+    /**
+     * Sends one APPEND request. In redirect-off mode the caller's
+     * parameters are extended with the no-redirect marker and, when
+     * flush-on-EOF is enabled, with the flush flag and the number of bytes
+     * committed so far as the offset.
+     *
+     * @param buffer    Bytes to send, or null to send no payload
+     * @param off       Start offset in buffer
+     * @param len       Number of bytes to send; also passed to the request
+     *                  runner
+     * @param endOfFile Value of the flush flag sent with the request
+     * @throws IOException when the request fails
+     */
+    private void commit(byte[] buffer, int off, int len, boolean endOfFile)
+        throws IOException {
+      OutputStream out = null;
+      try {
+        if (featureRedirectOff) {
+          AppendADLNoRedirectParam skipRedirect = new AppendADLNoRedirectParam(
+              true);
+          Param<?, ?>[] tmpParam = featureFlushWhenEOF ?
+              new Param<?, ?>[parameters.length + 3] :
+              new Param<?, ?>[parameters.length + 1];
+          System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
+          tmpParam[parameters.length] = skipRedirect;
+          if (featureFlushWhenEOF) {
+            tmpParam[parameters.length + 1] = new ADLFlush(endOfFile);
+            tmpParam[parameters.length + 2] = new OffsetParam(length);
+          }
+
+          out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath,
+              len, tmpParam).run();
+        } else {
+          out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath,
+              len, parameters).run();
+        }
+
+        if (buffer != null) {
+          out.write(buffer, off, len);
+          // Only count bytes after the write itself succeeded.
+          length += len;
+        }
+      } finally {
+        if (out != null) {
+          out.close();
+        }
+      }
+    }
+
+    /**
+     * @return A newly allocated buffer of bufSize bytes, also remembered in
+     * dataBuffers.
+     */
+    private byte[] getBuffer() {
+      // Allocate a fresh buffer on every call
+      dataBuffers = new byte[bufSize];
+      return dataBuffers;
+    }
+  }
+
+  /**
+   * Read data from backend in chunks instead of persistent connection. This
+   * is to avoid slow reader causing socket
+   * timeout.
+   */
+  protected class BatchByteArrayInputStream extends FSInputStream {
+
+    private static final int SIZE4MB = 4 * 1024 * 1024;
+    private final URL runner;
+    private byte[] data = null;
+    private long validDataHoldingSize = 0;
+    private int bufferOffset = 0;
+    private long currentFileOffset = 0;
+    private long nextFileOffset = 0;
+    private long fileSize = 0;
+    private StreamState state = StreamState.Initial;
+    private int maxBufferSize;
+    private int maxConcurrentConnection;
+    private Path fsPath;
+    private boolean streamIsClosed;
+    private Future[] subtasks = null;
+
+    BatchByteArrayInputStream(URL url, Path p, int bufferSize,
+        int concurrentConnection) throws IOException {
+      this.runner = url;
+      fsPath = p;
+      FileStatus fStatus = getFileStatus(fsPath);
+      if (!fStatus.isFile()) {
+        throw new IOException("Cannot open the directory " + p + " for " +
+            "reading");
+      }
+      fileSize = fStatus.getLen();
+      this.maxBufferSize = bufferSize;
+      this.maxConcurrentConnection = concurrentConnection;
+      this.streamIsClosed = false;
+    }
+
+    /**
+     * Positioned read: seeks to the given position, reads, and always seeks
+     * back to the position the stream had before the call.
+     *
+     * @param position File position to read from
+     * @param buffer   Destination array
+     * @param offset   Start offset in buffer
+     * @param length   Maximum number of bytes to read
+     * @return Number of bytes read, as reported by the sequential read
+     * @throws IOException when the stream is closed or a seek/read fails
+     */
+    @Override
+    public synchronized final int read(long position, byte[] buffer,
+        int offset, int length) throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+
+      final long savedPosition = this.getPos();
+      try {
+        this.seek(position);
+        return this.read(buffer, offset, length);
+      } finally {
+        this.seek(savedPosition);
+      }
+    }
+
+    /**
+     * Reads the next byte from the local buffer, after the availability
+     * check has had the chance to load data.
+     *
+     * @return The byte as a value in 0..255, or -1 when the availability
+     * check reports that no data is left
+     * @throws IOException when the stream is closed or loading data fails
+     */
+    @Override
+    public synchronized final int read() throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+
+      if (doBufferAvailabilityCheck() == -1) {
+        return -1;
+      }
+
+      final int nextByte = data[bufferOffset++] & 0xff;
+      if (statistics != null) {
+        statistics.incrementBytesRead(1);
+      }
+      return nextByte;
+    }
+
+    /**
+     * Reads exactly {@code length} bytes starting at file offset
+     * {@code position}, delegating to the superclass implementation.
+     *
+     * Byte statistics are not updated here: the superclass implementation
+     * reads through this stream's own read methods, which already record
+     * every byte they return, so counting {@code length} again would
+     * double the reported total.
+     *
+     * @param position file offset to read from.
+     * @param buffer   destination array.
+     * @param offset   index in {@code buffer} of the first byte written.
+     * @param length   exact number of bytes to read.
+     * @throws IOException if the stream is closed or the read fails.
+     */
+    @Override
+    public synchronized final void readFully(long position, byte[] buffer,
+        int offset, int length) throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+
+      super.readFully(position, buffer, offset, length);
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b} starting at index
+     * {@code off}.
+     *
+     * When the whole request fits in the bytes left in the local buffer it
+     * is served with one array copy. Otherwise the request is delegated to
+     * the superclass, which pulls the bytes one at a time through
+     * {@link #read()}.
+     *
+     * @param b   destination array.
+     * @param off index in {@code b} of the first byte written.
+     * @param len maximum number of bytes to read.
+     * @return the number of bytes read, 0 when {@code len} is 0, or -1 at
+     *         end of stream.
+     * @throws IOException if the stream is closed or loading data fails.
+     */
+    @Override
+    public synchronized final int read(byte[] b, int off, int len)
+        throws IOException {
+      if (b == null) {
+        throw new IllegalArgumentException();
+      } else if (off < 0 || len < 0 || len > b.length - off) {
+        throw new IndexOutOfBoundsException();
+      } else if (len == 0) {
+        return 0;
+      }
+
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+      int status = doBufferAvailabilityCheck();
+      if (status == -1) {
+        return status;
+      }
+
+      // Fast path: the request ends inside the valid part of the buffer.
+      // Widened to long so bufferOffset + len cannot overflow.
+      if ((long) bufferOffset + len <= validDataHoldingSize) {
+        System.arraycopy(data, bufferOffset, b, off, len);
+        bufferOffset += len;
+        if (statistics != null) {
+          statistics.incrementBytesRead(len);
+        }
+        return len;
+      }
+
+      // Slow path: the request crosses the end of the buffer. Each byte is
+      // read through read(), which already updates the statistics, so the
+      // result must not be counted a second time here.
+      return super.read(b, off, len);
+    }
+
+    /**
+     * Makes sure the local buffer has an unread byte at
+     * {@code bufferOffset}, calling {@code fill(nextFileOffset)} to load
+     * data when needed.
+     *
+     * @return 0 when data is available; -1 when the end of the stream has
+     *         been reached. When an empty buffer cannot be reloaded, the
+     *         non-positive result of {@code fill} is returned as is.
+     * @throws IOException if loading data fails.
+     */
+    private int doBufferAvailabilityCheck() throws IOException {
+      // Load data first when the stream is still in the Initial state.
+      if (state == StreamState.Initial) {
+        validDataHoldingSize = fill(nextFileOffset);
+      }
+
+      long dataReloadSize = 0;
+      switch ((int) validDataHoldingSize) {
+      case -1:
+        // A previous fill reported end of stream.
+        state = StreamState.StreamEnd;
+        return -1;
+      case 0:
+        // Nothing buffered: try to load the next range.
+        dataReloadSize = fill(nextFileOffset);
+        if (dataReloadSize <= 0) {
+          state = StreamState.StreamEnd;
+          return (int) dataReloadSize;
+        } else {
+          validDataHoldingSize = dataReloadSize;
+        }
+        break;
+      default:
+        break;
+      }
+
+      // All buffered bytes have been handed out: load the next range.
+      if (bufferOffset >= validDataHoldingSize) {
+        dataReloadSize = fill(nextFileOffset);
+      }
+
+      // dataReloadSize == 0 means no reload happened above, so the existing
+      // buffer size applies; otherwise the reload result is used (a reload
+      // result of -1 always satisfies this check and ends the stream).
+      if (bufferOffset >= ((dataReloadSize == 0) ?
+          validDataHoldingSize :
+          dataReloadSize)) {
+        state = StreamState.StreamEnd;
+        return -1;
+      }
+
+      validDataHoldingSize = ((dataReloadSize == 0) ?
+          validDataHoldingSize :
+          dataReloadSize);
+      state = StreamState.DataCachedInLocalBuffer;
+      return 0;
+    }
+
+    /**
+     * Loads a range of the file into {@code data}, either by copying it out
+     * of the shared {@code BufferManager} or by fetching it over the
+     * network, and updates the buffer bookkeeping fields.
+     *
+     * A file no larger than {@code maxBufferSize} is loaded whole from
+     * offset 0 and {@code bufferOffset} is set to {@code off}; for larger
+     * files the range starts at {@code off} and {@code bufferOffset} is set
+     * to 0.
+     *
+     * @param off file offset from which data is needed.
+     * @return the number of valid bytes now held in {@code data}, or -1
+     *         when the stream is at its end.
+     * @throws IOException if the fetch fails or is interrupted.
+     */
+    private long fill(final long off) throws IOException {
+      if (state == StreamState.StreamEnd) {
+        return -1;
+      }
+
+      // Nothing left at or beyond the file size recorded at open time.
+      if (fileSize <= off) {
+        state = StreamState.StreamEnd;
+        return -1;
+      }
+      int len = maxBufferSize;
+      long fileOffset = 0;
+      boolean isEntireFileCached = true;
+      if ((fileSize <= maxBufferSize)) {
+        // Small file: request all of it starting from offset 0.
+        len = (int) fileSize;
+        currentFileOffset = 0;
+        nextFileOffset = 0;
+      } else {
+        // Do not request past the end of the file.
+        if (len > (fileSize - off)) {
+          len = (int) (fileSize - off);
+        }
+
+        synchronized (BufferManager.getLock()) {
+          if (BufferManager.getInstance()
+              .hasValidDataForOffset(fsPath.toString(), off)) {
+            // NOTE(review): presumably the number of bytes between off and
+            // the end of the shared buffer - confirm against BufferManager.
+            len = (int) (
+                BufferManager.getInstance().getBufferOffset() + BufferManager
+                    .getInstance().getBufferSize() - (int) off);
+          }
+        }
+
+        // Fall back to a full-size request when the computed length is not
+        // positive.
+        if (len <= 0) {
+          len = maxBufferSize;
+        }
+        fileOffset = off;
+        isEntireFileCached = false;
+      }
+
+      // Replace the local buffer with one obtained from the BufferManager
+      // for len bytes.
+      data = null;
+      BufferManager bm = BufferManager.getInstance();
+      data = bm.getEmpty(len);
+      boolean fetchDataOverNetwork = false;
+      synchronized (BufferManager.getLock()) {
+        if (bm.hasData(fsPath.toString(), fileOffset, len)) {
+          try {
+            // Served from the shared buffer, no network access needed.
+            bm.get(data, fileOffset);
+            validDataHoldingSize = data.length;
+            currentFileOffset = fileOffset;
+          } catch (ArrayIndexOutOfBoundsException e) {
+            // The copy did not fit; fetch the range over the network.
+            fetchDataOverNetwork = true;
+          }
+        } else {
+          fetchDataOverNetwork = true;
+        }
+      }
+
+      if (fetchDataOverNetwork) {
+        // Fetch the range using up to getSplitSize(len) parallel requests.
+        int splitSize = getSplitSize(len);
+        try {
+          validDataHoldingSize = fillDataConcurrently(data, len, fileOffset,
+              splitSize);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted filling buffer", e);
+        }
+
+        // Hand the freshly fetched range to the shared BufferManager.
+        synchronized (BufferManager.getLock()) {
+          bm.add(data, fsPath.toString(), fileOffset);
+        }
+        currentFileOffset = nextFileOffset;
+      }
+
+      // The next fill starts right after the bytes just loaded.
+      nextFileOffset += validDataHoldingSize;
+      state = StreamState.DataCachedInLocalBuffer;
+      bufferOffset = isEntireFileCached ? (int) off : 0;
+      return validDataHoldingSize;
+    }
+
+    /**
+     * Computes how many parallel requests to use for loading {@code size}
+     * bytes.
+     *
+     * Requests of at most 4 MB use a single connection. Larger requests
+     * use one connection per full 4 MB (after capping {@code size} at
+     * {@code maxBufferSize}), limited to {@code maxConcurrentConnection}.
+     * The result is never below 1, so it is always a valid thread pool
+     * size even if the configured connection limit is zero or negative.
+     *
+     * @param size number of bytes to be loaded.
+     * @return the number of splits, at least 1.
+     */
+    int getSplitSize(int size) {
+      if (size <= SIZE4MB) {
+        return 1;
+      }
+
+      // A request larger than the buffer is not practical; cap it.
+      int cappedSize = Math.min(size, maxBufferSize);
+
+      // Integer division already truncates, so no rounding call is needed.
+      int equalBufferSplit = Math.max(cappedSize / SIZE4MB, 1);
+      return Math.max(1, Math.min(equalBufferSplit, maxConcurrentConnection));
+    }
+
+    /**
+     * Moves the stream to file offset {@code pos}.
+     *
+     * When {@code pos} lies inside the range currently held in the local
+     * buffer only {@code bufferOffset} is adjusted. Otherwise the local
+     * buffer is invalidated so that the next read loads data starting at
+     * {@code pos}.
+     *
+     * @param pos file offset to seek to; must not be negative.
+     * @throws IOException if {@code pos} is negative.
+     */
+    @Override
+    public synchronized final void seek(long pos) throws IOException {
+      // Reject every negative offset, not only -1; a negative value would
+      // otherwise be stored as the current file position.
+      if (pos < 0) {
+        throw new IOException("Bad offset, cannot seek to " + pos);
+      }
+
+      BufferManager bm = BufferManager.getInstance();
+      synchronized (BufferManager.getLock()) {
+        if (bm.hasValidDataForOffset(fsPath.toString(), pos)) {
+          state = StreamState.DataCachedInLocalBuffer;
+        } else {
+          state = StreamState.Initial;
+        }
+      }
+
+      // End (exclusive) of the file range held in the local buffer.
+      long bufferedEnd = currentFileOffset + validDataHoldingSize;
+
+      if (pos >= currentFileOffset && pos < bufferedEnd) {
+        // Target is inside the local buffer: just move the read index.
+        state = StreamState.DataCachedInLocalBuffer;
+        bufferOffset = (int) (pos - currentFileOffset);
+        return;
+      }
+
+      // Target is outside the local buffer: drop it and restart from pos.
+      validDataHoldingSize = 0;
+      currentFileOffset = pos;
+      nextFileOffset = pos;
+      bufferOffset = 0;
+    }
+
+    /**
+     * Reports the current file position of the stream.
+     *
+     * @return the buffer's file offset plus the read index within it.
+     * @throws IOException if the stream has been closed.
+     */
+    @Override
+    public synchronized final long getPos() throws IOException {
+      if (!streamIsClosed) {
+        return currentFileOffset + bufferOffset;
+      }
+      throw new IOException("Stream already closed");
+    }
+
+    @Override
+    public synchronized final int available() throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+      return Integer.MAX_VALUE;
+    }
+
+    /**
+     * Always reports that no alternative source was selected.
+     *
+     * @param targetPos ignored.
+     * @return always {@code false}.
+     */
+    @Override
+    public final boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    private int fillDataConcurrently(byte[] byteArray, int length,
+        long globalOffset, int splitSize)
+        throws IOException, InterruptedException {
+      ExecutorService executor = Executors.newFixedThreadPool(splitSize);
+      subtasks = new Future[splitSize];
+      for (int i = 0; i < splitSize; i++) {
+        int offset = i * (length / splitSize);
+        int splitLength = (splitSize == (i + 1)) ?
+            (length / splitSize) + (length % splitSize) :
+            (length / splitSize);
+        subtasks[i] = executor.submit(
+            new BackgroundReadThread(byteArray, offset, splitLength,
+                globalOffset + offset));
+      }
+
+      executor.shutdown();
+      // wait until all tasks are finished
+      executor.awaitTermination(ADLConfKeys.DEFAULT_TIMEOUT_IN_SECONDS,
+          TimeUnit.SECONDS);
+
+      int totalBytePainted = 0;
+      for (int i = 0; i < splitSize; ++i) {
+        try {
+          totalBytePainted += (Integer) subtasks[i].get();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException(e.getCause());
+        } catch (ExecutionException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException(e.getCause());
+        }
+      }
+
+      if (totalBytePainted != length) {
+        throw new IOException("Expected " + length + " bytes, Got " +
+            totalBytePainted + " bytes");
+      }
+
+      return totalBytePainted;
+    }
+
+    /**
+     * Marks the stream as closed and clears the shared
+     * {@code BufferManager}. No stream or connection is closed here.
+     *
+     * @throws IOException declared by the overridden method; not thrown by
+     *                     this implementation itself.
+     */
+    @Override
+    public synchronized final void close() throws IOException {
+      final BufferManager sharedBuffer = BufferManager.getInstance();
+      synchronized (BufferManager.getLock()) {
+        sharedBuffer.clear();
+      }
+      // From now on the stream can no longer be used.
+      streamIsClosed = true;
+    }
+
+    /**
+     * Reads {@code length} bytes of the file from the ADL backend, starting
+     * at file offset {@code globalOffset}, and stores them in
+     * {@code buffer} starting at index {@code offset}.
+     *
+     * @param buffer       Destination for the data read from the backend.
+     * @param offset       Index in {@code buffer} of the first byte stored.
+     * @param length       Number of bytes requested from the backend.
+     * @param globalOffset File offset to read from.
+     * @return Number of bytes read from the ADL backend
+     * @throws IOException For any intermittent server issues or internal
+     *                     failures, or when the server sends more data than
+     *                     requested.
+     */
+    private int fillUpData(byte[] buffer, int offset, int length,
+        long globalOffset) throws IOException {
+      int totalBytesRead = 0;
+      final URL offsetUrl = new URL(
+          runner + "&" + new OffsetParam(String.valueOf(globalOffset)) + "&"
+              + new LengthParam(String.valueOf(length)));
+      HttpURLConnection conn = new URLRunner(GetOpParam.Op.OPEN, offsetUrl,
+          true).run();
+      try {
+        InputStream in = conn.getInputStream();
+        try {
+          int bytesRead = 0;
+          while ((bytesRead = in.read(buffer, offset + totalBytesRead,
+              length - totalBytesRead)) > 0) {
+            totalBytesRead += bytesRead;
+          }
+
+          // InputStream must be fully consumed to enable http keep-alive
+          if (bytesRead == 0) {
+            // The requested length has been read; one more read must hit
+            // the end of the stream.
+            if (in.read() != -1) {
+              throw new SocketException(
+                  "Server returned more than requested data.");
+            }
+          }
+        } finally {
+          in.close();
+        }
+      } finally {
+        // Runs even when getInputStream() or close() throws, so the
+        // connection is never left open.
+        conn.disconnect();
+      }
+
+      return totalBytesRead;
+    }
+
+    /**
+     * Task that loads one split of a buffer by calling {@code fillUpData}
+     * on the enclosing stream. The result is the number of bytes read.
+     */
+    private class BackgroundReadThread implements Callable<Integer> {
+
+      private final byte[] data;
+      private final int offset;
+      private final int length;
+      private final long globalOffset;
+
+      /**
+       * @param buffer   destination buffer shared by all splits.
+       * @param off      index in {@code buffer} where this split starts.
+       * @param size     number of bytes this split has to read.
+       * @param position file offset of the first byte of this split.
+       */
+      BackgroundReadThread(byte[] buffer, int off, int size, long position) {
+        this.data = buffer;
+        this.offset = off;
+        this.length = size;
+        this.globalOffset = position;
+      }
+
+      /**
+       * @return the number of bytes read for this split.
+       * @throws IOException if the read fails.
+       */
+      @Override
+      public Integer call() throws IOException {
+        return fillUpData(data, offset, length, globalOffset);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
new file mode 100644
index 0000000..d7dce25
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Exposes {@link AccessTokenProvider} publicly so that it can be extended
+ * from the com.microsoft.azure.datalake package. Intended as the base of an
+ * extended provider that caches the token for the process in order to
+ * improve performance.
+ */
+@Private
+@Unstable
+public abstract class PrivateCachedRefreshTokenBasedAccessTokenProvider
+    extends AccessTokenProvider {
+
+  // visibility workaround: this class adds no members of its own
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
new file mode 100644
index 0000000..7a9dffa
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * A distributed implementation of {@link
+ * org.apache.hadoop.hdfs.web.oauth2} for OAuth2 token management support.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
new file mode 100644
index 0000000..1cc8273
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * A distributed implementation of {@link org.apache.hadoop.hdfs.web} for
+ * reading and writing files on the Azure Data Lake file system. This
+ * implementation is derived from the WebHDFS specification.
+ */
+package org.apache.hadoop.hdfs.web;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
new file mode 100644
index 0000000..b76aaaa
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+/**
+ * Query parameter that notifies the backend server that all the data has
+ * been pushed over the stream.
+ *
+ * Used with the Create and Append operations.
+ */
+public class ADLFlush extends BooleanParam {
+  /**
+   * Parameter name.
+   */
+  public static final String NAME = "flush";
+
+  // Domain built from the parameter name and passed to the superclass.
+  private static final Domain DOMAIN = new Domain(NAME);
+
+  /**
+   * Constructor.
+   *
+   * @param value the parameter value.
+   */
+  public ADLFlush(final Boolean value) {
+    super(DOMAIN, value);
+  }
+
+  /**
+   * @return the parameter name, {@link #NAME}.
+   */
+  @Override
+  public final String getName() {
+    return NAME;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
new file mode 100644
index 0000000..6b3708f
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+import java.net.HttpURLConnection;
+
+/**
+ * WebHDFS-style GET operation parameter for Azure Data Lake storage. Its
+ * {@code OPEN} operation is declared without redirect.
+ */
+public class ADLGetOpParam extends HttpOpParam<ADLGetOpParam.Op> {
+  private static final Domain<Op> DOMAIN = new Domain<Op>(NAME, Op.class);
+
+  /**
+   * Builds the parameter by parsing an operation name.
+   *
+   * @param str a string representation of the parameter value.
+   */
+  public ADLGetOpParam(final String str) {
+    super(DOMAIN, DOMAIN.parse(str));
+  }
+
+  @Override
+  public final String getName() {
+    return NAME;
+  }
+
+  /**
+   * The GET operations this parameter can carry.
+   */
+  public enum Op implements HttpOpParam.Op {
+    OPEN(false, HttpURLConnection.HTTP_OK);
+
+    private final boolean redirect;
+    private final int expectedHttpResponseCode;
+    private final boolean requireAuth;
+
+    /**
+     * Declares an operation that does not require authentication.
+     */
+    Op(final boolean redirects, final int responseCode) {
+      this(redirects, responseCode, false);
+    }
+
+    /**
+     * Declares an operation with all of its properties given explicitly.
+     */
+    Op(final boolean redirects, final int responseCode,
+        final boolean needsAuth) {
+      this.redirect = redirects;
+      this.expectedHttpResponseCode = responseCode;
+      this.requireAuth = needsAuth;
+    }
+
+    /** Every operation of this enum is an HTTP GET. */
+    @Override
+    public HttpOpParam.Type getType() {
+      return HttpOpParam.Type.GET;
+    }
+
+    @Override
+    public boolean getRedirect() {
+      return redirect;
+    }
+
+    @Override
+    public boolean getRequireAuth() {
+      return requireAuth;
+    }
+
+    /** GET operations never send a request body. */
+    @Override
+    public boolean getDoOutput() {
+      return false;
+    }
+
+    @Override
+    public int getExpectedHttpResponseCode() {
+      return expectedHttpResponseCode;
+    }
+
+    /** Renders the operation as a {@code name=value} query fragment. */
+    @Override
+    public String toQueryString() {
+      return NAME + "=" + this;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to