This is an automated email from the ASF dual-hosted git repository.

dazhou pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b033c68  HADOOP-16612. Track Azure Blob File System client-perceived latency
b033c68 is described below

commit b033c681e4fc3ee1a38caa807e130aee481d99d5
Author: Jeetesh Mangwani <jeman...@microsoft.com>
AuthorDate: Tue Nov 19 08:51:49 2019 -0800

    HADOOP-16612. Track Azure Blob File System client-perceived latency
    
    Contributed by Jeetesh Mangwani.
    
    This adds the ability to track the end-to-end performance of ADLS Gen 2 REST APIs by measuring latency in the Hadoop ABFS driver.
    The latency information is sent back to the ADLS Gen 2 REST API endpoints in subsequent requests.
---
 hadoop-tools/hadoop-azure/pom.xml                  |   5 +
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  13 +
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 844 +++++++++++++--------
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   2 +
 .../constants/FileSystemConfigurations.java        |   1 +
 .../constants/HttpHeaderConfigurations.java        |   1 +
 .../contracts/services/AbfsPerfLoggable.java       |  34 +
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |   9 +-
 .../fs/azurebfs/services/AbfsHttpOperation.java    |  46 +-
 .../fs/azurebfs/services/AbfsInputStream.java      |   4 +-
 .../fs/azurebfs/services/AbfsOutputStream.java     |  21 +-
 .../hadoop/fs/azurebfs/services/AbfsPerfInfo.java  | 133 ++++
 .../fs/azurebfs/services/AbfsPerfTracker.java      | 319 ++++++++
 .../fs/azurebfs/services/AbfsRestOperation.java    |   8 +
 .../hadoop-azure/src/site/markdown/abfs.md         |  46 ++
 .../fs/azurebfs/services/TestAbfsClient.java       |   2 +-
 .../fs/azurebfs/services/TestAbfsPerfTracker.java  | 408 ++++++++++
 .../hadoop-azure/src/test/resources/azure-test.xml |   5 +
 18 files changed, 1561 insertions(+), 340 deletions(-)
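
The heart of the change is a uniform tracking pattern applied around each REST call in AzureBlobFileSystemStore: the call is wrapped in a try-with-resources block that opens an AbfsPerfInfo, registers the HTTP result, and marks success, so the elapsed time can be reported to the shared AbfsPerfTracker and sent back to the service on a later request. The condensed sketch below is illustrative only (it is not an excerpt of the committed patch and omits logging); all names in it are taken from the diff that follows.

    // Sketch of the pattern, as applied inside AzureBlobFileSystemStore;
    // "client" and "abfsPerfTracker" are fields of that class.
    public void deleteFilesystem() throws AzureBlobFileSystemException {
      // startTracking() is the new private helper wrapping
      // "new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName)".
      try (AbfsPerfInfo perfInfo = startTracking("deleteFilesystem", "deleteFilesystem")) {
        final AbfsRestOperation op = client.deleteFilesystem();
        // Attach the HTTP response to the measurement and mark the call
        // successful; closing the block completes the measurement.
        perfInfo.registerResult(op.getResult()).registerSuccess(true);
      }
    }

Multi-request operations (rename, delete, listStatus) follow the same pattern inside their continuation loops and additionally call perfInfo.registerAggregates(startInstant, countAggregate) on the final iteration, where startInstant comes from abfsPerfTracker.getLatencyInstant().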

diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index 1a4250f..f2af5a9 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -278,6 +278,11 @@
       <artifactId>bcpkix-jdk15on</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index ef2d83f..9b8c156 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -178,6 +178,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_USE_UPN)
   private boolean useUpn;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_LATENCY_TRACK,
+          DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
+  private boolean trackLatency;
+
   private Map<String, String> storageAccountKeys;
 
   public AbfsConfiguration(final Configuration rawConfig, String accountName)
@@ -471,6 +475,15 @@ public class AbfsConfiguration{
     return this.useUpn;
   }
 
+  /**
+   * Whether {@code AbfsClient} should track and send latency info back to storage servers.
+   *
+   * @return a boolean indicating whether latency should be tracked.
+   */
+  public boolean shouldTrackLatency() {
+    return this.trackLatency;
+  }
+
   public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException {
     AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
     if (authType == AuthType.OAuth) {
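
The shouldTrackLatency() getter added above is the switch that decides whether AbfsClient collects and sends latency data. A hypothetical sketch of how the flag could be set and read back through AbfsConfiguration follows; the wrapper method, the broad throws clause, and the account name are illustrative assumptions, not part of the patch, while the constructor, constant, and getter names are from the diff.

    // Hypothetical helper, assuming the AbfsConfiguration constructor shown above.
    static boolean isLatencyTrackingEnabled() throws Exception {
      Configuration rawConf = new Configuration();
      // Enable tracking via the new ConfigurationKeys.FS_AZURE_ABFS_LATENCY_TRACK key.
      rawConf.setBoolean(ConfigurationKeys.FS_AZURE_ABFS_LATENCY_TRACK, true);
      // "myaccount.dfs.core.windows.net" is an illustrative account name.
      AbfsConfiguration abfsConf =
          new AbfsConfiguration(rawConf, "myaccount.dfs.core.windows.net");
      return abfsConf.shouldTrackLatency();
    }
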
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 7f1bf10..51953d4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -35,6 +35,7 @@ import java.nio.charset.CharsetEncoder;
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
@@ -82,6 +83,8 @@ import 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
 import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
+import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
+import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
 import org.apache.hadoop.fs.azurebfs.utils.Base64;
 import org.apache.hadoop.fs.azurebfs.utils.CRC64;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
@@ -122,6 +125,7 @@ public class AzureBlobFileSystemStore implements Closeable {
   private static final String TOKEN_DATE_PATTERN = 
"yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'";
   private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
   private static final int LIST_MAX_RESULTS = 500;
+  private static final int GET_SET_AGGREGATE_COUNT = 2;
 
   private final AbfsConfiguration abfsConfiguration;
   private final Set<String> azureAtomicRenameDirSet;
@@ -130,6 +134,7 @@ public class AzureBlobFileSystemStore implements Closeable {
   private final AuthType authType;
   private final UserGroupInformation userGroupInformation;
   private final IdentityTransformer identityTransformer;
+  private final AbfsPerfTracker abfsPerfTracker;
 
   public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, 
Configuration configuration)
           throws IOException {
@@ -162,6 +167,7 @@ public class AzureBlobFileSystemStore implements Closeable {
     this.authType = abfsConfiguration.getAuthType(accountName);
     boolean usingOauth = (authType == AuthType.OAuth);
     boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? 
true : isSecureScheme;
+    this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, 
this.abfsConfiguration);
     initializeClient(uri, fileSystemName, accountName, useHttps);
     this.identityTransformer = new 
IdentityTransformer(abfsConfiguration.getRawConfiguration());
   }
@@ -211,10 +217,13 @@ public class AzureBlobFileSystemStore implements 
Closeable {
 
   public boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {
     if (!isNamespaceEnabledSet) {
+
       LOG.debug("Get root ACL status");
-      try {
-        client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + 
AbfsHttpConstants.ROOT_PATH);
+      try (AbfsPerfInfo perfInfo = startTracking("getIsNamespaceEnabled", 
"getAclStatus")) {
+        AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + 
AbfsHttpConstants.ROOT_PATH);
+        perfInfo.registerResult(op.getResult());
         isNamespaceEnabled = true;
+        perfInfo.registerSuccess(true);
       } catch (AbfsRestOperationException ex) {
         // Get ACL status is a HEAD request, its response doesn't contain 
errorCode
         // So can only rely on its status code to determine its account type.
@@ -265,17 +274,23 @@ public class AzureBlobFileSystemStore implements 
Closeable {
   }
 
   public Hashtable<String, String> getFilesystemProperties() throws 
AzureBlobFileSystemException {
-    LOG.debug("getFilesystemProperties for filesystem: {}",
-            client.getFileSystem());
+    try (AbfsPerfInfo perfInfo = startTracking("getFilesystemProperties",
+            "getFilesystemProperties")) {
+      LOG.debug("getFilesystemProperties for filesystem: {}",
+              client.getFileSystem());
+
+      final Hashtable<String, String> parsedXmsProperties;
 
-    final Hashtable<String, String> parsedXmsProperties;
+      final AbfsRestOperation op = client.getFilesystemProperties();
+      perfInfo.registerResult(op.getResult());
 
-    final AbfsRestOperation op = client.getFilesystemProperties();
-    final String xMsProperties = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
+      final String xMsProperties = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
 
-    parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
+      parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
+      perfInfo.registerSuccess(true);
 
-    return parsedXmsProperties;
+      return parsedXmsProperties;
+    }
   }
 
   public void setFilesystemProperties(final Hashtable<String, String> 
properties)
@@ -288,159 +303,196 @@ public class AzureBlobFileSystemStore implements 
Closeable {
             client.getFileSystem(),
             properties);
 
-    final String commaSeparatedProperties;
-    try {
-      commaSeparatedProperties = 
convertXmsPropertiesToCommaSeparatedString(properties);
-    } catch (CharacterCodingException ex) {
-      throw new InvalidAbfsRestOperationException(ex);
-    }
+    try (AbfsPerfInfo perfInfo = startTracking("setFilesystemProperties",
+            "setFilesystemProperties")) {
+      final String commaSeparatedProperties;
+      try {
+        commaSeparatedProperties = 
convertXmsPropertiesToCommaSeparatedString(properties);
+      } catch (CharacterCodingException ex) {
+        throw new InvalidAbfsRestOperationException(ex);
+      }
 
-    client.setFilesystemProperties(commaSeparatedProperties);
+      final AbfsRestOperation op = 
client.setFilesystemProperties(commaSeparatedProperties);
+      perfInfo.registerResult(op.getResult()).registerSuccess(true);
+    }
   }
 
   public Hashtable<String, String> getPathStatus(final Path path) throws 
AzureBlobFileSystemException {
-    LOG.debug("getPathStatus for filesystem: {} path: {}",
-            client.getFileSystem(),
-           path);
+    try (AbfsPerfInfo perfInfo = startTracking("getPathStatus", 
"getPathStatus")){
+      LOG.debug("getPathStatus for filesystem: {} path: {}",
+              client.getFileSystem(),
+              path);
+
+      final Hashtable<String, String> parsedXmsProperties;
+      final AbfsRestOperation op = 
client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+      perfInfo.registerResult(op.getResult());
 
-    final Hashtable<String, String> parsedXmsProperties;
-    final AbfsRestOperation op = 
client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+      final String xMsProperties = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
 
-    final String xMsProperties = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES);
+      parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
 
-    parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties);
+      perfInfo.registerSuccess(true);
 
-    return parsedXmsProperties;
+      return parsedXmsProperties;
+    }
   }
 
   public void setPathProperties(final Path path, final Hashtable<String, 
String> properties) throws AzureBlobFileSystemException {
-    LOG.debug("setFilesystemProperties for filesystem: {} path: {} with 
properties: {}",
-            client.getFileSystem(),
-            path,
-            properties);
+    try (AbfsPerfInfo perfInfo = startTracking("setPathProperties", 
"setPathProperties")){
+      LOG.debug("setFilesystemProperties for filesystem: {} path: {} with 
properties: {}",
+              client.getFileSystem(),
+              path,
+              properties);
 
-    final String commaSeparatedProperties;
-    try {
-      commaSeparatedProperties = 
convertXmsPropertiesToCommaSeparatedString(properties);
-    } catch (CharacterCodingException ex) {
-      throw new InvalidAbfsRestOperationException(ex);
+      final String commaSeparatedProperties;
+      try {
+        commaSeparatedProperties = 
convertXmsPropertiesToCommaSeparatedString(properties);
+      } catch (CharacterCodingException ex) {
+        throw new InvalidAbfsRestOperationException(ex);
+      }
+      final AbfsRestOperation op = 
client.setPathProperties(AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(path), commaSeparatedProperties);
+      perfInfo.registerResult(op.getResult()).registerSuccess(true);
     }
-    client.setPathProperties(AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(path), commaSeparatedProperties);
   }
 
   public void createFilesystem() throws AzureBlobFileSystemException {
-    LOG.debug("createFilesystem for filesystem: {}",
-            client.getFileSystem());
+    try (AbfsPerfInfo perfInfo = startTracking("createFilesystem", 
"createFilesystem")){
+      LOG.debug("createFilesystem for filesystem: {}",
+              client.getFileSystem());
 
-    client.createFilesystem();
+      final AbfsRestOperation op = client.createFilesystem();
+      perfInfo.registerResult(op.getResult()).registerSuccess(true);
+    }
   }
 
   public void deleteFilesystem() throws AzureBlobFileSystemException {
-    LOG.debug("deleteFilesystem for filesystem: {}",
-            client.getFileSystem());
+    try (AbfsPerfInfo perfInfo = startTracking("deleteFilesystem", 
"deleteFilesystem")) {
+      LOG.debug("deleteFilesystem for filesystem: {}",
+              client.getFileSystem());
 
-    client.deleteFilesystem();
+      final AbfsRestOperation op = client.deleteFilesystem();
+      perfInfo.registerResult(op.getResult()).registerSuccess(true);
+    }
   }
 
   public OutputStream createFile(final Path path, final boolean overwrite, 
final FsPermission permission,
                                  final FsPermission umask) throws 
AzureBlobFileSystemException {
-    boolean isNamespaceEnabled = getIsNamespaceEnabled();
-    LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} 
umask: {} isNamespaceEnabled: {}",
-            client.getFileSystem(),
-            path,
-            overwrite,
-            permission.toString(),
-            umask.toString(),
-            isNamespaceEnabled);
-
-    client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 
true, overwrite,
-        isNamespaceEnabled ? getOctalNotation(permission) : null,
-        isNamespaceEnabled ? getOctalNotation(umask) : null);
-
-    return new AbfsOutputStream(
-        client,
-        AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
-        0,
-        abfsConfiguration.getWriteBufferSize(),
-        abfsConfiguration.isFlushEnabled(),
-        abfsConfiguration.isOutputStreamFlushDisabled());
+    try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
+      boolean isNamespaceEnabled = getIsNamespaceEnabled();
+      LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: 
{} umask: {} isNamespaceEnabled: {}",
+              client.getFileSystem(),
+              path,
+              overwrite,
+              permission.toString(),
+              umask.toString(),
+              isNamespaceEnabled);
+
+      final AbfsRestOperation op = 
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 
true, overwrite,
+              isNamespaceEnabled ? getOctalNotation(permission) : null,
+              isNamespaceEnabled ? getOctalNotation(umask) : null);
+      perfInfo.registerResult(op.getResult()).registerSuccess(true);
+
+      return new AbfsOutputStream(
+              client,
+              AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
+              0,
+              abfsConfiguration.getWriteBufferSize(),
+              abfsConfiguration.isFlushEnabled(),
+              abfsConfiguration.isOutputStreamFlushDisabled());
+    }
   }
 
   public void createDirectory(final Path path, final FsPermission permission, 
final FsPermission umask)
       throws AzureBlobFileSystemException {
-    boolean isNamespaceEnabled = getIsNamespaceEnabled();
-    LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: 
{} isNamespaceEnabled: {}",
-            client.getFileSystem(),
-            path,
-            permission,
-            umask,
-            isNamespaceEnabled);
-
-    client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 
false, true,
-        isNamespaceEnabled ? getOctalNotation(permission) : null,
-        isNamespaceEnabled ? getOctalNotation(umask) : null);
+    try (AbfsPerfInfo perfInfo = startTracking("createDirectory", 
"createPath")) {
+      boolean isNamespaceEnabled = getIsNamespaceEnabled();
+      LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: 
{} isNamespaceEnabled: {}",
+              client.getFileSystem(),
+              path,
+              permission,
+              umask,
+              isNamespaceEnabled);
+
+      final AbfsRestOperation op = 
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 
false, true,
+              isNamespaceEnabled ? getOctalNotation(permission) : null,
+              isNamespaceEnabled ? getOctalNotation(umask) : null);
+      perfInfo.registerResult(op.getResult()).registerSuccess(true);
+    }
   }
 
   public AbfsInputStream openFileForRead(final Path path, final 
FileSystem.Statistics statistics)
       throws AzureBlobFileSystemException {
-    LOG.debug("openFileForRead filesystem: {} path: {}",
-            client.getFileSystem(),
-            path);
+    try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", 
"getPathStatus")) {
+      LOG.debug("openFileForRead filesystem: {} path: {}",
+              client.getFileSystem(),
+              path);
 
-    final AbfsRestOperation op = 
client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+      final AbfsRestOperation op = 
client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+      perfInfo.registerResult(op.getResult());
 
-    final String resourceType = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-    final long contentLength = 
Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
-    final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final String resourceType = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+      final long contentLength = 
Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+      final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
 
-    if (parseIsDirectory(resourceType)) {
-      throw new AbfsRestOperationException(
-              AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
-              AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
-              "openFileForRead must be used with files and not directories",
-              null);
-    }
+      if (parseIsDirectory(resourceType)) {
+        throw new AbfsRestOperationException(
+                AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+                AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+                "openFileForRead must be used with files and not directories",
+                null);
+      }
+
+      perfInfo.registerSuccess(true);
 
-    // Add statistics for InputStream
-    return new AbfsInputStream(client, statistics,
-            AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 
contentLength,
-                abfsConfiguration.getReadBufferSize(), 
abfsConfiguration.getReadAheadQueueDepth(),
-                abfsConfiguration.getTolerateOobAppends(), eTag);
+      // Add statistics for InputStream
+      return new AbfsInputStream(client, statistics,
+              AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 
contentLength,
+              abfsConfiguration.getReadBufferSize(), 
abfsConfiguration.getReadAheadQueueDepth(),
+              abfsConfiguration.getTolerateOobAppends(), eTag);
+    }
   }
 
   public OutputStream openFileForWrite(final Path path, final boolean 
overwrite) throws
           AzureBlobFileSystemException {
-    LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
-            client.getFileSystem(),
-            path,
-            overwrite);
+    try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", 
"getPathStatus")) {
+      LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
+              client.getFileSystem(),
+              path,
+              overwrite);
 
-    final AbfsRestOperation op = 
client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+      final AbfsRestOperation op = 
client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path));
+      perfInfo.registerResult(op.getResult());
 
-    final String resourceType = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-    final Long contentLength = 
Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+      final String resourceType = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+      final Long contentLength = 
Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
 
-    if (parseIsDirectory(resourceType)) {
-      throw new AbfsRestOperationException(
-              AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
-              AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
-              "openFileForRead must be used with files and not directories",
-              null);
-    }
+      if (parseIsDirectory(resourceType)) {
+        throw new AbfsRestOperationException(
+                AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+                AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+                "openFileForRead must be used with files and not directories",
+                null);
+      }
 
-    final long offset = overwrite ? 0 : contentLength;
+      final long offset = overwrite ? 0 : contentLength;
 
-    return new AbfsOutputStream(
-        client,
-        AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
-        offset,
-        abfsConfiguration.getWriteBufferSize(),
-        abfsConfiguration.isFlushEnabled(),
-        abfsConfiguration.isOutputStreamFlushDisabled());
+      perfInfo.registerSuccess(true);
+
+      return new AbfsOutputStream(
+              client,
+              AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
+              offset,
+              abfsConfiguration.getWriteBufferSize(),
+              abfsConfiguration.isFlushEnabled(),
+              abfsConfiguration.isOutputStreamFlushDisabled());
+    }
   }
 
   public void rename(final Path source, final Path destination) throws
           AzureBlobFileSystemException {
+    final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
+    long countAggregate = 0;
+    boolean shouldContinue;
 
     if (isAtomicRenameKey(source.getName())) {
       LOG.warn("The atomic rename feature is not supported by the ABFS scheme; 
however rename,"
@@ -455,15 +507,28 @@ public class AzureBlobFileSystemStore implements 
Closeable {
     String continuation = null;
 
     do {
-      AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH 
+ getRelativePath(source),
-              AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), 
continuation);
-      continuation = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
-
-    } while (continuation != null && !continuation.isEmpty());
+      try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) {
+        AbfsRestOperation op = 
client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source),
+                AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(destination), continuation);
+        perfInfo.registerResult(op.getResult());
+        continuation = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
+        perfInfo.registerSuccess(true);
+        countAggregate++;
+        shouldContinue = continuation != null && !continuation.isEmpty();
+
+        if (!shouldContinue) {
+          perfInfo.registerAggregates(startAggregate, countAggregate);
+        }
+      }
+    } while (shouldContinue);
   }
 
   public void delete(final Path path, final boolean recursive)
       throws AzureBlobFileSystemException {
+    final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
+    long countAggregate = 0;
+    boolean shouldContinue = true;
+
     LOG.debug("delete filesystem: {} path: {} recursive: {}",
             client.getFileSystem(),
             path,
@@ -472,70 +537,89 @@ public class AzureBlobFileSystemStore implements 
Closeable {
     String continuation = null;
 
     do {
-      AbfsRestOperation op = client.deletePath(
-          AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, 
continuation);
-      continuation = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
-
-    } while (continuation != null && !continuation.isEmpty());
+      try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) {
+        AbfsRestOperation op = client.deletePath(
+                AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 
recursive, continuation);
+        perfInfo.registerResult(op.getResult());
+        continuation = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
+        perfInfo.registerSuccess(true);
+        countAggregate++;
+        shouldContinue = continuation != null && !continuation.isEmpty();
+
+        if (!shouldContinue) {
+          perfInfo.registerAggregates(startAggregate, countAggregate);
+        }
+      }
+    } while (shouldContinue);
   }
 
   public FileStatus getFileStatus(final Path path) throws IOException {
-    boolean isNamespaceEnabled = getIsNamespaceEnabled();
-    LOG.debug("getFileStatus filesystem: {} path: {} isNamespaceEnabled: {}",
-            client.getFileSystem(),
-            path,
-            isNamespaceEnabled);
-
-    final AbfsRestOperation op;
-    if (path.isRoot()) {
-      op = isNamespaceEnabled
-              ? client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + 
AbfsHttpConstants.ROOT_PATH)
-              : client.getFilesystemProperties();
-    } else {
-      op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(path));
-    }
-
-    final long blockSize = abfsConfiguration.getAzureBlockSize();
-    final AbfsHttpOperation result = op.getResult();
-
-    final String eTag = 
result.getResponseHeader(HttpHeaderConfigurations.ETAG);
-    final String lastModified = 
result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
-    final String permissions = 
result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
-    final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
-    final long contentLength;
-    final boolean resourceIsDir;
+    try (AbfsPerfInfo perfInfo = startTracking("getFileStatus", 
"undetermined")) {
+      boolean isNamespaceEnabled = getIsNamespaceEnabled();
+      LOG.debug("getFileStatus filesystem: {} path: {} isNamespaceEnabled: {}",
+              client.getFileSystem(),
+              path,
+              isNamespaceEnabled);
+
+      final AbfsRestOperation op;
+      if (path.isRoot()) {
+        if (isNamespaceEnabled) {
+          perfInfo.registerCallee("getAclStatus");
+          op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + 
AbfsHttpConstants.ROOT_PATH);
+        } else {
+          perfInfo.registerCallee("getFilesystemProperties");
+          op = client.getFilesystemProperties();
+        }
+      } else {
+        perfInfo.registerCallee("getPathStatus");
+        op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(path));
+      }
 
-    if (path.isRoot()) {
-      contentLength = 0;
-      resourceIsDir = true;
-    } else {
-      contentLength = 
parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
-      resourceIsDir = 
parseIsDirectory(result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE));
-    }
+      perfInfo.registerResult(op.getResult());
+      final long blockSize = abfsConfiguration.getAzureBlockSize();
+      final AbfsHttpOperation result = op.getResult();
+
+      final String eTag = 
result.getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final String lastModified = 
result.getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED);
+      final String permissions = 
result.getResponseHeader((HttpHeaderConfigurations.X_MS_PERMISSIONS));
+      final boolean hasAcl = AbfsPermission.isExtendedAcl(permissions);
+      final long contentLength;
+      final boolean resourceIsDir;
+
+      if (path.isRoot()) {
+        contentLength = 0;
+        resourceIsDir = true;
+      } else {
+        contentLength = 
parseContentLength(result.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+        resourceIsDir = 
parseIsDirectory(result.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE));
+      }
 
-    final String transformedOwner = 
identityTransformer.transformIdentityForGetRequest(
+      final String transformedOwner = 
identityTransformer.transformIdentityForGetRequest(
               result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
               true,
               userName);
 
-    final String transformedGroup = 
identityTransformer.transformIdentityForGetRequest(
+      final String transformedGroup = 
identityTransformer.transformIdentityForGetRequest(
               result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
               false,
               primaryUserGroup);
 
-    return new VersionedFileStatus(
-            transformedOwner,
-            transformedGroup,
-            permissions == null ? new AbfsPermission(FsAction.ALL, 
FsAction.ALL, FsAction.ALL)
-                    : AbfsPermission.valueOf(permissions),
-            hasAcl,
-            contentLength,
-            resourceIsDir,
-            1,
-            blockSize,
-            parseLastModifiedTime(lastModified),
-            path,
-            eTag);
+      perfInfo.registerSuccess(true);
+
+      return new VersionedFileStatus(
+              transformedOwner,
+              transformedGroup,
+              permissions == null ? new AbfsPermission(FsAction.ALL, 
FsAction.ALL, FsAction.ALL)
+                      : AbfsPermission.valueOf(permissions),
+              hasAcl,
+              contentLength,
+              resourceIsDir,
+              1,
+              blockSize,
+              parseLastModifiedTime(lastModified),
+              path,
+              eTag);
+    }
   }
 
   /**
@@ -559,6 +643,10 @@ public class AzureBlobFileSystemStore implements Closeable 
{
    * */
   @InterfaceStability.Unstable
   public FileStatus[] listStatus(final Path path, final String startFrom) 
throws IOException {
+    final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
+    long countAggregate = 0;
+    boolean shouldContinue = true;
+
     LOG.debug("listStatus filesystem: {} path: {}, startFrom: {}",
             client.getFileSystem(),
             path,
@@ -576,53 +664,63 @@ public class AzureBlobFileSystemStore implements 
Closeable {
 
     ArrayList<FileStatus> fileStatuses = new ArrayList<>();
     do {
-      AbfsRestOperation op = client.listPath(relativePath, false, 
LIST_MAX_RESULTS, continuation);
-      continuation = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
-      ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
-      if (retrievedSchema == null) {
-        throw new AbfsRestOperationException(
-                AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
-                AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
-                "listStatusAsync path not found",
-                null, op.getResult());
-      }
+      try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
+        AbfsRestOperation op = client.listPath(relativePath, false, 
LIST_MAX_RESULTS, continuation);
+        perfInfo.registerResult(op.getResult());
+        continuation = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
+        ListResultSchema retrievedSchema = 
op.getResult().getListResultSchema();
+        if (retrievedSchema == null) {
+          throw new AbfsRestOperationException(
+                  AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+                  AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+                  "listStatusAsync path not found",
+                  null, op.getResult());
+        }
 
-      long blockSize = abfsConfiguration.getAzureBlockSize();
-
-      for (ListResultEntrySchema entry : retrievedSchema.paths()) {
-        final String owner = 
identityTransformer.transformIdentityForGetRequest(entry.owner(), true, 
userName);
-        final String group = 
identityTransformer.transformIdentityForGetRequest(entry.group(), false, 
primaryUserGroup);
-        final FsPermission fsPermission = entry.permissions() == null
-                ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
-                : AbfsPermission.valueOf(entry.permissions());
-        final boolean hasAcl = 
AbfsPermission.isExtendedAcl(entry.permissions());
-
-        long lastModifiedMillis = 0;
-        long contentLength = entry.contentLength() == null ? 0 : 
entry.contentLength();
-        boolean isDirectory = entry.isDirectory() == null ? false : 
entry.isDirectory();
-        if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
-          lastModifiedMillis = parseLastModifiedTime(entry.lastModified());
+        long blockSize = abfsConfiguration.getAzureBlockSize();
+
+        for (ListResultEntrySchema entry : retrievedSchema.paths()) {
+          final String owner = 
identityTransformer.transformIdentityForGetRequest(entry.owner(), true, 
userName);
+          final String group = 
identityTransformer.transformIdentityForGetRequest(entry.group(), false, 
primaryUserGroup);
+          final FsPermission fsPermission = entry.permissions() == null
+                  ? new AbfsPermission(FsAction.ALL, FsAction.ALL, 
FsAction.ALL)
+                  : AbfsPermission.valueOf(entry.permissions());
+          final boolean hasAcl = 
AbfsPermission.isExtendedAcl(entry.permissions());
+
+          long lastModifiedMillis = 0;
+          long contentLength = entry.contentLength() == null ? 0 : 
entry.contentLength();
+          boolean isDirectory = entry.isDirectory() == null ? false : 
entry.isDirectory();
+          if (entry.lastModified() != null && !entry.lastModified().isEmpty()) 
{
+            lastModifiedMillis = parseLastModifiedTime(entry.lastModified());
+          }
+
+          Path entryPath = new Path(File.separator + entry.name());
+          entryPath = entryPath.makeQualified(this.uri, entryPath);
+
+          fileStatuses.add(
+                  new VersionedFileStatus(
+                          owner,
+                          group,
+                          fsPermission,
+                          hasAcl,
+                          contentLength,
+                          isDirectory,
+                          1,
+                          blockSize,
+                          lastModifiedMillis,
+                          entryPath,
+                          entry.eTag()));
         }
 
-        Path entryPath = new Path(File.separator + entry.name());
-        entryPath = entryPath.makeQualified(this.uri, entryPath);
-
-        fileStatuses.add(
-                new VersionedFileStatus(
-                        owner,
-                        group,
-                        fsPermission,
-                        hasAcl,
-                        contentLength,
-                        isDirectory,
-                        1,
-                        blockSize,
-                        lastModifiedMillis,
-                        entryPath,
-                        entry.eTag()));
-      }
+        perfInfo.registerSuccess(true);
+        countAggregate++;
+        shouldContinue = continuation != null && !continuation.isEmpty();
 
-    } while (continuation != null && !continuation.isEmpty());
+        if (!shouldContinue) {
+          perfInfo.registerAggregates(startAggregate, countAggregate);
+        }
+      }
+    } while (shouldContinue);
 
     return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
   }
@@ -689,17 +787,25 @@ public class AzureBlobFileSystemStore implements 
Closeable {
           "This operation is only valid for storage accounts with the 
hierarchical namespace enabled.");
     }
 
-    LOG.debug(
-            "setOwner filesystem: {} path: {} owner: {} group: {}",
-            client.getFileSystem(),
-            path.toString(),
-            owner,
-            group);
+    try (AbfsPerfInfo perfInfo = startTracking("setOwner", "setOwner")) {
+
+      LOG.debug(
+              "setOwner filesystem: {} path: {} owner: {} group: {}",
+              client.getFileSystem(),
+              path.toString(),
+              owner,
+              group);
+
+      final String transformedOwner = 
identityTransformer.transformUserOrGroupForSetRequest(owner);
+      final String transformedGroup = 
identityTransformer.transformUserOrGroupForSetRequest(group);
 
-    final String transformedOwner = 
identityTransformer.transformUserOrGroupForSetRequest(owner);
-    final String transformedGroup = 
identityTransformer.transformUserOrGroupForSetRequest(group);
+      final AbfsRestOperation op = client.setOwner(
+              AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+              transformedOwner,
+              transformedGroup);
 
-    client.setOwner(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true), transformedOwner, transformedGroup);
+      perfInfo.registerResult(op.getResult()).registerSuccess(true);
+    }
   }
 
   public void setPermission(final Path path, final FsPermission permission) 
throws
@@ -709,13 +815,20 @@ public class AzureBlobFileSystemStore implements 
Closeable {
           "This operation is only valid for storage accounts with the 
hierarchical namespace enabled.");
     }
 
-    LOG.debug(
-            "setPermission filesystem: {} path: {} permission: {}",
-            client.getFileSystem(),
-            path.toString(),
-            permission.toString());
-    client.setPermission(AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(path, true),
-            String.format(AbfsHttpConstants.PERMISSION_FORMAT, 
permission.toOctal()));
+    try (AbfsPerfInfo perfInfo = startTracking("setPermission", 
"setPermission")) {
+
+      LOG.debug(
+              "setPermission filesystem: {} path: {} permission: {}",
+              client.getFileSystem(),
+              path.toString(),
+              permission.toString());
+
+      final AbfsRestOperation op = client.setPermission(
+              AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true),
+              String.format(AbfsHttpConstants.PERMISSION_FORMAT, 
permission.toOctal()));
+
+      perfInfo.registerResult(op.getResult()).registerSuccess(true);
+    }
   }
 
   public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec) 
throws
@@ -725,25 +838,37 @@ public class AzureBlobFileSystemStore implements 
Closeable {
           "This operation is only valid for storage accounts with the 
hierarchical namespace enabled.");
     }
 
-    LOG.debug(
-            "modifyAclEntries filesystem: {} path: {} aclSpec: {}",
-            client.getFileSystem(),
-            path.toString(),
-            AclEntry.aclSpecToString(aclSpec));
+    try (AbfsPerfInfo perfInfoGet = startTracking("modifyAclEntries", 
"getAclStatus")) {
+
+      LOG.debug(
+              "modifyAclEntries filesystem: {} path: {} aclSpec: {}",
+              client.getFileSystem(),
+              path.toString(),
+              AclEntry.aclSpecToString(aclSpec));
+
+      identityTransformer.transformAclEntriesForSetRequest(aclSpec);
+      final Map<String, String> modifyAclEntries = 
AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+      boolean useUpn = AbfsAclHelper.isUpnFormatAclEntries(modifyAclEntries);
 
-    identityTransformer.transformAclEntriesForSetRequest(aclSpec);
-    final Map<String, String> modifyAclEntries = 
AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
-    boolean useUpn = AbfsAclHelper.isUpnFormatAclEntries(modifyAclEntries);
+      final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true), useUpn);
+      perfInfoGet.registerResult(op.getResult());
+      final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
 
-    final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true), useUpn);
-    final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final Map<String, String> aclEntries = 
AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
 
-    final Map<String, String> aclEntries = 
AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
+      AbfsAclHelper.modifyAclEntriesInternal(aclEntries, modifyAclEntries);
 
-    AbfsAclHelper.modifyAclEntriesInternal(aclEntries, modifyAclEntries);
+      perfInfoGet.registerSuccess(true).finishTracking();
 
-    client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true),
-        AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
+      try (AbfsPerfInfo perfInfoSet = startTracking("modifyAclEntries", 
"setAcl")) {
+        final AbfsRestOperation setAclOp
+                = client.setAcl(AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(path, true),
+                AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
+        perfInfoSet.registerResult(setAclOp.getResult())
+                .registerSuccess(true)
+                .registerAggregates(perfInfoGet.getTrackingStart(), 
GET_SET_AGGREGATE_COUNT);
+      }
+    }
   }
 
   public void removeAclEntries(final Path path, final List<AclEntry> aclSpec) 
throws AzureBlobFileSystemException {
@@ -752,25 +877,37 @@ public class AzureBlobFileSystemStore implements 
Closeable {
           "This operation is only valid for storage accounts with the 
hierarchical namespace enabled.");
     }
 
-    LOG.debug(
-            "removeAclEntries filesystem: {} path: {} aclSpec: {}",
-            client.getFileSystem(),
-            path.toString(),
-            AclEntry.aclSpecToString(aclSpec));
+    try (AbfsPerfInfo perfInfoGet = startTracking("removeAclEntries", 
"getAclStatus")) {
+
+      LOG.debug(
+              "removeAclEntries filesystem: {} path: {} aclSpec: {}",
+              client.getFileSystem(),
+              path.toString(),
+              AclEntry.aclSpecToString(aclSpec));
 
-    identityTransformer.transformAclEntriesForSetRequest(aclSpec);
-    final Map<String, String> removeAclEntries = 
AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
-    boolean isUpnFormat = 
AbfsAclHelper.isUpnFormatAclEntries(removeAclEntries);
+      identityTransformer.transformAclEntriesForSetRequest(aclSpec);
+      final Map<String, String> removeAclEntries = 
AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+      boolean isUpnFormat = 
AbfsAclHelper.isUpnFormatAclEntries(removeAclEntries);
 
-    final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true), isUpnFormat);
-    final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true), isUpnFormat);
+      perfInfoGet.registerResult(op.getResult());
+      final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
 
-    final Map<String, String> aclEntries = 
AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
+      final Map<String, String> aclEntries = 
AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
 
-    AbfsAclHelper.removeAclEntriesInternal(aclEntries, removeAclEntries);
+      AbfsAclHelper.removeAclEntriesInternal(aclEntries, removeAclEntries);
 
-    client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true),
-            AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
+      perfInfoGet.registerSuccess(true).finishTracking();
+
+      try (AbfsPerfInfo perfInfoSet = startTracking("removeAclEntries", 
"setAcl")) {
+        final AbfsRestOperation setAclOp =
+                client.setAcl(AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(path, true),
+                AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
+        perfInfoSet.registerResult(setAclOp.getResult())
+                .registerSuccess(true)
+                .registerAggregates(perfInfoGet.getTrackingStart(), 
GET_SET_AGGREGATE_COUNT);
+      }
+    }
   }
 
   public void removeDefaultAcl(final Path path) throws 
AzureBlobFileSystemException {
@@ -779,26 +916,38 @@ public class AzureBlobFileSystemStore implements 
Closeable {
           "This operation is only valid for storage accounts with the 
hierarchical namespace enabled.");
     }
 
-    LOG.debug(
-            "removeDefaultAcl filesystem: {} path: {}",
-            client.getFileSystem(),
-            path.toString());
+    try (AbfsPerfInfo perfInfoGet = startTracking("removeDefaultAcl", 
"getAclStatus")) {
 
-    final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true));
-    final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
-    final Map<String, String> aclEntries = 
AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
-    final Map<String, String> defaultAclEntries = new HashMap<>();
+      LOG.debug(
+              "removeDefaultAcl filesystem: {} path: {}",
+              client.getFileSystem(),
+              path.toString());
 
-    for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
-      if (aclEntry.getKey().startsWith("default:")) {
-        defaultAclEntries.put(aclEntry.getKey(), aclEntry.getValue());
+      final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true));
+      perfInfoGet.registerResult(op.getResult());
+      final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      final Map<String, String> aclEntries = 
AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
+      final Map<String, String> defaultAclEntries = new HashMap<>();
+
+      for (Map.Entry<String, String> aclEntry : aclEntries.entrySet()) {
+        if (aclEntry.getKey().startsWith("default:")) {
+          defaultAclEntries.put(aclEntry.getKey(), aclEntry.getValue());
+        }
       }
-    }
 
-    aclEntries.keySet().removeAll(defaultAclEntries.keySet());
+      aclEntries.keySet().removeAll(defaultAclEntries.keySet());
 
-    client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true),
-        AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
+      perfInfoGet.registerSuccess(true).finishTracking();
+
+      try (AbfsPerfInfo perfInfoSet = startTracking("removeDefaultAcl", 
"setAcl")) {
+        final AbfsRestOperation setAclOp =
+                client.setAcl(AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(path, true),
+                AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
+        perfInfoSet.registerResult(setAclOp.getResult())
+                .registerSuccess(true)
+                .registerAggregates(perfInfoGet.getTrackingStart(), 
GET_SET_AGGREGATE_COUNT);
+      }
+    }
   }
 
   public void removeAcl(final Path path) throws AzureBlobFileSystemException {
@@ -807,22 +956,35 @@ public class AzureBlobFileSystemStore implements 
Closeable {
           "This operation is only valid for storage accounts with the 
hierarchical namespace enabled.");
     }
 
-    LOG.debug(
-            "removeAcl filesystem: {} path: {}",
-            client.getFileSystem(),
-            path.toString());
-    final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true));
-    final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+    try (AbfsPerfInfo perfInfoGet = startTracking("removeAcl", 
"getAclStatus")){
+
+      LOG.debug(
+              "removeAcl filesystem: {} path: {}",
+              client.getFileSystem(),
+              path.toString());
+
+      final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true));
+      perfInfoGet.registerResult(op.getResult());
+      final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
 
-    final Map<String, String> aclEntries = 
AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
-    final Map<String, String> newAclEntries = new HashMap<>();
+      final Map<String, String> aclEntries = 
AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
+      final Map<String, String> newAclEntries = new HashMap<>();
 
-    newAclEntries.put(AbfsHttpConstants.ACCESS_USER, 
aclEntries.get(AbfsHttpConstants.ACCESS_USER));
-    newAclEntries.put(AbfsHttpConstants.ACCESS_GROUP, 
aclEntries.get(AbfsHttpConstants.ACCESS_GROUP));
-    newAclEntries.put(AbfsHttpConstants.ACCESS_OTHER, 
aclEntries.get(AbfsHttpConstants.ACCESS_OTHER));
+      newAclEntries.put(AbfsHttpConstants.ACCESS_USER, 
aclEntries.get(AbfsHttpConstants.ACCESS_USER));
+      newAclEntries.put(AbfsHttpConstants.ACCESS_GROUP, 
aclEntries.get(AbfsHttpConstants.ACCESS_GROUP));
+      newAclEntries.put(AbfsHttpConstants.ACCESS_OTHER, 
aclEntries.get(AbfsHttpConstants.ACCESS_OTHER));
 
-    client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true),
-        AbfsAclHelper.serializeAclSpec(newAclEntries), eTag);
+      perfInfoGet.registerSuccess(true).finishTracking();
+
+      try (AbfsPerfInfo perfInfoSet = startTracking("removeAcl", "setAcl")) {
+        final AbfsRestOperation setAclOp =
+                client.setAcl(AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(path, true),
+                AbfsAclHelper.serializeAclSpec(newAclEntries), eTag);
+        perfInfoSet.registerResult(setAclOp.getResult())
+                .registerSuccess(true)
+                .registerAggregates(perfInfoGet.getTrackingStart(), 
GET_SET_AGGREGATE_COUNT);
+      }
+    }
   }
 
   public void setAcl(final Path path, final List<AclEntry> aclSpec) throws 
AzureBlobFileSystemException {
@@ -831,25 +993,37 @@ public class AzureBlobFileSystemStore implements 
Closeable {
           "This operation is only valid for storage accounts with the 
hierarchical namespace enabled.");
     }
 
-    LOG.debug(
-            "setAcl filesystem: {} path: {} aclspec: {}",
-            client.getFileSystem(),
-            path.toString(),
-            AclEntry.aclSpecToString(aclSpec));
+    try (AbfsPerfInfo perfInfoGet = startTracking("setAcl", "getAclStatus")) {
 
-    identityTransformer.transformAclEntriesForSetRequest(aclSpec);
-    final Map<String, String> aclEntries = 
AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
-    final boolean isUpnFormat = 
AbfsAclHelper.isUpnFormatAclEntries(aclEntries);
+      LOG.debug(
+              "setAcl filesystem: {} path: {} aclspec: {}",
+              client.getFileSystem(),
+              path.toString(),
+              AclEntry.aclSpecToString(aclSpec));
 
-    final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true), isUpnFormat);
-    final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      identityTransformer.transformAclEntriesForSetRequest(aclSpec);
+      final Map<String, String> aclEntries = 
AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+      final boolean isUpnFormat = 
AbfsAclHelper.isUpnFormatAclEntries(aclEntries);
 
-    final Map<String, String> getAclEntries = 
AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
+      final AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true), isUpnFormat);
+      perfInfoGet.registerResult(op.getResult());
+      final String eTag = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
 
-    AbfsAclHelper.setAclEntriesInternal(aclEntries, getAclEntries);
+      final Map<String, String> getAclEntries = 
AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL));
 
-    client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true),
-        AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
+      AbfsAclHelper.setAclEntriesInternal(aclEntries, getAclEntries);
+
+      perfInfoGet.registerSuccess(true).finishTracking();
+
+      try (AbfsPerfInfo perfInfoSet = startTracking("setAcl", "setAcl")) {
+        final AbfsRestOperation setAclOp =
+                client.setAcl(AbfsHttpConstants.FORWARD_SLASH + 
getRelativePath(path, true),
+                AbfsAclHelper.serializeAclSpec(aclEntries), eTag);
+        perfInfoSet.registerResult(setAclOp.getResult())
+                .registerSuccess(true)
+                .registerAggregates(perfInfoGet.getTrackingStart(), 
GET_SET_AGGREGATE_COUNT);
+      }
+    }
   }
 
   public AclStatus getAclStatus(final Path path) throws IOException {
@@ -858,38 +1032,44 @@ public class AzureBlobFileSystemStore implements 
Closeable {
           "This operation is only valid for storage accounts with the 
hierarchical namespace enabled.");
     }
 
-    LOG.debug(
-            "getAclStatus filesystem: {} path: {}",
-            client.getFileSystem(),
-            path.toString());
-    AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH 
+ getRelativePath(path, true));
-    AbfsHttpOperation result = op.getResult();
-
-    final String transformedOwner = 
identityTransformer.transformIdentityForGetRequest(
-            result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
-            true,
-            userName);
-    final String transformedGroup = 
identityTransformer.transformIdentityForGetRequest(
-            result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
-            false,
-            primaryUserGroup);
-
-    final String permissions = 
result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
-    final String aclSpecString = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL);
-
-    final List<AclEntry> aclEntries = 
AclEntry.parseAclSpec(AbfsAclHelper.processAclString(aclSpecString), true);
-    identityTransformer.transformAclEntriesForGetRequest(aclEntries, userName, 
primaryUserGroup);
-    final FsPermission fsPermission = permissions == null ? new 
AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
-            : AbfsPermission.valueOf(permissions);
-
-    final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
-    aclStatusBuilder.owner(transformedOwner);
-    aclStatusBuilder.group(transformedGroup);
-
-    aclStatusBuilder.setPermission(fsPermission);
-    aclStatusBuilder.stickyBit(fsPermission.getStickyBit());
-    aclStatusBuilder.addEntries(aclEntries);
-    return aclStatusBuilder.build();
+    try (AbfsPerfInfo perfInfo = startTracking("getAclStatus", 
"getAclStatus")) {
+
+      LOG.debug(
+              "getAclStatus filesystem: {} path: {}",
+              client.getFileSystem(),
+              path.toString());
+
+      AbfsRestOperation op = 
client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, 
true));
+      AbfsHttpOperation result = op.getResult();
+      perfInfo.registerResult(result);
+
+      final String transformedOwner = 
identityTransformer.transformIdentityForGetRequest(
+              result.getResponseHeader(HttpHeaderConfigurations.X_MS_OWNER),
+              true,
+              userName);
+      final String transformedGroup = 
identityTransformer.transformIdentityForGetRequest(
+              result.getResponseHeader(HttpHeaderConfigurations.X_MS_GROUP),
+              false,
+              primaryUserGroup);
+
+      final String permissions = 
result.getResponseHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS);
+      final String aclSpecString = 
op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL);
+
+      final List<AclEntry> aclEntries = 
AclEntry.parseAclSpec(AbfsAclHelper.processAclString(aclSpecString), true);
+      identityTransformer.transformAclEntriesForGetRequest(aclEntries, 
userName, primaryUserGroup);
+      final FsPermission fsPermission = permissions == null ? new 
AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
+              : AbfsPermission.valueOf(permissions);
+
+      final AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
+      aclStatusBuilder.owner(transformedOwner);
+      aclStatusBuilder.group(transformedGroup);
+
+      aclStatusBuilder.setPermission(fsPermission);
+      aclStatusBuilder.stickyBit(fsPermission.getStickyBit());
+      aclStatusBuilder.addEntries(aclEntries);
+      perfInfo.registerSuccess(true);
+      return aclStatusBuilder.build();
+    }
   }
 
   public boolean isAtomicRenameKey(String key) {
@@ -930,7 +1110,7 @@ public class AzureBlobFileSystemStore implements Closeable 
{
             abfsConfiguration.getRawConfiguration());
     }
 
-    this.client =  new AbfsClient(baseUrl, creds, abfsConfiguration, new 
ExponentialRetryPolicy(), tokenProvider);
+    this.client =  new AbfsClient(baseUrl, creds, abfsConfiguration, new 
ExponentialRetryPolicy(), tokenProvider, abfsPerfTracker);
   }
 
   private String getOctalNotation(FsPermission fsPermission) {
@@ -1071,6 +1251,10 @@ public class AzureBlobFileSystemStore implements 
Closeable {
     return false;
   }
 
+  private AbfsPerfInfo startTracking(String callerName, String calleeName) {
+    return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName);
+  }
+
   private static class VersionedFileStatus extends FileStatus {
     private final String version;
 
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index eb4605b..409ffc3 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -114,6 +114,8 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = 
"fs.azure.account.oauth2.refresh.token";
   /** Key for oauth AAD refresh token endpoint: {@value}. */
   public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT = 
"fs.azure.account.oauth2.refresh.token.endpoint";
+  /** Key for enabling the tracking of ABFS API latency and sending the
+   * latency numbers to the ABFS API service: {@value}. */
+  public static final String FS_AZURE_ABFS_LATENCY_TRACK = 
"fs.azure.abfs.latency.track";
 
   public static String accountProperty(String property, String account) {
     return property + "." + account;
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index e0c355a..3a4385e 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -67,6 +67,7 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_ENABLE_HTTPS = true;
 
   public static final boolean DEFAULT_USE_UPN = false;
+  public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
 
   private FileSystemConfigurations() {}
 }
\ No newline at end of file
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
index c8d4390..79bba09 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
@@ -58,6 +58,7 @@ public final class HttpHeaderConfigurations {
   public static final String X_MS_PERMISSIONS = "x-ms-permissions";
   public static final String X_MS_UMASK = "x-ms-umask";
   public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
+  public static final String X_MS_ABFS_CLIENT_LATENCY = 
"x-ms-abfs-client-latency";
 
   private HttpHeaderConfigurations() {}
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsPerfLoggable.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsPerfLoggable.java
new file mode 100644
index 0000000..772f006
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsPerfLoggable.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contracts.services;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * The AbfsPerfLoggable contract.
+ */
+@InterfaceStability.Evolving
+public interface AbfsPerfLoggable {
+  /**
+   * Gets the string to log to the Abfs Logging API.
+   *
+   * @return the string that will be logged.
+   */
+  String getLogString();
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index edcd197..dea5e94 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -60,6 +60,7 @@ public class AbfsClient implements Closeable {
   private final String filesystem;
   private final AbfsConfiguration abfsConfiguration;
   private final String userAgent;
+  private final AbfsPerfTracker abfsPerfTracker;
 
   private final AccessTokenProvider tokenProvider;
 
@@ -67,7 +68,8 @@ public class AbfsClient implements Closeable {
   public AbfsClient(final URL baseUrl, final SharedKeyCredentials 
sharedKeyCredentials,
                     final AbfsConfiguration abfsConfiguration,
                     final ExponentialRetryPolicy exponentialRetryPolicy,
-                    final AccessTokenProvider tokenProvider) {
+                    final AccessTokenProvider tokenProvider,
+                    final AbfsPerfTracker abfsPerfTracker) {
     this.baseUrl = baseUrl;
     this.sharedKeyCredentials = sharedKeyCredentials;
     String baseUrlString = baseUrl.toString();
@@ -88,6 +90,7 @@ public class AbfsClient implements Closeable {
 
     this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
     this.tokenProvider = tokenProvider;
+    this.abfsPerfTracker = abfsPerfTracker;
   }
 
   @Override
@@ -101,6 +104,10 @@ public class AbfsClient implements Closeable {
     return filesystem;
   }
 
+  protected AbfsPerfTracker getAbfsPerfTracker() {
+    return abfsPerfTracker;
+  }
+
   ExponentialRetryPolicy getRetryPolicy() {
     return retryPolicy;
   }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
index 5579877..881d41f 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.fs.azurebfs.services;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.net.URLEncoder;
 import java.util.List;
 import java.util.UUID;
 
@@ -40,12 +42,13 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 
 /**
  * Represents an HTTP operation.
  */
-public class AbfsHttpOperation {
+public class AbfsHttpOperation implements AbfsPerfLoggable {
   private static final Logger LOG = 
LoggerFactory.getLogger(AbfsHttpOperation.class);
 
   private static final int CONNECT_TIMEOUT = 30 * 1000;
@@ -161,6 +164,47 @@ public class AbfsHttpOperation {
     return sb.toString();
   }
 
+  // Returns a trace message for the ABFS API logging service to consume
+  public String getLogString() {
+    String urlStr = null;
+
+    try {
+      urlStr = URLEncoder.encode(url.toString(), "UTF-8");
+    } catch(UnsupportedEncodingException e) {
+      urlStr = "https%3A%2F%2Ffailed%2Fto%2Fencode%2Furl";
+    }
+
+    final StringBuilder sb = new StringBuilder();
+    sb.append("s=")
+      .append(statusCode)
+      .append(" e=")
+      .append(storageErrorCode)
+      .append(" ci=")
+      .append(clientRequestId)
+      .append(" ri=")
+      .append(requestId);
+
+    if (isTraceEnabled) {
+      sb.append(" ct=")
+        .append(connectionTimeMs)
+        .append(" st=")
+        .append(sendRequestTimeMs)
+        .append(" rt=")
+        .append(recvResponseTimeMs);
+    }
+
+    sb.append(" bs=")
+      .append(bytesSent)
+      .append(" br=")
+      .append(bytesReceived)
+      .append(" m=")
+      .append(method)
+      .append(" u=")
+      .append(urlStr);
+
+    return sb.toString();
+  }
+
   /**
    * Initializes a new HTTP request and opens the connection.
    *
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index fe48cb9..1f34342 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -226,8 +226,10 @@ public class AbfsInputStream extends FSInputStream {
       throw new IllegalArgumentException("requested read length is more than 
will fit after requested offset in buffer");
     }
     final AbfsRestOperation op;
-    try {
+    AbfsPerfTracker tracker = client.getAbfsPerfTracker();
+    try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", 
"read")) {
       op = client.read(path, position, b, offset, length, tolerateOobAppends ? 
"*" : eTag);
+      perfInfo.registerResult(op.getResult()).registerSuccess(true);
     } catch (AzureBlobFileSystemException ex) {
       if (ex instanceof AbfsRestOperationException) {
         AbfsRestOperationException ere = (AbfsRestOperationException) ex;
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index fd56eb0..2d40941 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -289,10 +289,16 @@ public class AbfsOutputStream extends OutputStream 
implements Syncable, StreamCa
     final Future<Void> job = completionService.submit(new Callable<Void>() {
       @Override
       public Void call() throws Exception {
-        client.append(path, offset, bytes, 0,
-            bytesLength);
-        byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
-        return null;
+        AbfsPerfTracker tracker = client.getAbfsPerfTracker();
+        try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
+                "writeCurrentBufferToService", "append")) {
+          AbfsRestOperation op = client.append(path, offset, bytes, 0,
+                  bytesLength);
+          perfInfo.registerResult(op.getResult());
+          byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
+          perfInfo.registerSuccess(true);
+          return null;
+        }
       }
     });
 
@@ -334,8 +340,11 @@ public class AbfsOutputStream extends OutputStream 
implements Syncable, StreamCa
 
   private synchronized void flushWrittenBytesToServiceInternal(final long 
offset,
       final boolean retainUncommitedData, final boolean isClose) throws 
IOException {
-    try {
-      client.flush(path, offset, retainUncommitedData, isClose);
+    AbfsPerfTracker tracker = client.getAbfsPerfTracker();
+    try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
+            "flushWrittenBytesToServiceInternal", "flush")) {
+      AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, 
isClose);
+      perfInfo.registerResult(op.getResult()).registerSuccess(true);
     } catch (AzureBlobFileSystemException ex) {
       if (ex instanceof AbfsRestOperationException) {
         if (((AbfsRestOperationException) ex).getStatusCode() == 
HttpURLConnection.HTTP_NOT_FOUND) {
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfInfo.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfInfo.java
new file mode 100644
index 0000000..0e7a111
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfInfo.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.time.Instant;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
+
+/**
+ * {@code AbfsPerfInfo} holds information on the ADLS Gen 2 API performance observed by {@code AbfsClient}. Every
+ * ABFS request adds its information (success/failure, latency, etc.) to its {@code AbfsPerfInfo} object
+ * as and when it becomes available. When the request is over, the performance information is recorded when
+ * the {@code AutoCloseable} {@code AbfsPerfInfo} object is closed.
+ */
+public final class AbfsPerfInfo implements AutoCloseable {
+
+  // the tracker which will be extracting perf info out of this object.
+  private AbfsPerfTracker abfsPerfTracker;
+
+  // the caller name.
+  private String callerName;
+
+  // the callee name.
+  private String calleeName;
+
+  // time when this tracking started.
+  private Instant trackingStart;
+
+  // time when this tracking ended.
+  private Instant trackingEnd;
+
+  // whether the tracked request was successful.
+  private boolean success;
+
+  // time when the aggregate operation (to which this request belongs) started.
+  private Instant aggregateStart;
+
+  // number of requests in the aggregate operation (to which this request 
belongs).
+  private long aggregateCount;
+
+  // result of the request.
+  private AbfsPerfLoggable res;
+
+  public AbfsPerfInfo(AbfsPerfTracker abfsPerfTracker, String callerName, 
String calleeName) {
+    this.callerName = callerName;
+    this.calleeName = calleeName;
+    this.abfsPerfTracker = abfsPerfTracker;
+    this.success = false;
+    this.trackingStart = abfsPerfTracker.getLatencyInstant();
+  }
+
+  public AbfsPerfInfo registerSuccess(boolean success) {
+    this.success = success;
+    return this;
+  }
+
+  public AbfsPerfInfo registerResult(AbfsPerfLoggable res) {
+    this.res = res;
+    return this;
+  }
+
+  public AbfsPerfInfo registerAggregates(Instant aggregateStart, long 
aggregateCount) {
+    this.aggregateStart = aggregateStart;
+    this.aggregateCount = aggregateCount;
+    return this;
+  }
+
+  public AbfsPerfInfo finishTracking() {
+    if (this.trackingEnd == null) {
+      this.trackingEnd = abfsPerfTracker.getLatencyInstant();
+    }
+
+    return this;
+  }
+
+  public AbfsPerfInfo registerCallee(String calleeName) {
+    this.calleeName = calleeName;
+    return this;
+  }
+
+  @Override
+  public void close() {
+    abfsPerfTracker.trackInfo(this.finishTracking());
+  }
+
+  public String getCallerName() {
+    return callerName;
+  }
+
+  public String getCalleeName() {
+    return calleeName;
+  }
+
+  public Instant getTrackingStart() {
+    return trackingStart;
+  }
+
+  public Instant getTrackingEnd() {
+    return trackingEnd;
+  }
+
+  public boolean getSuccess() {
+    return success;
+  }
+
+  public Instant getAggregateStart() {
+    return aggregateStart;
+  }
+
+  public long getAggregateCount() {
+    return aggregateCount;
+  }
+
+  public AbfsPerfLoggable getResult() {
+    return res;
+  }
+}
\ No newline at end of file
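A minimal usage sketch (not part of the patch itself; the class name and the account/filesystem values are illustrative, modelled on TestAbfsPerfTracker further below): an AbfsPerfInfo is opened in try-with-resources, results are registered as they become available, and close() hands the finished record to the tracker. The same pattern appears in the AbfsInputStream and AbfsOutputStream changes above.

```
package org.apache.hadoop.fs.azurebfs.services;

import java.net.URL;
import java.util.ArrayList;

// Illustrative only: not part of this patch. Account/filesystem names are fake.
public class AbfsPerfInfoSketch {
  public static void main(String[] args) throws Exception {
    // The protected constructor is visible from this package; "true" enables tracking.
    AbfsPerfTracker tracker = new AbfsPerfTracker("myfilesystem", "myaccount", true);
    AbfsHttpOperation op = new AbfsHttpOperation(
        new URL("http", "www.microsoft.com", "/bogusFile"), "GET", new ArrayList<>());

    try (AbfsPerfInfo info = new AbfsPerfInfo(tracker, "readRemote", "read")) {
      // Register whatever is known about the request as it becomes available.
      info.registerResult(op).registerSuccess(true);
    } // close() calls finishTracking() and enqueues the record on the tracker.

    // Poll the formatted record that would be attached to the next request.
    System.out.println(tracker.getClientLatency());
  }
}
```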
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfTracker.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfTracker.java
new file mode 100644
index 0000000..e24c47b
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPerfTracker.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
+
+/**
+ * {@code AbfsPerfTracker} keeps track of service latencies observed by {@code AbfsClient}. Every request hands over
+ * its perf-related information as an {@code AbfsPerfInfo} object (containing success/failure, latency, etc.) to the
+ * {@code AbfsPerfTracker}'s queue. When a request is made, we check the {@code AbfsPerfTracker} to see if there are
+ * any latency numbers to be reported. If there are, the stats are added to an HTTP header
+ * ({@code x-ms-abfs-client-latency}) on the next request.
+ *
+ * A typical perf log line appears like:
+ *
+ * h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
+ * c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete 
ce=deletePath r=Succeeded l=32 ls=32 lc=1 s=200
+ * e= ci=95121dae-70a8-4187-b067-614091034558 
ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
+ * 
u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
+ *
+ * The fields have the following definitions:
+ *
+ * h: host name
+ * t: time when this request was logged
+ * a: Azure storage account name
+ * c: container name
+ * cr: name of the caller method
+ * ce: name of the callee method
+ * r: result (Succeeded/Failed)
+ * l: latency (time spent in callee)
+ * ls: latency sum (aggregate time spent in caller; logged when there are 
multiple callees;
+ *     logged with the last callee)
+ * lc: latency count (number of callees; logged when there are multiple 
callees;
+ *     logged with the last callee)
+ * s: HTTP Status code
+ * e: Error code
+ * ci: client request ID
+ * ri: server request ID
+ * ct: connection time in milliseconds
+ * st: sending time in milliseconds
+ * rt: receiving time in milliseconds
+ * bs: bytes sent
+ * br: bytes received
+ * m: HTTP method (GET, PUT etc)
+ * u: Encoded HTTP URL
+ *
+ */
+public final class AbfsPerfTracker {
+
+  // the logger.
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbfsPerfTracker.class);
+
+  // the field names of perf log lines.
+  private static final String HOST_NAME_KEY = "h";
+  private static final String TIMESTAMP_KEY = "t";
+  private static final String STORAGE_ACCOUNT_NAME_KEY = "a";
+  private static final String CONTAINER_NAME_KEY = "c";
+  private static final String CALLER_METHOD_NAME_KEY = "cr";
+  private static final String CALLEE_METHOD_NAME_KEY = "ce";
+  private static final String RESULT_KEY = "r";
+  private static final String LATENCY_KEY = "l";
+  private static final String LATENCY_SUM_KEY = "ls";
+  private static final String LATENCY_COUNT_KEY = "lc";
+  private static final String HTTP_STATUS_CODE_KEY = "s";
+  private static final String ERROR_CODE_KEY = "e";
+  private static final String CLIENT_REQUEST_ID_KEY = "ci";
+  private static final String SERVER_REQUEST_ID_KEY = "ri";
+  private static final String CONNECTION_TIME_KEY = "ct";
+  private static final String SENDING_TIME_KEY = "st";
+  private static final String RECEIVING_TIME_KEY = "rt";
+  private static final String BYTES_SENT_KEY = "bs";
+  private static final String BYTES_RECEIVED_KEY = "br";
+  private static final String HTTP_METHOD_KEY = "m";
+  private static final String HTTP_URL_KEY = "u";
+  private static final String STRING_PLACEHOLDER = "%s";
+
+  // the queue to hold latency information.
+  private final ConcurrentLinkedQueue<String> queue = new 
ConcurrentLinkedQueue<String>();
+
+  // whether the latency tracker has been enabled.
+  private boolean enabled = false;
+
+  // the host name.
+  private String hostName;
+
+  // singleton latency reporting format.
+  private String singletonLatencyReportingFormat;
+
+  // aggregate latency reporting format.
+  private String aggregateLatencyReportingFormat;
+
+  public AbfsPerfTracker(String filesystemName, String accountName, 
AbfsConfiguration configuration) {
+    this(filesystemName, accountName, configuration.shouldTrackLatency());
+  }
+
+  protected AbfsPerfTracker(String filesystemName, String accountName, boolean 
enabled) {
+    this.enabled = enabled;
+
+    LOG.debug("AbfsPerfTracker configuration: {}", enabled);
+
+    if (enabled) {
+      try {
+        hostName = InetAddress.getLocalHost().getHostName();
+      } catch (UnknownHostException e) {
+        hostName = "UnknownHost";
+      }
+
+      String commonReportingFormat = new StringBuilder()
+              .append(HOST_NAME_KEY)
+              .append(AbfsHttpConstants.EQUAL)
+              .append(hostName)
+              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
+              .append(TIMESTAMP_KEY)
+              .append(AbfsHttpConstants.EQUAL)
+              .append(STRING_PLACEHOLDER)
+              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
+              .append(STORAGE_ACCOUNT_NAME_KEY)
+              .append(AbfsHttpConstants.EQUAL)
+              .append(accountName)
+              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
+              .append(CONTAINER_NAME_KEY)
+              .append(AbfsHttpConstants.EQUAL)
+              .append(filesystemName)
+              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
+              .append(CALLER_METHOD_NAME_KEY)
+              .append(AbfsHttpConstants.EQUAL)
+              .append(STRING_PLACEHOLDER)
+              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
+              .append(CALLEE_METHOD_NAME_KEY)
+              .append(AbfsHttpConstants.EQUAL)
+              .append(STRING_PLACEHOLDER)
+              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
+              .append(RESULT_KEY)
+              .append(AbfsHttpConstants.EQUAL)
+              .append(STRING_PLACEHOLDER)
+              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
+              .append(LATENCY_KEY)
+              .append(AbfsHttpConstants.EQUAL)
+              .append(STRING_PLACEHOLDER)
+              .toString();
+
+      /**
+        * Example singleton log (no ls or lc field)
+        * h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
+        * c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete 
ce=deletePath r=Succeeded l=32 s=200
+        * e= ci=95121dae-70a8-4187-b067-614091034558 
ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
+        * 
u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
+      */
+      singletonLatencyReportingFormat = new StringBuilder()
+              .append(commonReportingFormat)
+              .append(STRING_PLACEHOLDER)
+              .toString();
+
+      /**
+       * Example aggregate log
+       * h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
+       * c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete 
ce=deletePath r=Succeeded l=32 ls=32 lc=1 s=200
+       * e= ci=95121dae-70a8-4187-b067-614091034558 
ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
+       * 
u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Fabfs-testcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
+       */
+      aggregateLatencyReportingFormat = new StringBuilder()
+              .append(commonReportingFormat)
+              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
+              .append(LATENCY_SUM_KEY)
+              .append(AbfsHttpConstants.EQUAL)
+              .append(STRING_PLACEHOLDER)
+              .append(AbfsHttpConstants.SINGLE_WHITE_SPACE)
+              .append(LATENCY_COUNT_KEY)
+              .append(AbfsHttpConstants.EQUAL)
+              .append(STRING_PLACEHOLDER)
+              .append(STRING_PLACEHOLDER)
+              .toString();
+    }
+  }
+
+  public void trackInfo(AbfsPerfInfo perfInfo) {
+    if (!enabled) {
+      return;
+    }
+
+    if (isValidInstant(perfInfo.getAggregateStart()) && 
perfInfo.getAggregateCount() > 0) {
+      recordClientLatency(
+              perfInfo.getTrackingStart(),
+              perfInfo.getTrackingEnd(),
+              perfInfo.getCallerName(),
+              perfInfo.getCalleeName(),
+              perfInfo.getSuccess(),
+              perfInfo.getAggregateStart(),
+              perfInfo.getAggregateCount(),
+              perfInfo.getResult());
+    } else {
+      recordClientLatency(
+              perfInfo.getTrackingStart(),
+              perfInfo.getTrackingEnd(),
+              perfInfo.getCallerName(),
+              perfInfo.getCalleeName(),
+              perfInfo.getSuccess(),
+              perfInfo.getResult());
+    }
+  }
+
+  public Instant getLatencyInstant() {
+    if (!enabled) {
+      return null;
+    }
+
+    return Instant.now();
+  }
+
+  private void recordClientLatency(
+          Instant operationStart,
+          Instant operationStop,
+          String callerName,
+          String calleeName,
+          boolean success,
+          AbfsPerfLoggable res) {
+
+    Instant trackerStart = Instant.now();
+    long latency = isValidInstant(operationStart) && 
isValidInstant(operationStop)
+            ? Duration.between(operationStart, operationStop).toMillis() : -1;
+
+    String latencyDetails = String.format(singletonLatencyReportingFormat,
+            Instant.now(),
+            callerName,
+            calleeName,
+            success ? "Succeeded" : "Failed",
+            latency,
+            res == null ? "" : (" " + res.getLogString()));
+
+    this.offerToQueue(trackerStart, latencyDetails);
+  }
+
+  private void recordClientLatency(
+          Instant operationStart,
+          Instant operationStop,
+          String callerName,
+          String calleeName,
+          boolean success,
+          Instant aggregateStart,
+          long aggregateCount,
+          AbfsPerfLoggable res){
+
+    Instant trackerStart = Instant.now();
+    long latency = isValidInstant(operationStart) && 
isValidInstant(operationStop)
+            ? Duration.between(operationStart, operationStop).toMillis() : -1;
+    long aggregateLatency = isValidInstant(aggregateStart) && 
isValidInstant(operationStop)
+            ? Duration.between(aggregateStart, operationStop).toMillis() : -1;
+
+    String latencyDetails = String.format(aggregateLatencyReportingFormat,
+            Instant.now(),
+            callerName,
+            calleeName,
+            success ? "Succeeded" : "Failed",
+            latency,
+            aggregateLatency,
+            aggregateCount,
+            res == null ? "" : (" " + res.getLogString()));
+
+    offerToQueue(trackerStart, latencyDetails);
+  }
+
+  public String getClientLatency() {
+    if (!enabled) {
+      return null;
+    }
+
+    Instant trackerStart = Instant.now();
+    String latencyDetails = queue.poll(); // non-blocking pop
+
+    if (LOG.isDebugEnabled()) {
+      Instant stop = Instant.now();
+      long elapsed = Duration.between(trackerStart, stop).toMillis();
+      LOG.debug("Dequeued latency info [{} ms]: {}", elapsed, latencyDetails);
+    }
+
+    return latencyDetails;
+  }
+
+  private void offerToQueue(Instant trackerStart, String latencyDetails) {
+    queue.offer(latencyDetails); // non-blocking append
+
+    if (LOG.isDebugEnabled()) {
+      Instant trackerStop = Instant.now();
+      long elapsed = Duration.between(trackerStart, trackerStop).toMillis();
+      LOG.debug("Queued latency info [{} ms]: {}", elapsed, latencyDetails);
+    }
+  }
+
+  private boolean isValidInstant(Instant testInstant) {
+    return testInstant != null && testInstant != Instant.MIN && testInstant != 
Instant.MAX;
+  }
+}
\ No newline at end of file
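A sketch of the aggregate path (again illustrative rather than part of the patch): when one filesystem operation issues several REST calls, as the ACL get-then-set sequence above does, the last callee registers the aggregate start time and request count, and the dequeued line carries ls= and lc= in addition to l=. The count of 2 below is assumed purely for illustration.

```
package org.apache.hadoop.fs.azurebfs.services;

import java.net.URL;
import java.time.Instant;
import java.util.ArrayList;

// Illustrative only: not part of this patch. The aggregate count of 2 is assumed
// (one GET plus one SET, as in the ACL modification path above).
public class AbfsAggregateRecordSketch {
  public static void main(String[] args) throws Exception {
    AbfsPerfTracker tracker = new AbfsPerfTracker("myfilesystem", "myaccount", true);
    AbfsHttpOperation op = new AbfsHttpOperation(
        new URL("http", "www.microsoft.com", "/bogusFile"), "GET", new ArrayList<>());

    Instant aggregateStart = Instant.now(); // when the first of the related calls began

    try (AbfsPerfInfo info = new AbfsPerfInfo(tracker, "setAcl", "setAcl")) {
      info.registerResult(op)
          .registerSuccess(true)
          .registerAggregates(aggregateStart, 2); // only the last callee reports these
    }

    // The dequeued line now carries "ls=<aggregate ms> lc=2" alongside "l=<ms>".
    System.out.println(tracker.getClientLatency());
  }
}
```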
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 4196c10..a171068 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -121,6 +121,14 @@ public class AbfsRestOperation {
    * HTTP operations.
    */
   void execute() throws AzureBlobFileSystemException {
+    // see if we have latency reports from the previous requests
+    String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency();
+    if (latencyHeader != null && !latencyHeader.isEmpty()) {
+      AbfsHttpHeader httpHeader =
+              new 
AbfsHttpHeader(HttpHeaderConfigurations.X_MS_ABFS_CLIENT_LATENCY, 
latencyHeader);
+      requestHeaders.add(httpHeader);
+    }
+
     int retryCount = 0;
     while (!executeHttpOperation(retryCount++)) {
       try {
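The header attachment above relies on getClientLatency() polling the queue, so each record is sent on at most one subsequent request, and a disabled tracker contributes no header at all. A small sketch of that behaviour (illustrative only; names follow the tests below):

```
package org.apache.hadoop.fs.azurebfs.services;

import java.net.URL;
import java.util.ArrayList;

// Illustrative only: not part of this patch.
public class AbfsLatencyHeaderSketch {
  public static void main(String[] args) throws Exception {
    AbfsPerfTracker enabled = new AbfsPerfTracker("myfilesystem", "myaccount", true);
    AbfsPerfTracker disabled = new AbfsPerfTracker("myfilesystem", "myaccount", false);
    AbfsHttpOperation op = new AbfsHttpOperation(
        new URL("http", "www.microsoft.com", "/bogusFile"), "GET", new ArrayList<>());

    try (AbfsPerfInfo info = new AbfsPerfInfo(enabled, "oneOperationCaller", "oneOperationCallee")) {
      info.registerResult(op).registerSuccess(true);
    }

    System.out.println(enabled.getClientLatency());  // the pending record
    System.out.println(enabled.getClientLatency());  // null: already handed out once
    System.out.println(disabled.getClientLatency()); // null: tracking disabled
  }
}
```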
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md 
b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index c5bad77..47739a7 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -661,6 +661,52 @@ Hflush() being the only documented API that can provide 
persistent data
 transfer, Flush() also attempting to persist buffered data will lead to
 performance issues.
 
+### <a name="perfoptions"></a> Perf Options
+
+#### <a name="abfstracklatencyoptions"></a> 1. HTTP Request Tracking Options
+If you set `fs.azure.abfs.latency.track` to `true`, the module starts tracking the
+performance metrics of ABFS HTTP traffic. To obtain these numbers on your machine
+or cluster, you will also need to enable debug logging for the `AbfsPerfTracker`
+class in your `log4j` configuration. A typical perf log line looks like the following:
+
+```
+h=KARMA t=2019-10-25T20:21:14.518Z a=abfstest01.dfs.core.windows.net
+c=abfs-testcontainer-84828169-6488-4a62-a875-1e674275a29f cr=delete ce=deletePath
+r=Succeeded l=32 ls=32 lc=1 s=200 e= ci=95121dae-70a8-4187-b067-614091034558
+ri=97effdcf-201f-0097-2d71-8bae00000000 ct=0 st=0 rt=0 bs=0 br=0 m=DELETE
+u=https%3A%2F%2Fabfstest01.dfs.core.windows.net%2Ftestcontainer%2Ftest%3Ftimeout%3D90%26recursive%3Dtrue
+```
+
+The fields have the following definitions:
+
+`h`: host name
+`t`: time when this request was logged
+`a`: Azure storage account name
+`c`: container name
+`cr`: name of the caller method
+`ce`: name of the callee method
+`r`: result (Succeeded/Failed)
+`l`: latency (time spent in callee)
+`ls`: latency sum (aggregate time spent in caller; logged when there are multiple
+callees; logged with the last callee)
+`lc`: latency count (number of callees; logged when there are multiple callees;
+logged with the last callee)
+`s`: HTTP Status code
+`e`: Error code
+`ci`: client request ID
+`ri`: server request ID
+`ct`: connection time in milliseconds
+`st`: sending time in milliseconds
+`rt`: receiving time in milliseconds
+`bs`: bytes sent
+`br`: bytes received
+`m`: HTTP method (GET, PUT etc)
+`u`: Encoded HTTP URL
+
+Note that these performance numbers are also sent back to the ADLS Gen 2 API endpoints
+in the `x-ms-abfs-client-latency` HTTP header on subsequent requests. Azure uses these
+numbers to track end-to-end service latency.
+
 ## <a name="troubleshooting"></a> Troubleshooting
 
 The problems associated with the connector usually come down to, in order
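For completeness, a hedged example of turning the new setting on from client code (not part of the patch; the account, container and path are placeholders, and credentials for the account are assumed to be configured elsewhere). To see the log lines locally you would also need the log4j level for org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker set to DEBUG, as the documentation above notes.

```
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: account, container and path are placeholders, and the
// account's credentials are assumed to be configured elsewhere.
public class AbfsLatencyTrackingExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean("fs.azure.abfs.latency.track", true);

    try (FileSystem fs = FileSystem.newInstance(
        new URI("abfs://mycontainer@myaccount.dfs.core.windows.net/"), conf)) {
      // Any ABFS call made through this instance now contributes latency records.
      fs.exists(new Path("/tmp/probe"));
    }
  }
}
```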
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 228e385..7df9fb1 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -43,7 +43,7 @@ public final class TestAbfsClient {
                                  AbfsConfiguration config,
                                  boolean includeSSLProvider) {
     AbfsClient client = new AbfsClient(baseUrl, null,
-        config, null, null);
+        config, null, null, null);
     String sslProviderName = null;
     if (includeSSLProvider) {
       sslProviderName = 
DelegatingSSLSocketFactory.getDefaultFactory().getProviderName();
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java
new file mode 100644
index 0000000..4f42102
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.net.URL;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test the latency tracker for ABFS.
+ *
+ */
+public final class TestAbfsPerfTracker {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestAbfsPerfTracker.class);
+  private static ExecutorService executorService = null;
+  private static final int TEST_AGGREGATE_COUNT = 42;
+  private final String filesystemName = "bogusFilesystemName";
+  private final String accountName = "bogusAccountName";
+  private final URL url;
+
+  public TestAbfsPerfTracker() throws Exception {
+    this.url = new URL("http", "www.microsoft.com", "/bogusFile");
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    executorService = Executors.newCachedThreadPool();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    executorService.shutdown();
+  }
+
+  @Test
+  public void verifyDisablingOfTracker() throws Exception {
+    // verify that disabling of the tracker works
+    AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, false);
+
+    String latencyDetails = abfsPerfTracker.getClientLatency();
+    assertThat(latencyDetails).describedAs("AbfsPerfTracker should be 
empty").isNull();
+
+    try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, 
"disablingCaller",
+            "disablingCallee")) {
+      AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new 
ArrayList<>());
+      tracker.registerResult(op).registerSuccess(true);
+    }
+
+    latencyDetails = abfsPerfTracker.getClientLatency();
+    assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no 
record").isNull();
+  }
+
+  @Test
+  public void verifyTrackingForSingletonLatencyRecords() throws Exception {
+    // verify that tracking for singleton latency records works as expected
+    final int numTasks = 100;
+    AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, true);
+
+    String latencyDetails = abfsPerfTracker.getClientLatency();
+    assertThat(latencyDetails).describedAs("AbfsPerfTracker should be 
empty").isNull();
+
+    List<Callable<Integer>> tasks = new ArrayList<>();
+    AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new 
ArrayList<>());
+
+    for (int i = 0; i < numTasks; i++) {
+      tasks.add(() -> {
+        try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, 
"oneOperationCaller",
+                "oneOperationCallee")) {
+          tracker.registerResult(httpOperation).registerSuccess(true);
+          return 0;
+        }
+      });
+    }
+
+    for (Future<Integer> fr: executorService.invokeAll(tasks)) {
+      fr.get();
+    }
+
+    for (int i = 0; i < numTasks; i++) {
+      latencyDetails = abfsPerfTracker.getClientLatency();
+      assertThat(latencyDetails).describedAs("AbfsPerfTracker should return 
non-null record").isNotNull();
+      assertThat(latencyDetails).describedAs("Latency record should be in the 
correct format")
+        .containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName 
c=bogusAccountName cr=oneOperationCaller"
+          + " ce=oneOperationCallee r=Succeeded l=[0-9]+ s=0 e= ci=[^ ]* ri=[^ 
]* bs=0 br=0 m=GET"
+          + " u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile");
+    }
+
+    latencyDetails = abfsPerfTracker.getClientLatency();
+    assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no 
record").isNull();
+  }
+
+  @Test
+  public void verifyTrackingForAggregateLatencyRecords() throws Exception {
+    // verify that tracking of aggregate latency records works as expected
+    final int numTasks = 100;
+    AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, true);
+
+    String latencyDetails = abfsPerfTracker.getClientLatency();
+    assertThat(latencyDetails).describedAs("AbfsPerfTracker should be 
empty").isNull();
+
+    List<Callable<Integer>> tasks = new ArrayList<>();
+    AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new 
ArrayList<>());
+
+    for (int i = 0; i < numTasks; i++) {
+      tasks.add(() -> {
+        try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, 
"oneOperationCaller",
+                "oneOperationCallee")) {
+          tracker.registerResult(httpOperation).registerSuccess(true)
+                  .registerAggregates(Instant.now(), TEST_AGGREGATE_COUNT);
+          return 0;
+        }
+      });
+    }
+
+    for (Future<Integer> fr: executorService.invokeAll(tasks)) {
+      fr.get();
+    }
+
+    for (int i = 0; i < numTasks; i++) {
+      latencyDetails = abfsPerfTracker.getClientLatency();
+      assertThat(latencyDetails).describedAs("AbfsPerfTracker should return 
non-null record").isNotNull();
+      assertThat(latencyDetails).describedAs("Latency record should be in the 
correct format")
+        .containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName 
c=bogusAccountName cr=oneOperationCaller"
+                + " ce=oneOperationCallee r=Succeeded l=[0-9]+ ls=[0-9]+ lc=" 
+ TEST_AGGREGATE_COUNT
+                + " s=0 e= ci=[^ ]* ri=[^ ]* bs=0 br=0 m=GET 
u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile");
+    }
+
+    latencyDetails = abfsPerfTracker.getClientLatency();
+    assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no 
record").isNull();
+  }
+
+  @Test
+  public void verifyRecordingSingletonLatencyIsCheapWhenDisabled() throws 
Exception {
+    // when latency tracker is disabled, we expect it to take time equivalent 
to checking a boolean value
+    final double maxLatencyWhenDisabledMs = 1000;
+    final double minLatencyWhenDisabledMs = 0;
+    final long numTasks = 1000;
+    long aggregateLatency = 0;
+    AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, false);
+    List<Callable<Long>> tasks = new ArrayList<>();
+    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<>());
+
+    for (int i = 0; i < numTasks; i++) {
+      tasks.add(() -> {
+        Instant startRecord = Instant.now();
+
+        try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, 
"oneOperationCaller",
+                "oneOperationCallee")) {
+          tracker.registerResult(httpOperation).registerSuccess(true);
+        }
+
+        long latencyRecord = Duration.between(startRecord, 
Instant.now()).toMillis();
+        LOG.debug("Spent {} ms in recording latency.", latencyRecord);
+        return latencyRecord;
+      });
+    }
+
+    for (Future<Long> fr: executorService.invokeAll(tasks)) {
+      aggregateLatency += fr.get();
+    }
+
+    double averageRecordLatency = aggregateLatency / numTasks;
+    assertThat(averageRecordLatency).describedAs("Average time for recording 
singleton latencies should be bounded")
+      .isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
+  }
+
+  @Test
+  public void verifyRecordingAggregateLatencyIsCheapWhenDisabled() throws 
Exception {
+    // when latency tracker is disabled, we expect it to take time equivalent 
to checking a boolean value
+    final double maxLatencyWhenDisabledMs = 1000;
+    final double minLatencyWhenDisabledMs = 0;
+    final long numTasks = 1000;
+    long aggregateLatency = 0;
+    AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, false);
+    List<Callable<Long>> tasks = new ArrayList<>();
+    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<>());
+
+    for (int i = 0; i < numTasks; i++) {
+      tasks.add(() -> {
+        Instant startRecord = Instant.now();
+
+        try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, 
"oneOperationCaller",
+                "oneOperationCallee")) {
+          tracker.registerResult(httpOperation).registerSuccess(true)
+                  .registerAggregates(startRecord, TEST_AGGREGATE_COUNT);
+        }
+
+        long latencyRecord = Duration.between(startRecord, 
Instant.now()).toMillis();
+        LOG.debug("Spent {} ms in recording latency.", latencyRecord);
+        return latencyRecord;
+      });
+    }
+
+    for (Future<Long> fr: executorService.invokeAll(tasks)) {
+      aggregateLatency += fr.get();
+    }
+
+    double averageRecordLatency = aggregateLatency / numTasks;
+    assertThat(averageRecordLatency).describedAs("Average time for recording 
aggregate latencies should be bounded")
+      .isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
+  }
+
+  @Test
+  public void verifyGettingLatencyRecordsIsCheapWhenDisabled() throws 
Exception {
+    // when latency tracker is disabled, we expect it to take time equivalent 
to checking a boolean value
+    final double maxLatencyWhenDisabledMs = 1000;
+    final double minLatencyWhenDisabledMs = 0;
+    final long numTasks = 1000;
+    long aggregateLatency = 0;
+    AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, false);
+    List<Callable<Long>> tasks = new ArrayList<>();
+
+    for (int i = 0; i < numTasks; i++) {
+      tasks.add(() -> {
+        Instant startGet = Instant.now();
+        abfsPerfTracker.getClientLatency();
+        long latencyGet = Duration.between(startGet, Instant.now()).toMillis();
+        LOG.debug("Spent {} ms in retrieving latency record.", latencyGet);
+        return latencyGet;
+      });
+    }
+
+    for (Future<Long> fr: executorService.invokeAll(tasks)) {
+      aggregateLatency += fr.get();
+    }
+
+    double averageRecordLatency = aggregateLatency / numTasks;
+    assertThat(averageRecordLatency).describedAs("Average time for getting 
latency records should be bounded")
+      .isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
+  }
+
+  @Test
+  public void verifyRecordingSingletonLatencyIsCheapWhenEnabled() throws 
Exception {
+    final double maxLatencyWhenDisabledMs = 5000;
+    final double minLatencyWhenDisabledMs = 0;
+    final long numTasks = 1000;
+    long aggregateLatency = 0;
+    AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, true);
+    List<Callable<Long>> tasks = new ArrayList<>();
+    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<>());
+
+    for (int i = 0; i < numTasks; i++) {
+      tasks.add(() -> {
+        Instant startRecord = Instant.now();
+
+        try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, 
"oneOperationCaller",
+                "oneOperationCallee")) {
+          tracker.registerResult(httpOperation).registerSuccess(true);
+        }
+
+        long latencyRecord = Duration.between(startRecord, 
Instant.now()).toMillis();
+        LOG.debug("Spent {} ms in recording latency.", latencyRecord);
+        return latencyRecord;
+      });
+    }
+
+    for (Future<Long> fr: executorService.invokeAll(tasks)) {
+      aggregateLatency += fr.get();
+    }
+
+    double averageRecordLatency = aggregateLatency / numTasks;
+    assertThat(averageRecordLatency).describedAs("Average time for recording 
singleton latencies should be bounded")
+      .isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
+  }
+
+  @Test
+  public void verifyRecordingAggregateLatencyIsCheapWhenEnabled() throws 
Exception {
+    final double maxLatencyWhenDisabledMs = 5000;
+    final double minLatencyWhenDisabledMs = 0;
+    final long numTasks = 1000;
+    long aggregateLatency = 0;
+    AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, true);
+    List<Callable<Long>> tasks = new ArrayList<>();
+    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<>());
+
+    for (int i = 0; i < numTasks; i++) {
+      tasks.add(() -> {
+        Instant startRecord = Instant.now();
+
+        try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, 
"oneOperationCaller",
+                "oneOperationCallee")) {
+          tracker.registerResult(httpOperation).registerSuccess(true).
+                  registerAggregates(startRecord, TEST_AGGREGATE_COUNT);
+        }
+
+        long latencyRecord = Duration.between(startRecord, 
Instant.now()).toMillis();
+        LOG.debug("Spent {} ms in recording latency.", latencyRecord);
+        return latencyRecord;
+      });
+    }
+
+    for (Future<Long> fr: executorService.invokeAll(tasks)) {
+      aggregateLatency += fr.get();
+    }
+
+    double averageRecordLatency = aggregateLatency / numTasks;
+    assertThat(averageRecordLatency).describedAs("Average time for recording 
aggregate latencies is bounded")
+      .isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
+  }
+
+  @Test
+  public void verifyGettingLatencyRecordsIsCheapWhenEnabled() throws Exception 
{
+    final double maxLatencyWhenDisabledMs = 5000;
+    final double minLatencyWhenDisabledMs = 0;
+    final long numTasks = 1000;
+    long aggregateLatency = 0;
+    AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, true);
+    List<Callable<Long>> tasks = new ArrayList<>();
+
+    for (int i = 0; i < numTasks; i++) {
+      tasks.add(() -> {
+        Instant startRecord = Instant.now();
+        abfsPerfTracker.getClientLatency();
+        long latencyRecord = Duration.between(startRecord, 
Instant.now()).toMillis();
+        LOG.debug("Spent {} ms in recording latency.", latencyRecord);
+        return latencyRecord;
+      });
+    }
+
+    for (Future<Long> fr: executorService.invokeAll(tasks)) {
+      aggregateLatency += fr.get();
+    }
+
+    double averageRecordLatency = aggregateLatency / numTasks;
+    assertThat(averageRecordLatency).describedAs("Average time for getting 
latency records should be bounded")
+      .isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
+  }
+
+  @Test
+  public void verifyNoExceptionOnInvalidInput() throws Exception {
+    Instant testInstant = Instant.now();
+    AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, 
filesystemName, false);
+    AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, 
filesystemName, true);
+    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<AbfsHttpHeader>());
+
+    verifyNoException(abfsPerfTrackerDisabled);
+    verifyNoException(abfsPerfTrackerEnabled);
+  }
+
+  private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws 
Exception {
+    Instant testInstant = Instant.now();
+    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<AbfsHttpHeader>());
+
+    try (
+            AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, 
null);
+            AbfsPerfInfo tracker02 = new AbfsPerfInfo(abfsPerfTracker, "test", 
null);
+            AbfsPerfInfo tracker03 = new AbfsPerfInfo(abfsPerfTracker, "test", 
"test");
+            AbfsPerfInfo tracker04 = new AbfsPerfInfo(abfsPerfTracker, "test", 
"test");
+
+            AbfsPerfInfo tracker05 = new AbfsPerfInfo(abfsPerfTracker, null, 
null);
+            AbfsPerfInfo tracker06 = new AbfsPerfInfo(abfsPerfTracker, "test", 
null);
+            AbfsPerfInfo tracker07 = new AbfsPerfInfo(abfsPerfTracker, "test", 
"test");
+            AbfsPerfInfo tracker08 = new AbfsPerfInfo(abfsPerfTracker, "test", 
"test");
+            AbfsPerfInfo tracker09 = new AbfsPerfInfo(abfsPerfTracker, "test", 
"test");
+            AbfsPerfInfo tracker10 = new AbfsPerfInfo(abfsPerfTracker, "test", 
"test");
+
+            AbfsPerfInfo tracker11 = new AbfsPerfInfo(abfsPerfTracker, "test", 
"test");
+            AbfsPerfInfo tracker12 = new AbfsPerfInfo(abfsPerfTracker, "test", 
"test");
+            AbfsPerfInfo tracker13 = new AbfsPerfInfo(abfsPerfTracker, "test", 
"test");
+    ) {
+      tracker01.registerResult(null).registerSuccess(false);
+      tracker02.registerResult(null).registerSuccess(false);
+      tracker03.registerResult(null).registerSuccess(false);
+      tracker04.registerResult(httpOperation).registerSuccess(false);
+
+      
tracker05.registerResult(null).registerSuccess(false).registerAggregates(null, 
0);
+      
tracker06.registerResult(null).registerSuccess(false).registerAggregates(null, 
0);
+      
tracker07.registerResult(null).registerSuccess(false).registerAggregates(null, 
0);
+      
tracker08.registerResult(httpOperation).registerSuccess(false).registerAggregates(null,
 0);
+      
tracker09.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.now(),
 0);
+      
tracker10.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.now(),
 TEST_AGGREGATE_COUNT);
+
+      
tracker11.registerResult(httpOperation).registerSuccess(false).registerAggregates(testInstant,
 TEST_AGGREGATE_COUNT);
+      
tracker12.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MAX,
 TEST_AGGREGATE_COUNT);
+      
tracker13.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MIN,
 TEST_AGGREGATE_COUNT);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml 
b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
index 24d444a..d833cfb 100644
--- a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
+++ b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml
@@ -33,6 +33,11 @@
     <value>false</value>
   </property>
 
+  <property>
+    <name>fs.azure.abfs.latency.track</name>
+    <value>false</value>
+  </property>
+
   <!--====================  ABFS CONFIGURATION ====================-->
   <!-- SEE relevant section in "site/markdown/testing_azure.md"-->
 

