This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new ea5f65700a7 HADOOP-19474: [ABFS][FnsOverBlob] Listing Optimizations to 
avoid multiple iteration over list response. (#7421) (#7581)
ea5f65700a7 is described below

commit ea5f65700a77ce86654aa4176d9db93fbffc779f
Author: Anuj Modi <anujmodi2...@gmail.com>
AuthorDate: Mon Apr 7 09:48:13 2025 +0530

    HADOOP-19474: [ABFS][FnsOverBlob] Listing Optimizations to avoid multiple 
iteration over list response. (#7421) (#7581)
    
    Contributed by Anuj Modi
    
    Reviewed by: Anmol Asrani, Manish Bhatt, Manika Joshi
    Signed off by: Anuj Modi<anujm...@apache.org>
---
 .../src/config/checkstyle-suppressions.xml         |   2 +
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 175 ++------------
 .../fs/azurebfs/services/AbfsAHCHttpOperation.java |  28 +--
 .../fs/azurebfs/services/AbfsBlobClient.java       | 245 +++++++++++---------
 .../hadoop/fs/azurebfs/services/AbfsClient.java    | 135 +++++++++--
 .../hadoop/fs/azurebfs/services/AbfsDfsClient.java |  61 +++--
 .../hadoop/fs/azurebfs/services/AbfsErrors.java    |   2 +
 .../fs/azurebfs/services/AbfsHttpOperation.java    |  32 +--
 .../fs/azurebfs/services/ListActionTaker.java      |   2 +-
 .../fs/azurebfs/services/ListResponseData.java     | 102 ++++++++
 .../fs/azurebfs/services/VersionedFileStatus.java  | 139 +++++++++++
 .../apache/hadoop/fs/azurebfs/ITestAbfsClient.java |   8 +-
 .../fs/azurebfs/ITestAbfsCustomEncryption.java     |   7 +-
 .../azurebfs/ITestAzureBlobFileSystemCreate.java   |   5 +-
 .../azurebfs/ITestAzureBlobFileSystemDelete.java   |  14 +-
 .../ITestAzureBlobFileSystemFileStatus.java        |   3 +-
 .../ITestAzureBlobFileSystemListStatus.java        | 257 ++++++++++++++++++---
 .../azurebfs/ITestAzureBlobFileSystemRename.java   |  30 +--
 .../fs/azurebfs/services/AbfsClientTestUtil.java   |  27 ++-
 .../TestAbfsRestOperationMockFailures.java         |   9 +-
 .../fs/azurebfs/services/TestListActionTaker.java  |   7 +-
 .../fs/azurebfs/utils/DirectoryStateHelper.java    |   2 +-
 22 files changed, 875 insertions(+), 417 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml 
b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
index 07aa26d2381..1cedf0c54b0 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
@@ -48,6 +48,8 @@
               
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsClient.java"/>
     <suppress checks="ParameterNumber"
               
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsBlobClient.java"/>
+    <suppress checks="ParameterNumber|MagicNumber"
+              
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]VersionedFileStatus.java"/>
     <suppress checks="ParameterNumber"
               
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]contracts[\\/]services[\\/]AppendRequestParameters.java"/>
     <suppress checks="ParameterNumber|MagicNumber"
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 867edfd1438..eef6a8108d5 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.fs.azurebfs;
 
 import java.io.Closeable;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
@@ -59,7 +58,6 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.EtagSource;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -78,8 +76,7 @@
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
 import org.apache.hadoop.fs.azurebfs.enums.Trilean;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
 import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
@@ -115,6 +112,7 @@
 import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
 import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
 import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
+import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
 import org.apache.hadoop.fs.azurebfs.utils.Base64;
 import org.apache.hadoop.fs.azurebfs.utils.CRC64;
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
@@ -1266,7 +1264,7 @@ public String listStatus(final Path path, final String 
startFrom,
       if (startFrom != null && !startFrom.isEmpty()) {
         /*
          * Blob Endpoint Does not support startFrom yet. Fallback to DFS 
Client.
-         * startFrom remains null for all HDFS APIs. This is only for internal 
use.
+         * startFrom remains null for all HDFS APIs. This is used only for 
tests.
          */
         listingClient = getClient(AbfsServiceType.DFS);
         continuation = getIsNamespaceEnabled(tracingContext)
@@ -1277,58 +1275,16 @@ public String listStatus(final Path path, final String 
startFrom,
 
     do {
       try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
-        AbfsRestOperation op = listingClient.listPath(relativePath, false,
-            abfsConfiguration.getListMaxResults(), continuation,
-            tracingContext);
+        ListResponseData listResponseData = 
listingClient.listPath(relativePath,
+            false, abfsConfiguration.getListMaxResults(), continuation,
+            tracingContext, this.uri);
+        AbfsRestOperation op = listResponseData.getOp();
         perfInfo.registerResult(op.getResult());
-        continuation = 
listingClient.getContinuationFromResponse(op.getResult());
-        ListResultSchema retrievedSchema = 
op.getResult().getListResultSchema();
-        if (retrievedSchema == null) {
-          throw new AbfsRestOperationException(
-                  AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
-                  AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
-                  "listStatusAsync path not found",
-                  null, op.getResult());
-        }
-
-        long blockSize = abfsConfiguration.getAzureBlockSize();
-
-        for (ListResultEntrySchema entry : retrievedSchema.paths()) {
-          final String owner = 
identityTransformer.transformIdentityForGetRequest(entry.owner(), true, 
userName);
-          final String group = 
identityTransformer.transformIdentityForGetRequest(entry.group(), false, 
primaryUserGroup);
-          final String encryptionContext = entry.getXMsEncryptionContext();
-          final FsPermission fsPermission = entry.permissions() == null
-                  ? new AbfsPermission(FsAction.ALL, FsAction.ALL, 
FsAction.ALL)
-                  : AbfsPermission.valueOf(entry.permissions());
-          final boolean hasAcl = 
AbfsPermission.isExtendedAcl(entry.permissions());
-
-          long lastModifiedMillis = 0;
-          long contentLength = entry.contentLength() == null ? 0 : 
entry.contentLength();
-          boolean isDirectory = entry.isDirectory() == null ? false : 
entry.isDirectory();
-          if (entry.lastModified() != null && !entry.lastModified().isEmpty()) 
{
-            lastModifiedMillis = DateTimeUtils.parseLastModifiedTime(
-                entry.lastModified());
-          }
-
-          Path entryPath = new Path(File.separator + entry.name());
-          entryPath = entryPath.makeQualified(this.uri, entryPath);
-
-          fileStatuses.add(
-                  new VersionedFileStatus(
-                          owner,
-                          group,
-                          fsPermission,
-                          hasAcl,
-                          contentLength,
-                          isDirectory,
-                          1,
-                          blockSize,
-                          lastModifiedMillis,
-                          entryPath,
-                          entry.eTag(),
-                          encryptionContext));
+        continuation = listResponseData.getContinuationToken();
+        List<FileStatus> fileStatusListInCurrItr = 
listResponseData.getFileStatusList();
+        if (fileStatusListInCurrItr != null && 
!fileStatusListInCurrItr.isEmpty()) {
+          fileStatuses.addAll(fileStatusListInCurrItr);
         }
-
         perfInfo.registerSuccess(true);
         countAggregate++;
         shouldContinue =
@@ -1931,110 +1887,6 @@ private AbfsPerfInfo startTracking(String callerName, 
String calleeName) {
     return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName);
   }
 
-  /**
-   * A File status with version info extracted from the etag value returned
-   * in a LIST or HEAD request.
-   * The etag is included in the java serialization.
-   */
-  static final class VersionedFileStatus extends FileStatus
-      implements EtagSource {
-
-    /**
-     * The superclass is declared serializable; this subclass can also
-     * be serialized.
-     */
-    private static final long serialVersionUID = -2009013240419749458L;
-
-    /**
-     * The etag of an object.
-     * Not-final so that serialization via reflection will preserve the value.
-     */
-    private String version;
-
-    private String encryptionContext;
-
-    private VersionedFileStatus(
-            final String owner, final String group, final FsPermission 
fsPermission, final boolean hasAcl,
-            final long length, final boolean isdir, final int blockReplication,
-            final long blocksize, final long modificationTime, final Path path,
-            final String version, final String encryptionContext) {
-      super(length, isdir, blockReplication, blocksize, modificationTime, 0,
-              fsPermission,
-              owner,
-              group,
-              null,
-              path,
-              hasAcl, false, false);
-
-      this.version = version;
-      this.encryptionContext = encryptionContext;
-    }
-
-    /** Compare if this object is equal to another object.
-     * @param   obj the object to be compared.
-     * @return  true if two file status has the same path name; false if not.
-     */
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof FileStatus)) {
-        return false;
-      }
-
-      FileStatus other = (FileStatus) obj;
-
-      if (!this.getPath().equals(other.getPath())) {// compare the path
-        return false;
-      }
-
-      if (other instanceof VersionedFileStatus) {
-        return this.version.equals(((VersionedFileStatus) other).version);
-      }
-
-      return true;
-    }
-
-    /**
-     * Returns a hash code value for the object, which is defined as
-     * the hash code of the path name.
-     *
-     * @return  a hash code value for the path name and version
-     */
-    @Override
-    public int hashCode() {
-      int hash = getPath().hashCode();
-      hash = 89 * hash + (this.version != null ? this.version.hashCode() : 0);
-      return hash;
-    }
-
-    /**
-     * Returns the version of this FileStatus
-     *
-     * @return  a string value for the FileStatus version
-     */
-    public String getVersion() {
-      return this.version;
-    }
-
-    @Override
-    public String getEtag() {
-      return getVersion();
-    }
-
-    public String getEncryptionContext() {
-      return encryptionContext;
-    }
-
-    @Override
-    public String toString() {
-      final StringBuilder sb = new StringBuilder(
-          "VersionedFileStatus{");
-      sb.append(super.toString());
-      sb.append("; version='").append(version).append('\'');
-      sb.append('}');
-      return sb.toString();
-    }
-  }
-
   /**
    * Permissions class contain provided permission and umask in octalNotation.
    * If the object is created for namespace-disabled account, the permission 
and
@@ -2176,6 +2028,11 @@ void setNamespaceEnabled(Trilean isNamespaceEnabled){
     this.isNamespaceEnabled = isNamespaceEnabled;
   }
 
+  @VisibleForTesting
+  public URI getUri() {
+    return this.uri;
+  }
+
   private void updateInfiniteLeaseDirs() {
     this.azureInfiniteLeaseDirSet = new HashSet<>(Arrays.asList(
         
abfsConfiguration.getAzureInfiniteLeaseDirs().split(AbfsHttpConstants.COMMA)));
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java
index 3ed70965db9..dd585d32fb7 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java
@@ -38,7 +38,6 @@
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
 import org.apache.http.client.methods.HttpGet;
@@ -48,7 +47,6 @@
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.util.EntityUtils;
 
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
@@ -194,26 +192,14 @@ String getConnResponseMessage() throws IOException {
   public void processResponse(final byte[] buffer,
       final int offset,
       final int length) throws IOException {
-    try {
-      if (!isPayloadRequest) {
-        prepareRequest();
-        LOG.debug("Sending request: {}", httpRequestBase);
-        httpResponse = executeRequest();
-        LOG.debug("Request sent: {}; response {}", httpRequestBase,
-            httpResponse);
-      }
-      parseResponseHeaderAndBody(buffer, offset, length);
-    } finally {
-      if (httpResponse != null) {
-        try {
-          EntityUtils.consume(httpResponse.getEntity());
-        } finally {
-          if (httpResponse instanceof CloseableHttpResponse) {
-            ((CloseableHttpResponse) httpResponse).close();
-          }
-        }
-      }
+    if (!isPayloadRequest) {
+      prepareRequest();
+      LOG.debug("Sending request: {}", httpRequestBase);
+      httpResponse = executeRequest();
+      LOG.debug("Request sent: {}; response {}", httpRequestBase,
+          httpResponse);
     }
+    parseResponseHeaderAndBody(buffer, offset, length);
   }
 
   /**
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index 2bc6397c369..8166cfb60df 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -27,6 +27,7 @@
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
+import java.net.URI;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
@@ -36,6 +37,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.List;
@@ -50,6 +52,7 @@
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
@@ -60,6 +63,7 @@
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
@@ -70,7 +74,6 @@
 import 
org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultEntrySchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultSchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListXmlParser;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 import 
org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
@@ -163,6 +166,7 @@
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_MAX_RESULTS;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_PREFIX;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESTYPE;
+import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_BLOB_LIST_PARSING;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.PATH_EXISTS;
 import static 
org.apache.hadoop.fs.azurebfs.utils.UriUtils.isKeyForDirectorySet;
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ATOMIC_DIR_RENAME_RECOVERY_ON_GET_PATH_EXCEPTION;
@@ -340,18 +344,19 @@ public AbfsRestOperation deleteFilesystem(TracingContext 
tracingContext)
    * @param listMaxResults maximum number of blobs to return.
    * @param continuation marker to specify the continuation token.
    * @param tracingContext for tracing the service call.
-   * @return executed rest operation containing response from server.
+   * @param uri to be used for path conversion.
+   * @return {@link ListResponseData} containing listing response.
    * @throws AzureBlobFileSystemException if rest operation or response 
parsing fails.
    */
-  public AbfsRestOperation listPath(final String relativePath, final boolean 
recursive,
-      final int listMaxResults, final String continuation, TracingContext 
tracingContext)
-      throws IOException {
-    return listPath(relativePath, recursive, listMaxResults, continuation, 
tracingContext, true);
+  @Override
+  public ListResponseData listPath(final String relativePath, final boolean 
recursive,
+      final int listMaxResults, final String continuation, TracingContext 
tracingContext, URI uri) throws IOException {
+    return listPath(relativePath, recursive, listMaxResults, continuation, 
tracingContext, uri, true);
   }
 
-  public AbfsRestOperation listPath(final String relativePath, final boolean 
recursive,
-      final int listMaxResults, final String continuation, TracingContext 
tracingContext,
-      boolean is404CheckRequired) throws AzureBlobFileSystemException {
+  public ListResponseData listPath(final String relativePath, final boolean 
recursive,
+      final int listMaxResults, final String continuation, TracingContext 
tracingContext, URI uri, boolean is404CheckRequired)
+      throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
 
     AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
@@ -374,63 +379,63 @@ public AbfsRestOperation listPath(final String 
relativePath, final boolean recur
         requestHeaders);
 
     op.execute(tracingContext);
-    // Filter the paths for which no rename redo operation is performed.
-    fixAtomicEntriesInListResults(op, tracingContext);
-    if (isEmptyListResults(op.getResult()) && is404CheckRequired) {
+    ListResponseData listResponseData = parseListPathResults(op.getResult(), 
uri);
+    listResponseData.setOp(op);
+
+    // Perform Pending Rename Redo Operation on Atomic Rename Paths.
+    // Crashed HBase log rename recovery can be done by Filesystem.listStatus.
+    if (tracingContext.getOpType() == FSOperationType.LISTSTATUS
+        && op.getResult() != null
+        && op.getResult().getStatusCode() == HTTP_OK) {
+      retryRenameOnAtomicEntriesInListResults(tracingContext,
+          listResponseData.getRenamePendingJsonPaths());
+    }
+
+    if (isEmptyListResults(listResponseData) && is404CheckRequired) {
       // If the list operation returns no paths, we need to check if the path 
is a file.
       // If it is a file, we need to return the file in the list.
       // If it is a non-existing path, we need to throw a 
FileNotFoundException.
       if (relativePath.equals(ROOT_PATH)) {
-        // Root Always exists as directory. It can be a empty listing.
-        return op;
+        // Root Always exists as directory. It can be an empty listing.
+        return listResponseData;
       }
       AbfsRestOperation pathStatus = this.getPathStatus(relativePath, 
tracingContext, null, false);
       BlobListResultSchema listResultSchema = 
getListResultSchemaFromPathStatus(relativePath, pathStatus);
+      LOG.debug("ListBlob attempted on a file path. Returning file status.");
+      List<FileStatus> fileStatusList = new ArrayList<>();
+      for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
+        fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri));
+      }
       AbfsRestOperation listOp = getAbfsRestOperation(
           AbfsRestOperationType.ListBlobs,
           HTTP_METHOD_GET,
           url,
           requestHeaders);
       listOp.hardSetGetListStatusResult(HTTP_OK, listResultSchema);
-      return listOp;
+      listResponseData.setFileStatusList(fileStatusList);
+      listResponseData.setContinuationToken(null);
+      listResponseData.setRenamePendingJsonPaths(null);
+      listResponseData.setOp(listOp);
+      return listResponseData;
     }
-    return op;
+    return listResponseData;
   }
 
   /**
    * Filter the paths for which no rename redo operation is performed.
    * Update BlobListResultSchema path with filtered entries.
-   *
-   * @param op blob list operation
    * @param tracingContext tracing context
    * @throws AzureBlobFileSystemException if rest operation or response 
parsing fails.
    */
-  private void fixAtomicEntriesInListResults(final AbfsRestOperation op,
-                                             final TracingContext 
tracingContext) throws AzureBlobFileSystemException {
-    /*
-     * Crashed HBase log rename recovery is done by Filesystem.getFileStatus 
and
-     * Filesystem.listStatus.
-     */
-    if (tracingContext == null
-        || tracingContext.getOpType() != FSOperationType.LISTSTATUS
-        || op == null || op.getResult() == null
-        || op.getResult().getStatusCode() != HTTP_OK) {
+  private void retryRenameOnAtomicEntriesInListResults(TracingContext 
tracingContext,
+      Map<Path, Integer> renamePendingJsonPaths) throws 
AzureBlobFileSystemException {
+    if (renamePendingJsonPaths == null || renamePendingJsonPaths.isEmpty()) {
       return;
     }
-    BlobListResultSchema listResultSchema
-        = (BlobListResultSchema) op.getResult().getListResultSchema();
-    if (listResultSchema == null) {
-      return;
-    }
-    List<BlobListResultEntrySchema> filteredEntries = new ArrayList<>();
-    for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
-      if (!takeListPathAtomicRenameKeyAction(entry.path(), entry.isDirectory(),
-          entry.contentLength().intValue(), tracingContext)) {
-        filteredEntries.add(entry);
-      }
-    }
 
-    listResultSchema.withPaths(filteredEntries);
+    for (Map.Entry<Path, Integer> entry : renamePendingJsonPaths.entrySet()) {
+      retryRenameOnAtomicKeyPath(entry.getKey(), entry.getValue(), 
tracingContext);
+    }
   }
 
   /**{@inheritDoc}*/
@@ -1162,10 +1167,7 @@ public AbfsRestOperation getPathStatus(final String path,
       throws AzureBlobFileSystemException {
     AbfsRestOperation op = this.getPathStatus(path, tracingContext,
         contextEncryptionAdapter, true);
-    /*
-     * Crashed HBase log-folder rename can be recovered by 
FileSystem#getFileStatus
-     * and FileSystem#listStatus calls.
-     */
+    // Crashed HBase log-folder rename can be recovered by 
FileSystem#getFileStatus
     if (tracingContext.getOpType() == FSOperationType.GET_FILESTATUS
         && op.getResult() != null && checkIsDir(op.getResult())) {
       takeGetPathStatusAtomicRenameKeyAction(new Path(path), tracingContext);
@@ -1212,6 +1214,8 @@ public AbfsRestOperation getPathStatus(final String path,
       if (op.getResult().getStatusCode() == HTTP_NOT_FOUND
           && isImplicitCheckRequired && isNonEmptyDirectory(path, 
tracingContext)) {
         // Implicit path found.
+        // Create a marker blob at this path.
+        this.createMarkerAtPath(path, null, contextEncryptionAdapter, 
tracingContext);
         AbfsRestOperation successOp = getSuccessOp(
             AbfsRestOperationType.GetPathStatus, HTTP_METHOD_HEAD,
             url, requestHeaders);
@@ -1562,18 +1566,6 @@ public boolean checkUserError(int responseStatusCode) {
         && responseStatusCode != HTTP_CONFLICT);
   }
 
-  /**
-   * Get the continuation token from the response from BLOB Endpoint Listing.
-   * Continuation Token will be present in XML List response body.
-   * @param result The response from the server.
-   * @return The continuation token.
-   */
-  @Override
-  public String getContinuationFromResponse(AbfsHttpOperation result) {
-    BlobListResultSchema listResultSchema = (BlobListResultSchema) 
result.getListResultSchema();
-    return listResultSchema.getNextMarker();
-  }
-
   /**
    * Get the User-defined metadata on a path from response headers of
    * GetBlobProperties API on Blob Endpoint.
@@ -1604,26 +1596,43 @@ public Hashtable<String, String> 
getXMSProperties(AbfsHttpOperation result)
 
   /**
    * Parse the XML response body returned by ListBlob API on Blob Endpoint.
-   * @param stream InputStream contains the response from server.
-   * @return BlobListResultSchema containing the list of entries.
-   * @throws IOException if parsing fails.
+   * @param result HTTP operation whose list result stream contains the response from server.
+   * @param uri to be used for path conversion.
+   * @return {@link ListResponseData} containing listing response.
+   * @throws AzureBlobFileSystemException if parsing fails.
    */
   @Override
-  public ListResultSchema parseListPathResults(final InputStream stream) 
throws IOException {
-    if (stream == null) {
-      return null;
-    }
+  public ListResponseData parseListPathResults(AbfsHttpOperation result, URI 
uri)
+      throws AzureBlobFileSystemException {
     BlobListResultSchema listResultSchema;
-    try {
-      final SAXParser saxParser = saxParserThreadLocal.get();
-      saxParser.reset();
-      listResultSchema = new BlobListResultSchema();
-      saxParser.parse(stream, new BlobListXmlParser(listResultSchema, 
getBaseUrl().toString()));
-    } catch (SAXException | IOException e) {
-      throw new RuntimeException(e);
+    try (InputStream stream = result.getListResultStream()) {
+      if (stream == null) {
+        return null;
+      }
+      try {
+        final SAXParser saxParser = saxParserThreadLocal.get();
+        saxParser.reset();
+        listResultSchema = new BlobListResultSchema();
+        saxParser.parse(stream,
+            new BlobListXmlParser(listResultSchema, getBaseUrl().toString()));
+        result.setListResultSchema(listResultSchema);
+        LOG.debug("ListBlobs listed {} blobs with {} as continuation token",
+            listResultSchema.paths().size(),
+            listResultSchema.getNextMarker());
+      } catch (SAXException | IOException e) {
+        throw new AbfsDriverException(e);
+      }
+    } catch (IOException e) {
+      LOG.error("Unable to deserialize list results for uri {}", 
uri.toString(), e);
+      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
     }
 
-    return removeDuplicateEntries(listResultSchema);
+    try {
+      return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, 
uri);
+    } catch (IOException e) {
+      LOG.error("Unable to filter list results for uri {}", uri.toString(), e);
+      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
+    }
   }
 
   /**
@@ -1764,13 +1773,14 @@ public void 
takeGetPathStatusAtomicRenameKeyAction(final Path path,
     AbfsRestOperation pendingJsonFileStatus;
     Path pendingJsonPath = new Path(path.getParent(),
         path.toUri().getPath() + RenameAtomicity.SUFFIX);
+    int pendingJsonFileContentLength = 0;
     try {
-      pendingJsonFileStatus = getPathStatus(
-          pendingJsonPath.toUri().getPath(), tracingContext,
-          null, false);
+      pendingJsonFileStatus = getPathStatus(pendingJsonPath.toUri().getPath(),
+          tracingContext, null, false);
       if (checkIsDir(pendingJsonFileStatus.getResult())) {
         return;
       }
+      pendingJsonFileContentLength = 
Integer.parseInt(pendingJsonFileStatus.getResult().getResponseHeader(CONTENT_LENGTH));
     } catch (AbfsRestOperationException ex) {
       if (ex.getStatusCode() == HTTP_NOT_FOUND) {
         return;
@@ -1781,9 +1791,7 @@ public void takeGetPathStatusAtomicRenameKeyAction(final 
Path path,
     boolean renameSrcHasChanged;
     try {
       RenameAtomicity renameAtomicity = getRedoRenameAtomicity(
-          pendingJsonPath, Integer.parseInt(pendingJsonFileStatus.getResult()
-              .getResponseHeader(CONTENT_LENGTH)),
-          tracingContext);
+          pendingJsonPath, pendingJsonFileContentLength, tracingContext);
       renameAtomicity.redo();
       renameSrcHasChanged = false;
     } catch (AbfsRestOperationException ex) {
@@ -1818,23 +1826,16 @@ public void 
takeGetPathStatusAtomicRenameKeyAction(final Path path,
    * @param renamePendingJsonLen length of the pendingJson file.
    * @param tracingContext tracing context.
    *
-   * @return true if action is taken.
    * @throws AzureBlobFileSystemException server error
    */
-  private boolean takeListPathAtomicRenameKeyAction(final Path path,
-      final boolean isDirectory, final int renamePendingJsonLen,
+
+  private void retryRenameOnAtomicKeyPath(final Path path,
+      final int renamePendingJsonLen,
       final TracingContext tracingContext)
       throws AzureBlobFileSystemException {
-    if (path == null || path.isRoot() || !isAtomicRenameKey(
-        path.toUri().getPath()) || isDirectory || !path.toUri()
-        .getPath()
-        .endsWith(RenameAtomicity.SUFFIX)) {
-      return false;
-    }
     try {
-      RenameAtomicity renameAtomicity
-          = getRedoRenameAtomicity(path, renamePendingJsonLen,
-          tracingContext);
+      RenameAtomicity renameAtomicity = getRedoRenameAtomicity(path,
+          renamePendingJsonLen, tracingContext);
       renameAtomicity.redo();
     } catch (AbfsRestOperationException ex) {
       /*
@@ -1850,7 +1851,6 @@ private boolean takeListPathAtomicRenameKeyAction(final 
Path path,
         throw ex;
       }
     }
-    return true;
   }
 
   @VisibleForTesting
@@ -1924,39 +1924,65 @@ private List<AbfsHttpHeader> 
getMetadataHeadersList(final Hashtable<String, Stri
    * This is to handle duplicate listing entries returned by Blob Endpoint for
    * implicit paths that also has a marker file created for them.
    * This will retain entry corresponding to marker file and remove the 
BlobPrefix entry.
+   * This will also filter out all the rename pending json files in listing 
output.
    * @param listResultSchema List of entries returned by Blob Endpoint.
+   * @param uri URI to be used for path conversion.
    * @return List of entries after removing duplicates.
    */
-  private BlobListResultSchema removeDuplicateEntries(BlobListResultSchema 
listResultSchema) {
-    List<BlobListResultEntrySchema> uniqueEntries = new ArrayList<>();
+  private ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
+      BlobListResultSchema listResultSchema, URI uri) throws IOException {
+    List<FileStatus> fileStatuses = new ArrayList<>();
+    Map<Path, Integer> renamePendingJsonPaths = new HashMap<>();
     TreeMap<String, BlobListResultEntrySchema> nameToEntryMap = new 
TreeMap<>();
 
     for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
       if (StringUtils.isNotEmpty(entry.eTag())) {
         // This is a blob entry. It is either a file or a marker blob.
         // In both cases we will add this.
-        nameToEntryMap.put(entry.name(), entry);
+        if (isRenamePendingJsonPathEntry(entry)) {
+          renamePendingJsonPaths.put(entry.path(), 
entry.contentLength().intValue());
+        } else {
+          nameToEntryMap.put(entry.name(), entry);
+          fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
+        }
       } else {
         // This is a BlobPrefix entry. It is a directory with file inside
         // This might have already been added as a marker blob.
         if (!nameToEntryMap.containsKey(entry.name())) {
           nameToEntryMap.put(entry.name(), entry);
+          fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
         }
       }
     }
 
-    uniqueEntries.addAll(nameToEntryMap.values());
-    listResultSchema.withPaths(uniqueEntries);
-    return listResultSchema;
+    ListResponseData listResponseData = new ListResponseData();
+    listResponseData.setFileStatusList(fileStatuses);
+    listResponseData.setRenamePendingJsonPaths(renamePendingJsonPaths);
+    listResponseData.setContinuationToken(listResultSchema.getNextMarker());
+    return listResponseData;
+  }
+
+  /**
+   * Check if the entry is a rename pending json file path.
+   * @param entry to be checked; a null isDirectory flag is treated as a file.
+   * @return true for a non-root file under an atomic rename key ending with the suffix.
+   */
+  private boolean isRenamePendingJsonPathEntry(BlobListResultEntrySchema 
entry) {
+    String path = entry.path() != null ? entry.path().toUri().getPath() : null;
+    return path != null
+        && !entry.path().isRoot()
+        && isAtomicRenameKey(path)
+        && !Boolean.TRUE.equals(entry.isDirectory())
+        && path.endsWith(RenameAtomicity.SUFFIX);
   }
 
   /**
    * When listing is done on a file, Blob Endpoint returns the empty listing
    * but DFS Endpoint returns the file status as one of the entries.
    * This is to convert file status into ListResultSchema.
-   * @param relativePath
-   * @param pathStatus
-   * @return
+   * @param relativePath relative path of the file.
+   * @param pathStatus AbfsRestOperation containing the file status.
+   * @return BlobListResultSchema containing the file status.
    */
   private BlobListResultSchema getListResultSchemaFromPathStatus(String 
relativePath, AbfsRestOperation pathStatus) {
     BlobListResultSchema listResultSchema = new BlobListResultSchema();
@@ -2001,27 +2027,26 @@ private static String decodeMetadataAttribute(String 
encoded)
   @VisibleForTesting
   public boolean isNonEmptyDirectory(String path,
       TracingContext tracingContext) throws AzureBlobFileSystemException {
-    AbfsRestOperation listOp = listPath(path, false, 1, null, tracingContext,
-        false);
-    return !isEmptyListResults(listOp.getResult());
+    // This method is only called internally to determine state of a path
+    // and hence don't need identity transformation to happen.
+    ListResponseData listResponseData = listPath(path, false, 1, null, 
tracingContext, null, false);
+    return !isEmptyListResults(listResponseData);
   }
 
   /**
    * Check if the list call returned empty results without any continuation 
token.
-   * @param result The response of listing API from the server.
+   * @param listResponseData parsed listing response along with the executed operation.
    * @return True if empty results without continuation token.
    */
-  private boolean isEmptyListResults(AbfsHttpOperation result) {
+  private boolean isEmptyListResults(ListResponseData listResponseData) {
+    AbfsHttpOperation result = listResponseData.getOp().getResult();
     boolean isEmptyList = result != null && result.getStatusCode() == HTTP_OK 
&& // List Call was successful
         result.getListResultSchema() != null && // Parsing of list response 
was successful
-        result.getListResultSchema().paths().isEmpty() && // No paths were 
returned
-        result.getListResultSchema() instanceof BlobListResultSchema && // It 
is safe to typecast to BlobListResultSchema
-        ((BlobListResultSchema) result.getListResultSchema()).getNextMarker() 
== null; // No continuation token was returned
+        listResponseData.getFileStatusList().isEmpty() && 
listResponseData.getRenamePendingJsonPaths().isEmpty() && // No file statuses or rename pending json paths were 
returned
+        StringUtils.isEmpty(listResponseData.getContinuationToken()); // No 
continuation token was returned
     if (isEmptyList) {
       LOG.debug("List call returned empty results without any continuation 
token.");
       return true;
-    } else if (result != null && !(result.getListResultSchema() instanceof 
BlobListResultSchema)) {
-      throw new RuntimeException("List call returned unexpected results over 
Blob Endpoint.");
     }
     return false;
   }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 5143a4a1954..8dcd78ea6b3 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.HttpURLConnection;
 import java.net.InetAddress;
 import java.net.MalformedURLException;
+import java.net.URI;
 import java.net.URL;
 import java.net.URLEncoder;
 import java.net.UnknownHostException;
@@ -47,6 +50,7 @@
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
@@ -66,18 +70,24 @@
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
 import 
org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
 import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
+import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
+import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.store.LogExactlyOnce;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
 import 
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
@@ -115,6 +125,7 @@
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SEMICOLON;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.UTF_8;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@@ -151,6 +162,7 @@ public abstract class AbfsClient implements Closeable {
   private final AbfsPerfTracker abfsPerfTracker;
   private String clientProvidedEncryptionKey = null;
   private String clientProvidedEncryptionKeySHA = null;
+  private final IdentityTransformerInterface identityTransformer;
 
   private final String accountName;
   private final AuthType authType;
@@ -285,6 +297,19 @@ private AbfsClient(final URL baseUrl,
           metricIdlePeriod);
     }
     this.abfsMetricUrl = abfsConfiguration.getMetricUri();
+
+    final Class<? extends IdentityTransformerInterface> 
identityTransformerClass =
+        
abfsConfiguration.getRawConfiguration().getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS,
 IdentityTransformer.class,
+            IdentityTransformerInterface.class);
+    try {
+      this.identityTransformer = identityTransformerClass.getConstructor(
+          
Configuration.class).newInstance(abfsConfiguration.getRawConfiguration());
+    } catch (IllegalAccessException | InstantiationException | 
IllegalArgumentException
+             | InvocationTargetException | NoSuchMethodException e) {
+      LOG.error("IdentityTransformer Init Falied", e);
+      throw new IOException(e);
+    }
+    LOG.trace("IdentityTransformer init complete");
   }
 
   public AbfsClient(final URL baseUrl, final SharedKeyCredentials 
sharedKeyCredentials,
@@ -501,12 +526,12 @@ public abstract AbfsRestOperation 
setFilesystemProperties(Hashtable<String, Stri
    * @param listMaxResults maximum number of blobs to return.
    * @param continuation marker to specify the continuation token.
    * @param tracingContext for tracing the server calls.
-   * @return executed rest operation containing response from server.
+   * @param uri to be used for the path conversion.
+   * @return {@link ListResponseData} containing listing response.
    * @throws AzureBlobFileSystemException if rest operation or response 
parsing fails.
    */
-  public abstract AbfsRestOperation listPath(String relativePath, boolean 
recursive,
-      int listMaxResults, String continuation, TracingContext tracingContext)
-      throws IOException;
+  public abstract ListResponseData listPath(String relativePath, boolean 
recursive,
+      int listMaxResults, String continuation, TracingContext tracingContext, 
URI uri) throws IOException;
 
   /**
    * Retrieves user-defined metadata on filesystem.
@@ -1685,11 +1710,12 @@ protected boolean isRenameResilience() {
 
   /**
    * Parses response of Listing API from server based on Endpoint used.
-   * @param stream InputStream of the response
-   * @return ListResultSchema
+   * @param result AbfsHttpOperation of list Operation.
+   * @param uri to be used for the path conversion.
+   * @return {@link ListResponseData} containing the list of entries.
    * @throws IOException if parsing fails
    */
-  public abstract ListResultSchema parseListPathResults(InputStream stream) 
throws IOException;
+  public abstract ListResponseData parseListPathResults(AbfsHttpOperation 
result, URI uri) throws IOException;
 
   /**
    * Parses response of Get Block List from server based on Endpoint used.
@@ -1707,13 +1733,6 @@ protected boolean isRenameResilience() {
    */
   public abstract StorageErrorResponseSchema 
processStorageErrorResponse(InputStream stream) throws IOException;
 
-  /**
-   * Returns continuation token from server response based on Endpoint used.
-   * @param result response from server
-   * @return continuation token
-   */
-  public abstract String getContinuationFromResponse(AbfsHttpOperation result);
-
   /**
    * Returns user-defined metadata from server response based on Endpoint used.
    * @param result response from server
@@ -1757,4 +1776,90 @@ protected AbfsRestOperation getSuccessOp(final 
AbfsRestOperationType operationTy
     successOp.hardSetResult(HttpURLConnection.HTTP_OK);
     return successOp;
   }
+
+  /**
+   * Get the primary user group name.
+   * @return primary user group name
+   * @throws AzureBlobFileSystemException if unable to get the primary user 
group
+   */
+  private String getPrimaryUserGroup() throws AzureBlobFileSystemException {
+    if 
(!getAbfsConfiguration().getSkipUserGroupMetadataDuringInitialization()) {
+      try {
+        return UserGroupInformation.getCurrentUser().getPrimaryGroupName();
+      } catch (IOException ex) {
+        LOG.error("Failed to get primary group for {}, using user name as 
primary group name",
+            getPrimaryUser());
+      }
+    }
+    //Provide a default group name
+    return getPrimaryUser();
+  }
+
+  /**
+   * Get the primary username.
+   * @return primary username
+   * @throws AzureBlobFileSystemException if unable to get the primary user
+   */
+  private String getPrimaryUser() throws AzureBlobFileSystemException {
+    try {
+      return UserGroupInformation.getCurrentUser().getUserName();
+    } catch (IOException ex) {
+      throw new AbfsDriverException(ex);
+    }
+  }
+
+  /**
+   * Creates a VersionedFileStatus object from the ListResultEntrySchema.
+   * @param entry ListResultEntrySchema object.
+   * @param uri to be used for the path conversion.
+   * @return VersionedFileStatus object.
+   * @throws AzureBlobFileSystemException if transformation fails.
+   */
+  protected VersionedFileStatus getVersionedFileStatusFromEntry(
+      ListResultEntrySchema entry, URI uri) throws 
AzureBlobFileSystemException {
+    long blockSize = abfsConfiguration.getAzureBlockSize();
+    String owner = null, group = null;
+    try{
+      if (identityTransformer != null) {
+        owner = identityTransformer.transformIdentityForGetRequest(
+            entry.owner(), true, getPrimaryUser());
+        group = identityTransformer.transformIdentityForGetRequest(
+            entry.group(), false, getPrimaryUserGroup());
+      }
+    } catch (IOException ex) {
+      LOG.error("Failed to get owner/group for path {}", entry.name(), ex);
+      throw new AbfsDriverException(ex);
+    }
+    final String encryptionContext = entry.getXMsEncryptionContext();
+    final FsPermission fsPermission = entry.permissions() == null
+        ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
+        : AbfsPermission.valueOf(entry.permissions());
+    final boolean hasAcl = AbfsPermission.isExtendedAcl(entry.permissions());
+
+    long lastModifiedMillis = 0;
+    long contentLength = entry.contentLength() == null ? 0 : 
entry.contentLength();
+    boolean isDirectory = entry.isDirectory() != null && entry.isDirectory();
+    if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
+      lastModifiedMillis = DateTimeUtils.parseLastModifiedTime(
+          entry.lastModified());
+    }
+
+    Path entryPath = new Path(File.separator + entry.name());
+    if (uri != null) {
+      entryPath = entryPath.makeQualified(uri, entryPath);
+    }
+    return new VersionedFileStatus(
+        owner,
+        group,
+        fsPermission,
+        hasAcl,
+        contentLength,
+        isDirectory,
+        1,
+        blockSize,
+        lastModifiedMillis,
+        entryPath,
+        entry.eTag(),
+        encryptionContext);
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index 05acaa78f48..ef4194179dc 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -23,6 +23,7 @@
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
+import java.net.URI;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
@@ -30,6 +31,7 @@
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
 import java.nio.charset.CharsetEncoder;
+import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +43,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -58,8 +61,8 @@
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import 
org.apache.hadoop.fs.azurebfs.contracts.services.DfsListResultEntrySchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.DfsListResultSchema;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 import 
org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
@@ -138,6 +141,7 @@
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.UNAUTHORIZED_BLOB_OVERWRITE;
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_RECOVERY;
+import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_DFS_LIST_PARSING;
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_FILE_ALREADY_EXISTS;
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_RENAME_RECOVERY;
 
@@ -308,15 +312,16 @@ public AbfsRestOperation deleteFilesystem(TracingContext 
tracingContext)
    * @param listMaxResults maximum number of blobs to return.
    * @param continuation marker to specify the continuation token.
    * @param tracingContext for tracing the server calls.
-   * @return executed rest operation containing response from server.
+   * @param uri to be used for path conversion.
+   * @return {@link ListResponseData} containing listing response.
    * @throws AzureBlobFileSystemException if rest operation or response 
parsing fails.
    */
   @Override
-  public AbfsRestOperation listPath(final String relativePath,
+  public ListResponseData listPath(final String relativePath,
       final boolean recursive,
       final int listMaxResults,
       final String continuation,
-      TracingContext tracingContext) throws IOException {
+      TracingContext tracingContext, URI uri) throws IOException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
@@ -337,7 +342,9 @@ public AbfsRestOperation listPath(final String relativePath,
         AbfsRestOperationType.ListPaths,
         HTTP_METHOD_GET, url, requestHeaders);
     op.execute(tracingContext);
-    return op;
+    ListResponseData listResponseData = parseListPathResults(op.getResult(), 
uri);
+    listResponseData.setOp(op);
+    return listResponseData;
   }
 
   /**
@@ -1444,8 +1451,7 @@ public boolean checkUserError(int responseStatusCode) {
    * @param result The response from the server.
    * @return The continuation token.
    */
-  @Override
-  public String getContinuationFromResponse(AbfsHttpOperation result) {
+  private String getContinuationFromResponse(AbfsHttpOperation result) {
     return 
result.getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
   }
 
@@ -1464,20 +1470,41 @@ public Hashtable<String, String> 
getXMSProperties(AbfsHttpOperation result)
 
   /**
    * Parse the list file response from DFS ListPath API in Json format
-   * @param stream InputStream contains the list results.
-   * @throws IOException if parsing fails.
+   * @param result executed list operation whose response stream is parsed.
+   * @param uri to be used for path conversion; may be null (paths stay unqualified).
+   * @return {@link ListResponseData} containing listing response.
+   * @throws AzureBlobFileSystemException if parsing fails.
    */
   @Override
-  public ListResultSchema parseListPathResults(final InputStream stream) 
throws IOException {
-    DfsListResultSchema listResultSchema;
-    try {
-      final ObjectMapper objectMapper = new ObjectMapper();
-      listResultSchema = objectMapper.readValue(stream, 
DfsListResultSchema.class);
+  public ListResponseData parseListPathResults(AbfsHttpOperation result, URI 
uri) throws AzureBlobFileSystemException {
+    try (InputStream listResultInputStream = result.getListResultStream()) {
+      DfsListResultSchema listResultSchema;
+      try {
+        final ObjectMapper objectMapper = new ObjectMapper();
+        listResultSchema = objectMapper.readValue(listResultInputStream,
+            DfsListResultSchema.class);
+        result.setListResultSchema(listResultSchema);
+        LOG.debug("ListPath listed {} paths with {} as continuation token",
+            listResultSchema.paths().size(),
+            getContinuationFromResponse(result));
+      } catch (IOException ex) {
+        throw new AbfsDriverException(ex);
+      }
+
+      List<FileStatus> fileStatuses = new ArrayList<>();
+      for (DfsListResultEntrySchema entry : listResultSchema.paths()) {
+        fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
+      }
+      ListResponseData listResponseData = new ListResponseData();
+      listResponseData.setFileStatusList(fileStatuses);
+      listResponseData.setRenamePendingJsonPaths(null);
+      listResponseData.setContinuationToken(
+          getContinuationFromResponse(result));
+      return listResponseData;
     } catch (IOException ex) {
-      LOG.error("Unable to deserialize list results", ex);
-      throw ex;
+      LOG.error("Unable to deserialize list results for Uri {}", 
uri, ex);
+      throw new AbfsDriverException(ERR_DFS_LIST_PARSING, ex);
     }
-    return listResultSchema;
   }
 
   @Override
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
index 511848c4d0a..da761f7b4e0 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
@@ -73,6 +73,8 @@ public final class AbfsErrors {
       "Error while recovering from create failure.";
   public static final String ERR_RENAME_RECOVERY =
       "Error while recovering from rename failure.";
+  public static final String ERR_BLOB_LIST_PARSING = "Parsing of XML List 
Response Failed in BlobClient.";
+  public static final String ERR_DFS_LIST_PARSING = "Parsing of Json List 
Response Failed in DfsClient.";
   public static final String INCORRECT_INGRESS_TYPE = "Ingress Type Cannot be 
DFS for Blob endpoint configured filesystem.";
   private AbfsErrors() {}
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
index 017a4f41807..fc35aecd587 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -75,6 +75,7 @@ public abstract class AbfsHttpOperation implements 
AbfsPerfLoggable {
   private String requestId = "";
   private String expectedAppendPos = "";
   private ListResultSchema listResultSchema = null;
+  private InputStream listResultStream = null;
   private List<String> blockIdList = null;
 
   // metrics
@@ -220,6 +221,10 @@ public ListResultSchema getListResultSchema() {
     return listResultSchema;
   }
 
+  public final InputStream getListResultStream() {
+    return listResultStream;
+  }
+
   /**
    * Get response header value for the given headerKey.
    *
@@ -383,7 +388,8 @@ final void parseResponse(final byte[] buffer,
       // consume the input stream to release resources
       int totalBytesRead = 0;
 
-      try (InputStream stream = getContentInputStream()) {
+      try {
+        InputStream stream = getContentInputStream();
         if (isNullInputStream(stream)) {
           return;
         }
@@ -395,7 +401,7 @@ final void parseResponse(final byte[] buffer,
           if (url.toString().contains(QUERY_PARAM_COMP + EQUAL + BLOCKLIST)) {
             parseBlockListResponse(stream);
           } else {
-            parseListFilesResponse(stream);
+            listResultStream = stream;
           }
         } else {
           if (buffer != null) {
@@ -479,19 +485,6 @@ private void processStorageErrorResponse() {
    */
   protected abstract InputStream getErrorStream() throws IOException;
 
-  /**
-   * Parse the list file response
-   *
-   * @param stream InputStream contains the list results.
-   * @throws IOException if the response cannot be deserialized.
-   */
-  private void parseListFilesResponse(final InputStream stream) throws 
IOException {
-    if (stream == null || listResultSchema != null) {
-      return;
-    }
-    listResultSchema = client.parseListPathResults(stream);
-  }
-
   private void parseBlockListResponse(final InputStream stream) throws 
IOException {
     if (stream == null || blockIdList != null) {
       return;
@@ -579,7 +572,6 @@ public final long getSendLatency() {
   public final long getRecvLatency() {
     return recvResponseTimeMs;
   }
-
   /**
    * Set response status code for the server call.
    *
@@ -668,6 +660,14 @@ protected boolean isConnectionDisconnectedOnError() {
     return connectionDisconnectedOnError;
   }
 
+  /**
+   * Sets the list result schema after parsing done on Client.
+   * @param listResultSchema ListResultSchema
+   */
+  protected void setListResultSchema(final ListResultSchema listResultSchema) {
+    this.listResultSchema = listResultSchema;
+  }
+
   public static class AbfsHttpOperationWithFixedResult extends 
AbfsHttpOperation {
     /**
      * Creates an instance to represent fixed results.
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java
index ed3d464e9b6..74f5aa4ffb5 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java
@@ -239,7 +239,7 @@ protected String listAndEnqueue(final ListBlobQueue 
listBlobQueue,
       op = getAbfsClient().listPath(path.toUri().getPath(),
           true,
           queueAvailableSizeForProduction, continuationToken,
-          tracingContext);
+          tracingContext, null).getOp();
     } catch (AzureBlobFileSystemException ex) {
       throw ex;
     } catch (IOException ex) {
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java
new file mode 100644
index 00000000000..28f10226d95
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class is used to hold the response data for list operations.
+ * It contains a list of FileStatus objects, a map of rename pending JSON 
paths,
+ * continuation token and the executed REST operation.
+ */
+public class ListResponseData {
+
+  private List<FileStatus> fileStatusList;
+  private Map<Path, Integer> renamePendingJsonPaths;
+  private AbfsRestOperation executedRestOperation;
+  private String continuationToken;
+
+  /**
+   * Returns the list of FileStatus objects.
+   * @return the list of FileStatus objects
+   */
+  public List<FileStatus> getFileStatusList() {
+    return fileStatusList;
+  }
+
+  /**
+   * Sets the list of FileStatus objects.
+   * @param fileStatusList the list of FileStatus objects
+   */
+  public void setFileStatusList(final List<FileStatus> fileStatusList) {
+    this.fileStatusList = fileStatusList;
+  }
+
+  /**
+   * Returns the map of rename pending JSON paths.
+   * @return the map of rename pending JSON paths
+   */
+  public Map<Path, Integer> getRenamePendingJsonPaths() {
+    return renamePendingJsonPaths;
+  }
+
+  /**
+   * Sets the map of rename pending JSON paths.
+   * @param renamePendingJsonPaths the map of rename pending JSON paths
+   */
+  public void setRenamePendingJsonPaths(final Map<Path, Integer> 
renamePendingJsonPaths) {
+    this.renamePendingJsonPaths = renamePendingJsonPaths;
+  }
+
+  /**
+   * Returns the executed REST operation.
+   * @return the executed REST operation
+   */
+  public AbfsRestOperation getOp() {
+    return executedRestOperation;
+  }
+
+  /**
+   * Sets the executed REST operation.
+   * @param executedRestOperation the executed REST operation
+   */
+  public void setOp(final AbfsRestOperation executedRestOperation) {
+    this.executedRestOperation = executedRestOperation;
+  }
+
+  /**
+   * Returns the continuation token.
+   * @return the continuation token
+   */
+  public String getContinuationToken() {
+    return continuationToken;
+  }
+
+  /**
+   * Sets the continuation token.
+   * @param continuationToken the continuation token
+   */
+  public void setContinuationToken(final String continuationToken) {
+    this.continuationToken = continuationToken;
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VersionedFileStatus.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VersionedFileStatus.java
new file mode 100644
index 00000000000..2db6e9dd83e
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VersionedFileStatus.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.EtagSource;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * A File status with version info extracted from the etag value returned
+ * in a LIST or HEAD request.
+ * The etag is included in the java serialization.
+ */
+public class VersionedFileStatus extends FileStatus implements EtagSource {
+
+  /**
+   * The superclass is declared serializable; this subclass can also
+   * be serialized.
+   */
+  private static final long serialVersionUID = -2009013240419749458L;
+
+  /**
+   * The etag of an object.
+   * Not-final so that serialization via reflection will preserve the value.
+   */
+  private String version;
+
+  /** The encryption context of the object, stored exactly as supplied. */
+  private String encryptionContext;
+
+  /**
+   * Creates a file status that additionally carries a version (etag) and an
+   * encryption context.
+   * The remaining superclass constructor arguments are fixed: 0, null and
+   * false, false (access time, symlink, and the encrypted / erasure-coded
+   * flags of the FileStatus constructor).
+   *
+   * @param owner owner of the path
+   * @param group group of the path
+   * @param fsPermission permission of the path
+   * @param hasAcl whether the path has an ACL
+   * @param length length of the path
+   * @param isdir whether the path is a directory
+   * @param blockReplication block replication value
+   * @param blocksize block size
+   * @param modificationTime modification time
+   * @param path the path this status describes
+   * @param version the etag; may be null
+   * @param encryptionContext the encryption context; may be null
+   */
+  public VersionedFileStatus(
+      final String owner, final String group, final FsPermission fsPermission,
+      final boolean hasAcl,
+      final long length, final boolean isdir, final int blockReplication,
+      final long blocksize, final long modificationTime, final Path path,
+      final String version, final String encryptionContext) {
+    super(length, isdir, blockReplication, blocksize, modificationTime, 0,
+        fsPermission,
+        owner,
+        group,
+        null,
+        path,
+        hasAcl, false, false);
+
+    this.version = version;
+    this.encryptionContext = encryptionContext;
+  }
+
+  /** Compare if this object is equal to another object.
+   * @param   obj the object to be compared.
+   * @return  false if obj is not a FileStatus or the paths differ; if obj is
+   *          also a VersionedFileStatus, true only when the versions match
+   *          (two null versions are considered equal); true otherwise.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof FileStatus)) {
+      return false;
+    }
+
+    FileStatus other = (FileStatus) obj;
+
+    if (!this.getPath().equals(other.getPath())) {// compare the path
+      return false;
+    }
+
+    if (other instanceof VersionedFileStatus) {
+      // Null-safe comparison: hashCode() already tolerates a null version,
+      // so equals() must not throw a NullPointerException for one.
+      final String otherVersion = ((VersionedFileStatus) other).version;
+      return this.version == null
+          ? otherVersion == null
+          : this.version.equals(otherVersion);
+    }
+
+    return true;
+  }
+
+  /**
+   * Returns a hash code value for the object, which is derived from the
+   * hash code of the path name and the hash code of the version
+   * (0 when the version is null).
+   *
+   * @return  a hash code value for the path name and version
+   */
+  @Override
+  public int hashCode() {
+    // NOTE(review): equals() accepts a plain FileStatus with the same path;
+    // if FileStatus#hashCode is path-only, such equal objects hash
+    // differently. Confirm whether that matters for any caller.
+    int hash = getPath().hashCode();
+    hash = 89 * hash + (this.version != null ? this.version.hashCode() : 0);
+    return hash;
+  }
+
+  /**
+   * Returns the version of this FileStatus
+   *
+   * @return  a string value for the FileStatus version
+   */
+  public String getVersion() {
+    return this.version;
+  }
+
+  /**
+   * Returns the etag of this FileStatus.
+   * @return a string value for the FileStatus etag.
+   */
+  @Override
+  public String getEtag() {
+    return getVersion();
+  }
+
+  /**
+   * Returns the encryption context of this FileStatus
+   * @return a string value for the FileStatus encryption context
+   */
+  public String getEncryptionContext() {
+    return encryptionContext;
+  }
+
+  /**
+   * Returns a string representation of the object.
+   * @return a string representation of the object
+   */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "VersionedFileStatus{");
+    sb.append(super.toString());
+    sb.append("; version='").append(version).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java
index baa57da2881..c180689b267 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java
@@ -65,7 +65,7 @@ public void testContinuationTokenHavingEqualSign() throws 
Exception {
     try {
       AbfsRestOperation op = abfsClient
           .listPath("/", true, LIST_MAX_RESULTS, "===========",
-              getTestTracingContext(fs, true));
+              getTestTracingContext(fs, true), null).getOp();
       Assert.assertTrue(false);
     } catch (AbfsRestOperationException ex) {
       Assert.assertEquals("InvalidQueryParameterValue", 
ex.getErrorCode().getErrorCode());
@@ -106,7 +106,7 @@ public void testListPathWithValidListMaxResultsValues()
 
       AbfsRestOperation op = getFileSystem().getAbfsClient().listPath(
           directory.toString(), false, getListMaxResults(), null,
-          getTestTracingContext(getFileSystem(), true));
+          getTestTracingContext(getFileSystem(), true), null).getOp();
 
       List<? extends ListResultEntrySchema> list = 
op.getResult().getListResultSchema().paths();
       String continuationToken = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
@@ -141,7 +141,7 @@ public void testListPathWithValueGreaterThanServerMaximum()
 
     AbfsRestOperation op = getFileSystem().getAbfsClient().listPath(
         directory.toString(), false, getListMaxResults(), null,
-        getTestTracingContext(getFileSystem(), true));
+        getTestTracingContext(getFileSystem(), true), null).getOp();
 
     List<? extends ListResultEntrySchema> list = 
op.getResult().getListResultSchema().paths();
     String continuationToken = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
@@ -179,7 +179,7 @@ private List<? extends ListResultEntrySchema> 
listPath(String directory)
       throws IOException {
     return getFileSystem().getAbfsClient()
         .listPath(directory, false, getListMaxResults(), null,
-            getTestTracingContext(getFileSystem(), true)).getResult()
+            getTestTracingContext(getFileSystem(), true), 
null).getOp().getResult()
         .getListResultSchema().paths();
   }
 
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java
index 2a06dfb4acc..781cd701400 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
 import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.impl.OpenFileParameters;
@@ -301,10 +302,10 @@ private AbfsRestOperation 
callOperation(AzureBlobFileSystem fs,
            */
           FileStatus status = fs.listStatus(testPath)[0];
           Assertions.assertThat(status)
-              
.isInstanceOf(AzureBlobFileSystemStore.VersionedFileStatus.class);
+              .isInstanceOf(VersionedFileStatus.class);
 
           Assertions.assertThat(
-                  ((AzureBlobFileSystemStore.VersionedFileStatus) 
status).getEncryptionContext())
+                  ((VersionedFileStatus) status).getEncryptionContext())
               .isNotNull();
 
           try (FSDataInputStream in = fs.openFileWithOptions(testPath,
@@ -343,7 +344,7 @@ true, new BlobAppendRequestParameters(BLOCK_ID, null)),
           getTestTracingContext(fs, false));
       case LISTSTATUS:
         return client.listPath(path, false, 5, null,
-          getTestTracingContext(fs, true));
+          getTestTracingContext(fs, true), null).getOp();
       case RENAME:
         TracingContext tc = getTestTracingContext(fs, true);
         return client.renamePath(path, new Path(path + "_2").toString(),
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index f59c4a8b9f2..4b08e7d2246 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -66,6 +66,7 @@
 import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
 import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
 import org.apache.hadoop.fs.azurebfs.utils.DirectoryStateHelper;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
@@ -251,8 +252,8 @@ private AzureBlobFileSystem createJsonFile(Path path, Path 
renameJson) throws IO
     doReturn(client).when(store).getClient();
     fs.setWorkingDirectory(new Path(ROOT_PATH));
     fs.mkdirs(new Path(path, "test3"));
-    AzureBlobFileSystemStore.VersionedFileStatus fileStatus
-        = (AzureBlobFileSystemStore.VersionedFileStatus) 
fs.getFileStatus(path);
+    VersionedFileStatus fileStatus
+        = (VersionedFileStatus) fs.getFileStatus(path);
     new RenameAtomicity(path,
         new Path("/hbase/test4"), renameJson,
         getTestTracingContext(fs, true), fileStatus.getEtag(),
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
index 3f16e6be80f..e20a2c43279 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java
@@ -20,6 +20,7 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
 import java.util.List;
@@ -311,12 +312,11 @@ public void testDeleteIdempotencyTriggerHttp404() throws 
Exception {
       doCallRealMethod().when(mockClient)
               .listPath(Mockito.nullable(String.class), Mockito.anyBoolean(),
                       Mockito.anyInt(), Mockito.nullable(String.class),
-                      Mockito.nullable(TracingContext.class));
+                  Mockito.nullable(TracingContext.class), 
Mockito.nullable(URI.class));
       doCallRealMethod().when((AbfsBlobClient) mockClient)
               .listPath(Mockito.nullable(String.class), Mockito.anyBoolean(),
                       Mockito.anyInt(), Mockito.nullable(String.class),
-                      Mockito.nullable(TracingContext.class),
-                      Mockito.anyBoolean());
+                      Mockito.nullable(TracingContext.class), 
Mockito.nullable(URI.class), Mockito.anyBoolean());
       doCallRealMethod().when((AbfsBlobClient) mockClient)
               .getPathStatus(Mockito.nullable(String.class), 
Mockito.nullable(TracingContext.class),
                       Mockito.nullable(ContextEncryptionAdapter.class), 
Mockito.anyBoolean());
@@ -531,12 +531,12 @@ public void testDeleteImplicitDirWithSingleListResults() 
throws Exception {
               boolean recursive = answer.getArgument(1);
               String continuation = answer.getArgument(3);
               TracingContext context = answer.getArgument(4);
-              return client.listPath(path, recursive, 1, continuation, 
context);
+              return client.listPath(path, recursive, 1, continuation, 
context, null);
             })
             .when(spiedClient)
             .listPath(Mockito.anyString(), Mockito.anyBoolean(), 
Mockito.anyInt(),
                     Mockito.nullable(String.class),
-                    Mockito.any(TracingContext.class));
+                    Mockito.any(TracingContext.class), 
Mockito.nullable(URI.class));
     client.deleteBlobPath(new Path("/testDir/dir1"),
             null, getTestTracingContext(fs, true));
     fs.delete(new Path("/testDir/dir1"), true);
@@ -683,14 +683,14 @@ public void testProducerStopOnDeleteFailure() throws 
Exception {
             })
             .when(spiedClient)
             .listPath(Mockito.anyString(), Mockito.anyBoolean(), 
Mockito.anyInt(),
-                    Mockito.nullable(String.class), 
Mockito.any(TracingContext.class));
+                    Mockito.nullable(String.class), 
Mockito.any(TracingContext.class), Mockito.nullable(URI.class));
     intercept(AccessDeniedException.class,
             () -> {
               fs.delete(new Path("/src"), true);
             });
     Mockito.verify(spiedClient, Mockito.times(1))
             .listPath(Mockito.anyString(), Mockito.anyBoolean(), 
Mockito.anyInt(),
-                    Mockito.nullable(String.class), 
Mockito.any(TracingContext.class));
+                    Mockito.nullable(String.class), 
Mockito.any(TracingContext.class), Mockito.nullable(URI.class));
   }
 
   /**
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
index 87b1ea4321d..f226e5b61fb 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
@@ -98,6 +98,7 @@ private FileStatus validateStatus(final AzureBlobFileSystem 
fs, final Path name,
         assertTrue(errorInStatus + "not a file", fileStatus.isFile());
       }
     }
+    assertPathDns(fileStatus.getPath());
 
     return fileStatus;
   }
@@ -255,7 +256,7 @@ public void 
testListStatusIsCalledForImplicitPathOnBlobEndpoint() throws Excepti
     fs.getFileStatus(implicitPath);
 
     Mockito.verify(abfsClient, Mockito.times(1)).getPathStatus(any(), 
eq(false), any(), any());
-    Mockito.verify(abfsClient, Mockito.times(1)).listPath(any(), eq(false), 
eq(1), any(), any(), eq(false));
+    Mockito.verify(abfsClient, Mockito.times(1)).listPath(any(), eq(false), 
eq(1), any(), any(), any(), eq(false));
   }
 
   /**
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
index e563e082b80..1c4ef6d4212 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Stubber;
@@ -39,21 +40,20 @@
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
-import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
-import 
org.apache.hadoop.fs.azurebfs.contracts.services.DfsListResultEntrySchema;
-import org.apache.hadoop.fs.azurebfs.contracts.services.DfsListResultSchema;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
+import org.apache.hadoop.fs.azurebfs.utils.DirectoryStateHelper;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 
 import static java.net.HttpURLConnection.HTTP_OK;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_LIST_MAX_RESULTS;
+import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
 import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_JDK_MESSAGE;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
@@ -72,7 +72,7 @@
 public class ITestAzureBlobFileSystemListStatus extends
     AbstractAbfsIntegrationTest {
   private static final int TEST_FILES_NUMBER = 6000;
-  private static final String TEST_CONTINUATION_TOKEN = "continuation";
+  public static final String TEST_CONTINUATION_TOKEN = "continuation";
 
   public ITestAzureBlobFileSystemListStatus() throws Exception {
     super();
@@ -122,43 +122,20 @@ public Void call() throws Exception {
    */
   @Test
   public void testListPathTracingContext() throws Exception {
-    assumeDfsServiceType();
-    final AzureBlobFileSystem fs = getFileSystem();
-    final AzureBlobFileSystem spiedFs = Mockito.spy(fs);
-    final AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
-    final AbfsClient spiedClient = Mockito.spy(fs.getAbfsClient());
+    final AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+    final AzureBlobFileSystemStore spiedStore = 
Mockito.spy(spiedFs.getAbfsStore());
+    final AbfsClient spiedClient = Mockito.spy(spiedFs.getAbfsClient());
     final TracingContext spiedTracingContext = Mockito.spy(
         new TracingContext(
-            fs.getClientCorrelationId(), fs.getFileSystemId(),
+            spiedFs.getClientCorrelationId(), spiedFs.getFileSystemId(),
             FSOperationType.LISTSTATUS, true, 
TracingHeaderFormat.ALL_ID_FORMAT, null));
 
     Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
-    spiedStore.setClient(spiedClient);
+    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
     spiedFs.setWorkingDirectory(new Path("/"));
 
-    
AbfsClientTestUtil.setMockAbfsRestOperationForListPathOperation(spiedClient,
+    AbfsClientTestUtil.setMockAbfsRestOperationForListOperation(spiedClient,
         (httpOperation) -> {
-
-          ListResultEntrySchema entry = new DfsListResultEntrySchema()
-              .withName("a")
-              .withIsDirectory(true);
-          List<ListResultEntrySchema> paths = new ArrayList<>();
-          paths.add(entry);
-          paths.clear();
-          entry = new DfsListResultEntrySchema()
-              .withName("abc.txt")
-              .withIsDirectory(false);
-          paths.add(entry);
-          ListResultSchema schema1 = new 
DfsListResultSchema().withPaths(paths);
-          ListResultSchema schema2 = new 
DfsListResultSchema().withPaths(paths);
-
-          when(httpOperation.getListResultSchema()).thenReturn(schema1)
-              .thenReturn(schema2);
-          when(httpOperation.getResponseHeader(
-              HttpHeaderConfigurations.X_MS_CONTINUATION))
-              .thenReturn(TEST_CONTINUATION_TOKEN)
-              .thenReturn(EMPTY_STRING);
-
           Stubber stubber = Mockito.doThrow(
               new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE));
           stubber.doNothing().when(httpOperation).processResponse(
@@ -176,12 +153,12 @@ public void testListPathTracingContext() throws Exception 
{
     Mockito.verify(spiedClient, times(1)).listPath(
         "/", false,
         spiedFs.getAbfsStore().getAbfsConfiguration().getListMaxResults(),
-        null, spiedTracingContext);
+        null, spiedTracingContext, spiedFs.getAbfsStore().getUri());
     // 2. With continuation token
     Mockito.verify(spiedClient, times(1)).listPath(
         "/", false,
         spiedFs.getAbfsStore().getAbfsConfiguration().getListMaxResults(),
-        TEST_CONTINUATION_TOKEN, spiedTracingContext);
+        TEST_CONTINUATION_TOKEN, spiedTracingContext, 
spiedFs.getAbfsStore().getUri());
 
     // Assert that none of the API calls used the same tracing header.
     Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), 
any(), any());
@@ -343,4 +320,208 @@ public void testRenameTrailingPeriodFile() throws 
IOException {
     assertTrue("Attempt to create file that ended with a dot should"
         + " throw IllegalArgumentException", exceptionThrown);
   }
+
+
+
+  /**
+   * Test to verify that listStatus returns the correct file status for all
+   * types of paths viz. implicit, explicit, file.
+   * @throws Exception if there is an error or test assertions fails.
+   */
+  @Test
+  public void testListStatusWithImplicitExplicitChildren() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    fs.setWorkingDirectory(new Path(ROOT_PATH));
+    Path root = new Path(ROOT_PATH);
+
+    // Create an implicit directory under root
+    Path dir = new Path("a");
+    Path fileInsideDir = new Path("a/file");
+    createAzCopyFolder(dir);
+
+    // Assert that implicit directory is returned
+    FileStatus[] fileStatuses = fs.listStatus(root);
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List size is not expected").isEqualTo(1);
+    assertImplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir));
+
+    // Create a file inside the directory; the listing below expects the
+    // directory to be explicit afterwards. Close the stream so it is not leaked.
+    fs.create(fileInsideDir).close();
+
+    // Assert that only one entry of explicit directory is returned
+    fileStatuses = fs.listStatus(root);
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List size is not expected").isEqualTo(1);
+    assertExplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir));
+
+    // Create a file under root
+    Path file1 = new Path("b");
+    fs.create(file1).close();
+
+    // Assert that two entries are returned in alphabetic order.
+    fileStatuses = fs.listStatus(root);
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List size is not expected").isEqualTo(2);
+    assertExplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir));
+    assertFilePathFileStatus(fileStatuses[1], fs.makeQualified(file1));
+
+    // Create another implicit directory under root.
+    Path dir2 = new Path("c");
+    createAzCopyFolder(dir2);
+
+    // Assert that three entries are returned in alphabetic order.
+    fileStatuses = fs.listStatus(root);
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List size is not expected").isEqualTo(3);
+    assertExplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir));
+    assertFilePathFileStatus(fileStatuses[1], fs.makeQualified(file1));
+    assertImplicitDirectoryFileStatus(fileStatuses[2], fs.makeQualified(dir2));
+  }
+
+  /**
+   * Test to verify that listStatus returns the correct file status when
+   * called on an implicit path.
+   * NOTE(review): the expected path handed to each assertion helper is taken
+   * from the very status being checked, so the path comparison cannot fail.
+   * @throws Exception if there is an error or test assertions fails.
+   */
+  @Test
+  public void testListStatusOnImplicitDirectoryPath() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path implicitPath = new Path("/implicitDir");
+    createAzCopyFolder(implicitPath);
+
+    // First level: whatever createAzCopyFolder placed under the implicit path.
+    FileStatus[] firstLevel = fs.listStatus(implicitPath);
+    Assertions.assertThat(firstLevel.length)
+        .describedAs("List size is not expected").isGreaterThanOrEqualTo(1);
+    final FileStatus firstChild = firstLevel[0];
+    assertImplicitDirectoryFileStatus(firstChild,
+        fs.makeQualified(firstChild.getPath()));
+
+    // Second level: list the first child found above.
+    FileStatus[] secondLevel = fs.listStatus(
+        new Path(firstChild.getPath().toString()));
+    Assertions.assertThat(secondLevel.length)
+        .describedAs("List size is not expected").isGreaterThanOrEqualTo(1);
+    final FileStatus firstGrandChild = secondLevel[0];
+    assertFilePathFileStatus(firstGrandChild,
+        fs.makeQualified(firstGrandChild.getPath()));
+  }
+
+  /**
+   * Test to verify that listStatus on a directory created with mkdirs and
+   * left empty returns no entries.
+   * @throws Exception if there is an error or test assertions fails.
+   */
+  @Test
+  public void testListStatusOnEmptyDirectory() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path emptyDir = new Path("/emptyDir");
+    fs.mkdirs(emptyDir);
+
+    final int listedCount = fs.listStatus(emptyDir).length;
+    Assertions.assertThat(listedCount)
+        .describedAs("List size is not expected").isEqualTo(0);
+  }
+
+  /**
+   * Test to verify that listStatus called directly on a rename pending JSON
+   * file returns exactly one entry, reported as a file.
+   * @throws Exception if there is an error or test assertions fails.
+   */
+  @Test
+  public void testListStatusOnRenamePendingJsonFile() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path renamePendingJsonPath = new Path("/hbase/A/A-" + SUFFIX);
+    // Close the stream returned by create() so it is not leaked.
+    fs.create(renamePendingJsonPath).close();
+
+    FileStatus[] statuses = fs.listStatus(renamePendingJsonPath);
+    Assertions.assertThat(statuses.length)
+        .describedAs("List size is not expected").isEqualTo(1);
+    assertFilePathFileStatus(statuses[0],
+        fs.makeQualified(statuses[0].getPath()));
+  }
+
+  /**
+   * Test to verify that listing a directory holding two files with a page
+   * size of one returns a continuation token on the first call and a null
+   * token, together with the remaining entry, on the second call.
+   * @throws Exception if there is an error or test assertions fails.
+   */
+  @Test
+  public void testContinuationTokenAcrossListStatus() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path path = new Path("/testContinuationToken");
+    fs.mkdirs(path);
+    // Close the streams returned by create() so they are not leaked.
+    fs.create(new Path(path + "/file1")).close();
+    fs.create(new Path(path + "/file2")).close();
+
+    fs.listStatus(path);
+
+    ListResponseData listResponseData = fs.getAbfsStore().getClient().listPath(
+        "/testContinuationToken", false, 1, null,
+        getTestTracingContext(fs, true),
+        fs.getAbfsStore().getUri());
+
+    Assertions.assertThat(listResponseData.getContinuationToken())
+        .describedAs("Continuation Token Should not be null").isNotNull();
+    Assertions.assertThat(listResponseData.getFileStatusList())
+        .describedAs("Listing Size Not as expected").hasSize(1);
+
+    // Second page: resume from the token returned by the first call.
+    ListResponseData listResponseData1 = fs.getAbfsStore().getClient().listPath(
+        "/testContinuationToken", false, 1,
+        listResponseData.getContinuationToken(),
+        getTestTracingContext(fs, true),
+        fs.getAbfsStore().getUri());
+
+    Assertions.assertThat(listResponseData1.getContinuationToken())
+        .describedAs("Continuation Token Should be null").isNull();
+    Assertions.assertThat(listResponseData1.getFileStatusList())
+        .describedAs("Listing Size Not as expected").hasSize(1);
+  }
+
+  /**
+   * Test to verify that listPath with an invalid continuation token fails
+   * with an AbfsRestOperationException. Runs only when HNS is disabled.
+   * @throws Exception if there is an error or test assertions fails.
+   */
+  @Test
+  public void testInvalidContinuationToken() throws Exception {
+    assumeHnsDisabled();
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path path = new Path("/testInvalidContinuationToken");
+    fs.mkdirs(path);
+    // Close the streams returned by create() so they are not leaked.
+    fs.create(new Path(path + "/file1")).close();
+    fs.create(new Path(path + "/file2")).close();
+
+    intercept(AbfsRestOperationException.class,
+        () -> fs.getAbfsStore().getClient().listPath(
+            "/testInvalidContinuationToken", false, 1, "invalidToken",
+            getTestTracingContext(fs, true), fs.getAbfsStore().getUri()));
+  }
+
+  /**
+   * Test to verify that listPath with an empty continuation token and a page
+   * size of one returns the first entry along with a non-null continuation
+   * token.
+   * @throws Exception if there is an error or test assertions fails.
+   */
+  @Test
+  public void testEmptyContinuationToken() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    // Use a directory name of its own rather than the one copied from
+    // testInvalidContinuationToken.
+    Path path = new Path("/testEmptyContinuationToken");
+    fs.mkdirs(path);
+    // Close the streams returned by create() so they are not leaked.
+    fs.create(new Path(path + "/file1")).close();
+    fs.create(new Path(path + "/file2")).close();
+
+    ListResponseData listResponseData = fs.getAbfsStore().getClient().listPath(
+        "/testEmptyContinuationToken", false, 1, "",
+        getTestTracingContext(fs, true), fs.getAbfsStore().getUri());
+
+    Assertions.assertThat(listResponseData.getContinuationToken())
+        .describedAs("Continuation Token Should Not be null").isNotNull();
+    Assertions.assertThat(listResponseData.getFileStatusList())
+        .describedAs("Listing Size Not as expected").hasSize(1);
+  }
+
+  /**
+   * Asserts that the given status has the expected path, is reported as a
+   * file and not as a directory, and has a non-zero modification time.
+   * @param fileStatus the status to check
+   * @param qualifiedPath the path the status is expected to carry
+   */
+  private void assertFilePathFileStatus(final FileStatus fileStatus,
+      final Path qualifiedPath) {
+    Assertions.assertThat(fileStatus.getPath())
+        .describedAs("Path Not as expected").isEqualTo(qualifiedPath);
+    Assertions.assertThat(fileStatus.isFile())
+        .describedAs("Expecting a File Path").isTrue();
+    Assertions.assertThat(fileStatus.isDirectory())
+        .describedAs("Expecting a File Path").isFalse();
+    Assertions.assertThat(fileStatus.getModificationTime()).isNotEqualTo(0);
+  }
+
+  /**
+   * Asserts that the given status passes the common directory checks and
+   * has a modification time of 0.
+   * @param fileStatus the status to check
+   * @param qualifiedPath the path the status is expected to carry
+   * @throws Exception if the directory state lookup fails
+   */
+  private void assertImplicitDirectoryFileStatus(final FileStatus fileStatus,
+      final Path qualifiedPath) throws Exception {
+    assertDirectoryFileStatus(fileStatus, qualifiedPath);
+    // NOTE(review): the result of isImplicitDirectory(...) is discarded, so
+    // this call only fails the test if it throws. If it returns a boolean
+    // (as the name suggests — confirm), consider asserting on it.
+    DirectoryStateHelper.isImplicitDirectory(qualifiedPath, getFileSystem(),
+        getTestTracingContext(getFileSystem(), true));
+    Assertions.assertThat(fileStatus.getModificationTime())
+        .describedAs("Last Modified Time Not as Expected").isEqualTo(0);
+  }
+
+  /**
+   * Asserts that the given status passes the common directory checks and
+   * has a non-zero modification time.
+   * @param fileStatus the status to check
+   * @param qualifiedPath the path the status is expected to carry
+   * @throws Exception if the directory state lookup fails
+   */
+  private void assertExplicitDirectoryFileStatus(final FileStatus fileStatus,
+      final Path qualifiedPath) throws Exception {
+    assertDirectoryFileStatus(fileStatus, qualifiedPath);
+    // NOTE(review): the result of isExplicitDirectory(...) is discarded, so
+    // this call only fails the test if it throws. If it returns a boolean
+    // (as the name suggests — confirm), consider asserting on it.
+    DirectoryStateHelper.isExplicitDirectory(qualifiedPath, getFileSystem(),
+        getTestTracingContext(getFileSystem(), true));
+    Assertions.assertThat(fileStatus.getModificationTime())
+        .describedAs("Last Modified Time Not as Expected").isNotEqualTo(0);
+  }
+
+  /**
+   * Asserts that the given status has the expected path, is reported as a
+   * directory and not as a file, and has a length of 0.
+   * @param fileStatus the status to check
+   * @param qualifiedPath the path the status is expected to carry
+   */
+  private void assertDirectoryFileStatus(final FileStatus fileStatus,
+      final Path qualifiedPath) {
+    Assertions.assertThat(fileStatus.getPath())
+        .describedAs("Path Not as Expected").isEqualTo(qualifiedPath);
+    Assertions.assertThat(fileStatus.isDirectory())
+        .describedAs("Expecting a Directory Path").isTrue();
+    Assertions.assertThat(fileStatus.isFile())
+        .describedAs("Expecting a Directory Path").isFalse();
+    Assertions.assertThat(fileStatus.getLen())
+        .describedAs("Content Length Not as Expected").isEqualTo(0);
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
index ad352eef69b..fa9b63c0ef9 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
@@ -64,6 +65,7 @@
 import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
 import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityTestUtils;
 import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
@@ -869,8 +871,8 @@ public void testRenameCompleteBeforeRenameAtomicityRedo() throws Exception {
     /*
      * Create renameJson file.
      */
-    AzureBlobFileSystemStore.VersionedFileStatus fileStatus
-        = (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path);
+    VersionedFileStatus fileStatus
+        = (VersionedFileStatus) fs.getFileStatus(path);
     int jsonLen = new RenameAtomicity(path,
         new Path("/hbase/test4"), renameJson,
         getTestTracingContext(fs, true), fileStatus.getEtag(),
@@ -1433,12 +1435,12 @@ public void testBlobRenameWithListGivingPaginatedResultWithOneObjectPerList()
           String continuation = answer.getArgument(3);
           TracingContext context = answer.getArgument(4);
           return getFileSystem().getAbfsClient()
-              .listPath(path, recursive, 1, continuation, context);
+              .listPath(path, recursive, 1, continuation, context, null);
         })
         .when(spiedClient)
         .listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(),
             Mockito.nullable(String.class),
-            Mockito.any(TracingContext.class));
+            Mockito.any(TracingContext.class), Mockito.nullable(URI.class));
     fs.rename(new Path("/testDir/dir1"), new Path("/testDir/dir2"));
     for (int i = 0; i < 10; i++) {
       Assertions.assertThat(fs.exists(new Path("/testDir/dir2/file" + i)))
@@ -1516,13 +1518,13 @@ public void testProducerStopOnRenameFailure() throws Exception {
             listCallInvocation[0]++;
             return getFileSystem().getAbfsClient().listPath(answer.getArgument(0),
                 answer.getArgument(1), 1,
-                answer.getArgument(3), answer.getArgument(4));
+                answer.getArgument(3), answer.getArgument(4), answer.getArgument(5));
           }
           return answer.callRealMethod();
         })
         .when(spiedClient)
         .listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(),
-            Mockito.nullable(String.class), Mockito.any(TracingContext.class));
+            Mockito.nullable(String.class), Mockito.any(TracingContext.class), Mockito.nullable(URI.class));
     intercept(AccessDeniedException.class,
         () -> {
           fs.rename(new Path("/src"), new Path("/dst"));
@@ -1780,15 +1782,15 @@ private Configuration createConfig(String producerQueueSize, String consumerMaxL
   private void validateRename(AzureBlobFileSystem fs, Path src, Path dst,
       boolean isSrcExist, boolean isDstExist, boolean isJsonExist)
       throws IOException {
-    Assertions.assertThat(fs.exists(dst))
-        .describedAs("Renamed Destination directory should exist.")
-        .isEqualTo(isDstExist);
     Assertions.assertThat(fs.exists(new Path(src.getParent(), src.getName() + SUFFIX)))
         .describedAs("Renamed Pending Json file should exist.")
         .isEqualTo(isJsonExist);
     Assertions.assertThat(fs.exists(src))
-        .describedAs("Renamed Destination directory should exist.")
+        .describedAs("Renamed Source directory should not exist.")
         .isEqualTo(isSrcExist);
+    Assertions.assertThat(fs.exists(dst))
+        .describedAs("Renamed Destination directory should exist.")
+        .isEqualTo(isDstExist);
   }
 
   /**
@@ -2087,8 +2089,8 @@ public AzureBlobFileSystem createJsonFile(Path path, Path renameJson)
     fs.setWorkingDirectory(new Path(ROOT_PATH));
     fs.create(new Path(path, "file.txt"));
 
-    AzureBlobFileSystemStore.VersionedFileStatus fileStatus
-        = (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path);
+    VersionedFileStatus fileStatus
+        = (VersionedFileStatus) fs.getFileStatus(path);
 
     new RenameAtomicity(path, new Path("/hbase/test4"),
         renameJson, getTestTracingContext(fs, true),
@@ -2301,8 +2303,8 @@ public void testListCrashRecoveryWithMultipleJsonFile() throws Exception {
       fs.create(new Path(path2, "file3.txt"));
 
       Path renameJson2 = new Path(path2.getParent(), path2.getName() + SUFFIX);
-      AzureBlobFileSystemStore.VersionedFileStatus fileStatus
-          = (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path2);
+      VersionedFileStatus fileStatus
+          = (VersionedFileStatus) fs.getFileStatus(path2);
 
       new RenameAtomicity(path2, new Path("/hbase/test4"),
           renameJson2, getTestTracingContext(fs, true),
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
index a92fd6f6ca1..a6652fe85f1 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
@@ -44,8 +44,10 @@
 
 import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+import static org.apache.hadoop.fs.azurebfs.ITestAzureBlobFileSystemListStatus.TEST_CONTINUATION_TOKEN;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_XML;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCKLIST;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH;
@@ -75,7 +77,7 @@ private AbfsClientTestUtil() {
 
   }
 
-  public static void setMockAbfsRestOperationForListPathOperation(
+  public static void setMockAbfsRestOperationForListOperation(
       final AbfsClient spiedClient,
       FunctionRaisingIOE<AbfsJdkHttpOperation, AbfsJdkHttpOperation> functionRaisingIOE)
       throws Exception {
@@ -91,13 +93,30 @@ public static void setMockAbfsRestOperationForListPathOperation(
         new ArrayList<>(),
         spiedClient.getAbfsConfiguration()
     ));
+    ListResponseData listResponseData1 = Mockito.spy(new ListResponseData());
+    listResponseData1.setRenamePendingJsonPaths(null);
+    listResponseData1.setOp(abfsRestOperation);
+    listResponseData1.setFileStatusList(new ArrayList<>());
+    listResponseData1.setContinuationToken(TEST_CONTINUATION_TOKEN);
+
+    ListResponseData listResponseData2 = Mockito.spy(new ListResponseData());
+    listResponseData2.setRenamePendingJsonPaths(null);
+    listResponseData2.setOp(abfsRestOperation);
+    listResponseData2.setFileStatusList(new ArrayList<>());
+    listResponseData2.setContinuationToken(EMPTY_STRING);
 
     Mockito.doReturn(abfsRestOperation).when(spiedClient).getAbfsRestOperation(
         eq(AbfsRestOperationType.ListPaths), any(), any(), any());
+    Mockito.doReturn(abfsRestOperation).when(spiedClient).getAbfsRestOperation(
+        eq(AbfsRestOperationType.ListBlobs), any(), any(), any());
 
-    addGeneralMockBehaviourToAbfsClient(spiedClient, exponentialRetryPolicy, staticRetryPolicy, intercept);
+    addGeneralMockBehaviourToAbfsClient(spiedClient, exponentialRetryPolicy, staticRetryPolicy, intercept, listResponseData1);
     addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);
 
+    Mockito.doReturn(listResponseData1).doReturn(listResponseData2)
+        .when(spiedClient)
+        .parseListPathResults(any(), any());
+
     functionRaisingIOE.apply(httpOperation);
   }
 
@@ -204,7 +223,8 @@ public static void addMockBehaviourToRestOpAndHttpOp(final AbfsRestOperation abf
   public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClient,
                                                          final ExponentialRetryPolicy exponentialRetryPolicy,
                                                          final StaticRetryPolicy staticRetryPolicy,
-                                                         final AbfsThrottlingIntercept intercept) throws IOException, URISyntaxException {
+                                                         final AbfsThrottlingIntercept intercept,
+      final ListResponseData listResponseData) throws IOException, URISyntaxException {
     Mockito.doReturn(OAuth).when(abfsClient).getAuthType();
     Mockito.doReturn("").when(abfsClient).getAccessToken();
     AbfsConfiguration abfsConfiguration = Mockito.mock(AbfsConfiguration.class);
@@ -217,6 +237,7 @@ public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClie
         .when(intercept)
         .sendingRequest(any(), nullable(AbfsCounters.class));
     Mockito.doNothing().when(intercept).updateMetrics(any(), any());
+    Mockito.doReturn(listResponseData).when(abfsClient).parseListPathResults(any(), any());
 
     // Returning correct retry policy based on failure reason
     Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getExponentialRetryPolicy();
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
index 8ee3a71f358..4aaa53003d4 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
@@ -201,7 +201,8 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception {
     StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class);
     AbfsThrottlingIntercept intercept = Mockito.mock(
         AbfsThrottlingIntercept.class);
-    addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept);
+    addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept, Mockito.mock(
+        ListResponseData.class));
 
     AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
         AbfsRestOperationType.ReadFile,
@@ -289,7 +290,8 @@ private void testClientRequestIdForStatusRetry(int status,
     StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class);
     AbfsThrottlingIntercept intercept = Mockito.mock(
         AbfsThrottlingIntercept.class);
-    addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept);
+    addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept, Mockito.mock(
+        ListResponseData.class));
 
     // Create a readfile operation that will fail
     AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
@@ -356,7 +358,8 @@ private void testClientRequestIdForTimeoutRetry(Exception[] exceptions,
     StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class);
     AbfsThrottlingIntercept intercept = Mockito.mock(
         AbfsThrottlingIntercept.class);
-    addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept);
+    addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept, Mockito.mock(
+        ListResponseData.class));
 
     AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
         AbfsRestOperationType.ReadFile,
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestListActionTaker.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestListActionTaker.java
index f9c774b7c32..f67ba45360c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestListActionTaker.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestListActionTaker.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 
 import org.assertj.core.api.Assertions;
@@ -63,9 +64,11 @@ public void testProducerResumeOnlyOnConsumerLagBecomesTolerable() throws
     Mockito.doReturn(DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE)
         .when(abfsConfiguration)
         .getProducerQueueMaxSize();
+    ListResponseData listResponseData = Mockito.mock(ListResponseData.class);
     AbfsRestOperation op = Mockito.mock(AbfsRestOperation.class);
     AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class);
     Mockito.doReturn(httpOperation).when(op).getResult();
+    Mockito.doReturn(op).when(listResponseData).getOp();
     BlobListResultSchema listResultSchema = Mockito.mock(
         BlobListResultSchema.class);
     Mockito.doReturn(listResultSchema)
@@ -132,10 +135,10 @@ protected void addPaths(final List<Path> paths,
           occurrences[0]++;
           Assertions.assertThat((int) answer.getArgument(2))
               .isEqualTo(DEFAULT_AZURE_LIST_MAX_RESULTS);
-          return op;
+          return listResponseData;
         }).when(client)
         .listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(),
-            Mockito.nullable(String.class), Mockito.any(TracingContext.class));
+            Mockito.nullable(String.class), Mockito.any(TracingContext.class), Mockito.nullable(URI.class));
 
     listActionTaker.listRecursiveAndTakeAction();
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java
index 3becb274a48..df65f7f19e2 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java
@@ -73,7 +73,7 @@ public static boolean isImplicitDirectory(Path path, AzureBlobFileSystem fs,
 
     // 2nd condition: listPaths should return some entries.
     AbfsRestOperation op = client.listPath(
-        relativePath, false, 1, null, testTracingContext, false);
+        relativePath, false, 1, null, testTracingContext, null, false).getOp();
     if (op != null && op.getResult() != null) {
       int listSize = op.getResult().getListResultSchema().paths().size();
       if (listSize > 0) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to