This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 07fdc1a48fa HADOOP-18325: ABFS: Add correlated metric support for ABFS operations (#6314) (#7303)
07fdc1a48fa is described below

commit 07fdc1a48fa94c8f17da49809e9d8ada12b25f0c
Author: Anmol Asrani <anmol.asrani...@gmail.com>
AuthorDate: Tue Jan 21 06:14:58 2025 +0000

    HADOOP-18325: ABFS: Add correlated metric support for ABFS operations (#6314) (#7303)
    
    Contributed by Anmol Asrani
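    
    For reference, a minimal configuration sketch (illustrative only, not part
    of this change). The key names are those added in ConfigurationKeys.java
    below, and the format value is one of the MetricFormat enum constants; the
    account, key, and URI values are placeholders:
    
        import org.apache.hadoop.conf.Configuration;
    
        // Hypothetical setup; AbfsClient requires a fully qualified metric
        // account name (it rejects names without a dot).
        Configuration conf = new Configuration();
        conf.set("fs.azure.metric.format", "INTERNAL_METRIC_FORMAT");
        conf.set("fs.azure.metric.uri", "https://myaccount.dfs.core.windows.net");
        conf.set("fs.azure.metric.account.name", "myaccount.dfs.core.windows.net");
        conf.set("fs.azure.metric.account.key", "<metric-account-key>");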
---
 .../hadoop/fs/azurebfs/AbfsBackoffMetrics.java     | 312 ++++++++++++
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  45 ++
 .../hadoop/fs/azurebfs/AbfsCountersImpl.java       | 102 +++-
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  15 +-
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   7 +
 .../constants/FileSystemConfigurations.java        |   2 +
 .../constants/HttpHeaderConfigurations.java        |   1 +
 .../contracts/services/AzureServiceErrorCode.java  |  18 +-
 .../hadoop/fs/azurebfs/services/AbfsClient.java    | 194 +++++++-
 .../fs/azurebfs/services/AbfsClientContext.java    |   2 +-
 .../hadoop/fs/azurebfs/services/AbfsCounters.java  |  11 +
 .../fs/azurebfs/services/AbfsInputStream.java      |  18 +-
 .../azurebfs/services/AbfsReadFooterMetrics.java   | 549 +++++++++++++++++++++
 .../fs/azurebfs/services/AbfsRestOperation.java    | 178 ++++++-
 .../fs/azurebfs/services/TimerFunctionality.java   |   4 +-
 .../MetricFormat.java}                             |  20 +-
 .../hadoop/fs/azurebfs/utils/TracingContext.java   |  22 +-
 .../hadoop-azure/src/site/markdown/abfs.md         |  43 ++
 .../azurebfs/ITestAbfsInputStreamStatistics.java   |   1 -
 .../fs/azurebfs/ITestAbfsReadFooterMetrics.java    | 385 +++++++++++++++
 .../ITestAzureBlobFileSystemListStatus.java        |   9 +-
 .../fs/azurebfs/services/AbfsClientTestUtil.java   |   9 +-
 .../fs/azurebfs/services/ITestAbfsClient.java      |   1 -
 .../fs/azurebfs/services/TestAbfsInputStream.java  |   7 +-
 .../azurebfs/services/TestAbfsRestOperation.java   |  82 +++
 .../TestAbfsRestOperationMockFailures.java         |   3 +-
 26 files changed, 1985 insertions(+), 55 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsBackoffMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsBackoffMetrics.java
new file mode 100644
index 00000000000..37dbdfffeed
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsBackoffMetrics.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THOUSAND;
+
+public class AbfsBackoffMetrics {
+
+  private AtomicLong numberOfRequestsSucceeded;
+
+  private AtomicLong minBackoff;
+
+  private AtomicLong maxBackoff;
+
+  private AtomicLong totalRequests;
+
+  private AtomicLong totalBackoff;
+
+  private String retryCount;
+
+  private AtomicLong numberOfIOPSThrottledRequests;
+
+  private AtomicLong numberOfBandwidthThrottledRequests;
+
+  private AtomicLong numberOfOtherThrottledRequests;
+
+  private AtomicLong numberOfNetworkFailedRequests;
+
+  private AtomicLong maxRetryCount;
+
+  private AtomicLong totalNumberOfRequests;
+
+  private AtomicLong numberOfRequestsSucceededWithoutRetrying;
+
+  private AtomicLong numberOfRequestsFailed;
+
+  private final Map<String, AbfsBackoffMetrics> metricsMap
+      = new ConcurrentHashMap<>();
+
+  public AbfsBackoffMetrics() {
+    initializeMap();
+    this.numberOfIOPSThrottledRequests = new AtomicLong();
+    this.numberOfBandwidthThrottledRequests = new AtomicLong();
+    this.numberOfOtherThrottledRequests = new AtomicLong();
+    this.totalNumberOfRequests = new AtomicLong();
+    this.maxRetryCount = new AtomicLong();
+    this.numberOfRequestsSucceededWithoutRetrying = new AtomicLong();
+    this.numberOfRequestsFailed = new AtomicLong();
+    this.numberOfNetworkFailedRequests = new AtomicLong();
+  }
+
+  public AbfsBackoffMetrics(String retryCount) {
+    this.retryCount = retryCount;
+    this.numberOfRequestsSucceeded = new AtomicLong();
+    this.minBackoff = new AtomicLong(Long.MAX_VALUE);
+    this.maxBackoff = new AtomicLong();
+    this.totalRequests = new AtomicLong();
+    this.totalBackoff = new AtomicLong();
+  }
+
+  private void initializeMap() {
+    ArrayList<String> retryCountList = new ArrayList<String>(
+        Arrays.asList("1", "2", "3", "4", "5_15", "15_25", "25AndAbove"));
+    for (String s : retryCountList) {
+      metricsMap.put(s, new AbfsBackoffMetrics(s));
+    }
+  }
+
+  public long getNumberOfRequestsSucceeded() {
+    return this.numberOfRequestsSucceeded.get();
+  }
+
+  public void setNumberOfRequestsSucceeded(long numberOfRequestsSucceeded) {
+    this.numberOfRequestsSucceeded.set(numberOfRequestsSucceeded);
+  }
+
+  public void incrementNumberOfRequestsSucceeded() {
+    this.numberOfRequestsSucceeded.getAndIncrement();
+  }
+
+  public long getMinBackoff() {
+    return this.minBackoff.get();
+  }
+
+  public void setMinBackoff(long minBackoff) {
+    this.minBackoff.set(minBackoff);
+  }
+
+  public long getMaxBackoff() {
+    return this.maxBackoff.get();
+  }
+
+  public void setMaxBackoff(long maxBackoff) {
+    this.maxBackoff.set(maxBackoff);
+  }
+
+  public long getTotalRequests() {
+    return this.totalRequests.get();
+  }
+
+  public void incrementTotalRequests() {
+    this.totalRequests.incrementAndGet();
+  }
+
+  public void setTotalRequests(long totalRequests) {
+    this.totalRequests.set(totalRequests);
+  }
+
+  public long getTotalBackoff() {
+    return this.totalBackoff.get();
+  }
+
+  public void setTotalBackoff(long totalBackoff) {
+    this.totalBackoff.set(totalBackoff);
+  }
+
+  public String getRetryCount() {
+    return this.retryCount;
+  }
+
+  public long getNumberOfIOPSThrottledRequests() {
+    return this.numberOfIOPSThrottledRequests.get();
+  }
+
+  public void setNumberOfIOPSThrottledRequests(long numberOfIOPSThrottledRequests) {
+    this.numberOfIOPSThrottledRequests.set(numberOfIOPSThrottledRequests);
+  }
+
+  public void incrementNumberOfIOPSThrottledRequests() {
+    this.numberOfIOPSThrottledRequests.getAndIncrement();
+  }
+
+  public long getNumberOfBandwidthThrottledRequests() {
+    return this.numberOfBandwidthThrottledRequests.get();
+  }
+
+  public void setNumberOfBandwidthThrottledRequests(long numberOfBandwidthThrottledRequests) {
+    this.numberOfBandwidthThrottledRequests.set(numberOfBandwidthThrottledRequests);
+  }
+
+  public void incrementNumberOfBandwidthThrottledRequests() {
+    this.numberOfBandwidthThrottledRequests.getAndIncrement();
+  }
+
+  public long getNumberOfOtherThrottledRequests() {
+    return this.numberOfOtherThrottledRequests.get();
+  }
+
+  public void setNumberOfOtherThrottledRequests(long numberOfOtherThrottledRequests) {
+    this.numberOfOtherThrottledRequests.set(numberOfOtherThrottledRequests);
+  }
+
+  public void incrementNumberOfOtherThrottledRequests() {
+    this.numberOfOtherThrottledRequests.getAndIncrement();
+  }
+
+  public long getMaxRetryCount() {
+    return this.maxRetryCount.get();
+  }
+
+  public void setMaxRetryCount(long maxRetryCount) {
+    this.maxRetryCount.set(maxRetryCount);
+  }
+
+  public void incrementMaxRetryCount() {
+    this.maxRetryCount.getAndIncrement();
+  }
+
+  public long getTotalNumberOfRequests() {
+    return this.totalNumberOfRequests.get();
+  }
+
+  public void setTotalNumberOfRequests(long totalNumberOfRequests) {
+    this.totalNumberOfRequests.set(totalNumberOfRequests);
+  }
+
+  public void incrementTotalNumberOfRequests() {
+    this.totalNumberOfRequests.getAndIncrement();
+  }
+
+  public Map<String, AbfsBackoffMetrics> getMetricsMap() {
+    return metricsMap;
+  }
+
+  public long getNumberOfRequestsSucceededWithoutRetrying() {
+    return this.numberOfRequestsSucceededWithoutRetrying.get();
+  }
+
+  public void setNumberOfRequestsSucceededWithoutRetrying(long numberOfRequestsSucceededWithoutRetrying) {
+    this.numberOfRequestsSucceededWithoutRetrying.set(numberOfRequestsSucceededWithoutRetrying);
+  }
+
+  public void incrementNumberOfRequestsSucceededWithoutRetrying() {
+    this.numberOfRequestsSucceededWithoutRetrying.getAndIncrement();
+  }
+
+  public long getNumberOfRequestsFailed() {
+    return this.numberOfRequestsFailed.get();
+  }
+
+  public void setNumberOfRequestsFailed(long numberOfRequestsFailed) {
+    this.numberOfRequestsFailed.set(numberOfRequestsFailed);
+  }
+
+  public void incrementNumberOfRequestsFailed() {
+    this.numberOfRequestsFailed.getAndIncrement();
+  }
+
+  public long getNumberOfNetworkFailedRequests() {
+    return this.numberOfNetworkFailedRequests.get();
+  }
+
+  public void setNumberOfNetworkFailedRequests(long numberOfNetworkFailedRequests) {
+    this.numberOfNetworkFailedRequests.set(numberOfNetworkFailedRequests);
+  }
+
+  public void incrementNumberOfNetworkFailedRequests() {
+    this.numberOfNetworkFailedRequests.getAndIncrement();
+  }
+
+  /*
+          Acronyms :-
+          1.RCTSI :- Request count that succeeded in x retries
+          2.MMA :- Min Max Average (This refers to the backoff or sleep time between 2 requests)
+          3.s :- seconds
+          4.BWT :- Number of Bandwidth throttled requests
+          5.IT :- Number of IOPS throttled requests
+          6.OT :- Number of Other throttled requests
+          7.NFR :- Number of requests which failed due to network errors
+          8.%RT :- Percentage of requests that are throttled
+          9.TRNR :- Total number of requests which succeeded without retrying
+          10.TRF :- Total number of requests which failed
+          11.TR :- Total number of requests which were made
+          12.MRC :- Max retry count across all requests
+           */
+  @Override
+  public String toString() {
+    StringBuilder metricString = new StringBuilder();
+    long totalRequestsThrottled = getNumberOfBandwidthThrottledRequests()
+        + getNumberOfIOPSThrottledRequests()
+        + getNumberOfOtherThrottledRequests();
+    double percentageOfRequestsThrottled =
+        ((double) totalRequestsThrottled / getTotalNumberOfRequests()) * HUNDRED;
+    for (Map.Entry<String, AbfsBackoffMetrics> entry : metricsMap.entrySet()) {
+      metricString.append("$RCTSI$_").append(entry.getKey())
+          .append("R_").append("=")
+          .append(entry.getValue().getNumberOfRequestsSucceeded());
+      long totalRequests = entry.getValue().getTotalRequests();
+      if (totalRequests > 0) {
+        metricString.append("$MMA$_").append(entry.getKey())
+            .append("R_").append("=")
+            .append(String.format("%.3f",
+                (double) entry.getValue().getMinBackoff() / THOUSAND))
+            .append("s")
+            .append(String.format("%.3f",
+                (double) entry.getValue().getMaxBackoff() / THOUSAND))
+            .append("s")
+            .append(String.format("%.3f",
+                ((double) entry.getValue().getTotalBackoff() / totalRequests)
+                    / THOUSAND))
+            .append("s");
+      } else {
+        metricString.append("$MMA$_").append(entry.getKey())
+            .append("R_").append("=0s");
+      }
+    }
+    metricString.append("$BWT=")
+        .append(getNumberOfBandwidthThrottledRequests())
+        .append("$IT=")
+        .append(getNumberOfIOPSThrottledRequests())
+        .append("$OT=")
+        .append(getNumberOfOtherThrottledRequests())
+        .append("$RT=")
+        .append(String.format("%.3f", percentageOfRequestsThrottled))
+        .append("$NFR=")
+        .append(getNumberOfNetworkFailedRequests())
+        .append("$TRNR=")
+        .append(getNumberOfRequestsSucceededWithoutRetrying())
+        .append("$TRF=")
+        .append(getNumberOfRequestsFailed())
+        .append("$TR=")
+        .append(getTotalNumberOfRequests())
+        .append("$MRC=")
+        .append(getMaxRetryCount());
+
+    return metricString + "";
+  }
+}
+
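
A minimal usage sketch (illustrative only, not part of the patch): the map
above keys per-retry-count buckets by the strings "1" through "25AndAbove",
and callers bump a bucket plus the aggregate counters as requests complete.

    import org.apache.hadoop.fs.azurebfs.AbfsBackoffMetrics;

    // Hypothetical caller: record a request that succeeded on its 3rd retry
    // after a 1.5s backoff, then serialize the whole set via toString().
    AbfsBackoffMetrics metrics = new AbfsBackoffMetrics();
    AbfsBackoffMetrics bucket = metrics.getMetricsMap().get("3");
    bucket.incrementNumberOfRequestsSucceeded();
    bucket.incrementTotalRequests();
    bucket.setMaxBackoff(Math.max(bucket.getMaxBackoff(), 1500L));
    metrics.incrementTotalNumberOfRequests();
    String wireValue = metrics.toString(); // serialized form, e.g. for the x-ms-feclient-metrics header
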
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index b51391ef86a..08771ed79fd 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
+import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.util.Preconditions;
 
 import org.apache.commons.lang3.StringUtils;
@@ -302,6 +303,26 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
   private boolean enableAutoThrottling;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_IDLE_TIMEOUT,
+      DefaultValue = DEFAULT_METRIC_IDLE_TIMEOUT_MS)
+  private int metricIdleTimeout;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ANALYSIS_TIMEOUT,
+      DefaultValue = DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS)
+  private int metricAnalysisTimeout;
+
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_URI,
+          DefaultValue = EMPTY_STRING)
+  private String metricUri;
+
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ACCOUNT_NAME,
+          DefaultValue = EMPTY_STRING)
+  private String metricAccount;
+
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ACCOUNT_KEY,
+          DefaultValue = EMPTY_STRING)
+  private String metricAccountKey;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
       DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
   private int accountOperationIdleTimeout;
@@ -961,6 +982,26 @@ public boolean isAutoThrottlingEnabled() {
     return this.enableAutoThrottling;
   }
 
+  public int getMetricIdleTimeout() {
+    return this.metricIdleTimeout;
+  }
+
+  public int getMetricAnalysisTimeout() {
+    return this.metricAnalysisTimeout;
+  }
+
+  public String getMetricUri() {
+    return metricUri;
+  }
+
+  public String getMetricAccount() {
+    return metricAccount;
+  }
+
+  public String getMetricAccountKey() {
+    return metricAccountKey;
+  }
+
   public int getAccountOperationIdleTimeout() {
     return accountOperationIdleTimeout;
   }
@@ -1015,6 +1056,10 @@ public TracingHeaderFormat getTracingHeaderFormat() {
     return getEnum(FS_AZURE_TRACINGHEADER_FORMAT, TracingHeaderFormat.ALL_ID_FORMAT);
   }
 
+  public MetricFormat getMetricFormat() {
+    return getEnum(FS_AZURE_METRIC_FORMAT, MetricFormat.EMPTY);
+  }
+
   public AuthType getAuthType(String accountName) {
     return getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
index 67ee8e90efb..c4d3e05cdb2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
@@ -21,10 +21,12 @@
 import java.net.URI;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.classification.VisibleForTesting;
-
 import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
+import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
+import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
@@ -34,8 +36,42 @@
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableMetric;
 
-import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_APPEND;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_CREATE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_CREATE_NON_RECURSIVE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_DELETE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_EXIST;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_DELEGATION_TOKEN;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_FILE_STATUS;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_LIST_STATUS;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_MKDIRS;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_OPEN;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_RENAME;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.DIRECTORIES_CREATED;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.DIRECTORIES_DELETED;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.ERROR_IGNORED;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.FILES_CREATED;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.FILES_DELETED;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_DELETE_REQUEST;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_GET_REQUEST;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_HEAD_REQUEST;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_PATCH_REQUEST;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_POST_REQUEST;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_PUT_REQUEST;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.METADATA_INCOMPLETE_RENAME_FAILURES;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.READ_THROTTLES;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_RECOVERY;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SERVER_UNAVAILABLE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.WRITE_THROTTLES;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+import static org.apache.hadoop.util.Time.now;
+
 
 /**
  * Instrumentation of Abfs counters.
@@ -63,6 +99,12 @@ public class AbfsCountersImpl implements AbfsCounters {
 
   private final IOStatisticsStore ioStatisticsStore;
 
+  private AbfsBackoffMetrics abfsBackoffMetrics = null;
+
+  private AbfsReadFooterMetrics abfsReadFooterMetrics = null;
+
+  private AtomicLong lastExecutionTime = null;
+
   private static final AbfsStatistic[] STATISTIC_LIST = {
       CALL_CREATE,
       CALL_OPEN,
@@ -91,7 +133,6 @@ public class AbfsCountersImpl implements AbfsCounters {
       RENAME_RECOVERY,
       METADATA_INCOMPLETE_RENAME_FAILURES,
       RENAME_PATH_ATTEMPTS
-
   };
 
   private static final AbfsStatistic[] DURATION_TRACKER_LIST = {
@@ -121,6 +162,25 @@ public AbfsCountersImpl(URI uri) {
       ioStatisticsStoreBuilder.withDurationTracking(durationStats.getStatName());
     }
     ioStatisticsStore = ioStatisticsStoreBuilder.build();
+    lastExecutionTime = new AtomicLong(now());
+  }
+
+  @Override
+  public void initializeMetrics(MetricFormat metricFormat) {
+    switch (metricFormat) {
+      case INTERNAL_BACKOFF_METRIC_FORMAT:
+        abfsBackoffMetrics = new AbfsBackoffMetrics();
+        break;
+      case INTERNAL_FOOTER_METRIC_FORMAT:
+        abfsReadFooterMetrics = new AbfsReadFooterMetrics();
+        break;
+      case INTERNAL_METRIC_FORMAT:
+        abfsBackoffMetrics = new AbfsBackoffMetrics();
+        abfsReadFooterMetrics = new AbfsReadFooterMetrics();
+        break;
+      default:
+        break;
+    }
   }
 
   /**
@@ -188,6 +248,21 @@ private MetricsRegistry getRegistry() {
     return registry;
   }
 
+  @Override
+  public AbfsBackoffMetrics getAbfsBackoffMetrics() {
+    return abfsBackoffMetrics != null ? abfsBackoffMetrics : null;
+  }
+
+  @Override
+  public AtomicLong getLastExecutionTime() {
+    return lastExecutionTime;
+  }
+
+  @Override
+  public AbfsReadFooterMetrics getAbfsReadFooterMetrics() {
+    return abfsReadFooterMetrics != null ? abfsReadFooterMetrics : null;
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -244,4 +319,25 @@ public IOStatistics getIOStatistics() {
   public DurationTracker trackDuration(String key) {
     return ioStatisticsStore.trackDuration(key);
   }
+
+  @Override
+  public String toString() {
+    String metric = "";
+    if (abfsBackoffMetrics != null) {
+      long totalNoRequests = getAbfsBackoffMetrics().getTotalNumberOfRequests();
+      if (totalNoRequests > 0) {
+        metric += "#BO:" + getAbfsBackoffMetrics().toString();
+      }
+    }
+    if (abfsReadFooterMetrics != null) {
+      Map<String, AbfsReadFooterMetrics> metricsMap = getAbfsReadFooterMetrics().getMetricsMap();
+      if (metricsMap != null && !(metricsMap.isEmpty())) {
+        String readFooterMetric = getAbfsReadFooterMetrics().toString();
+        if (!readFooterMetric.equals("")) {
+          metric += "#FO:" + getAbfsReadFooterMetrics().toString();
+        }
+      }
+    }
+    return metric;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index c56ddbd6dde..d243d95b8a9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -41,7 +41,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-
 import javax.annotation.Nullable;
 
 import org.apache.commons.lang3.StringUtils;
@@ -52,7 +51,6 @@
 import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -755,6 +753,18 @@ public synchronized void close() throws IOException {
     if (isClosed) {
       return;
     }
+    if (abfsStore.getClient().isMetricCollectionEnabled()) {
+      TracingContext tracingMetricContext = new TracingContext(
+              clientCorrelationId,
+              fileSystemId, FSOperationType.GET_ATTR, true,
+              tracingHeaderFormat,
+              listener, abfsCounters.toString());
+      try {
+        getAbfsClient().getMetricCall(tracingMetricContext);
+      } catch (IOException e) {
+        throw new IOException(e);
+      }
+    }
     // does all the delete-on-exit calls, and may be slow.
     super.close();
     LOG.debug("AzureBlobFileSystem.close");
@@ -1768,3 +1778,4 @@ public IOStatistics getIOStatistics() {
     return abfsCounters != null ? abfsCounters.getIOStatistics() : null;
   }
 }
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index c022778ffc4..620182f5bd2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -67,6 +67,10 @@ public final class ConfigurationKeys {
    */
   public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = "fs.azure.account.expect.header.enabled";
   public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
+  public static final String FS_AZURE_METRIC_ACCOUNT_NAME = "fs.azure.metric.account.name";
+  public static final String FS_AZURE_METRIC_ACCOUNT_KEY = "fs.azure.metric.account.key";
+  public static final String FS_AZURE_METRIC_URI = "fs.azure.metric.uri";
+
   public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
   public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
   public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled";
@@ -176,6 +180,8 @@ public final class ConfigurationKeys {
   public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
   public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
   public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
+  public static final String FS_AZURE_METRIC_IDLE_TIMEOUT = "fs.azure.metric.idle.timeout";
+  public static final String FS_AZURE_METRIC_ANALYSIS_TIMEOUT = "fs.azure.metric.analysis.timeout";
   public static final String FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT = "fs.azure.account.operation.idle.timeout";
   public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
   public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
@@ -216,6 +222,7 @@ public final class ConfigurationKeys {
    * character constraints are not satisfied. **/
   public static final String FS_AZURE_CLIENT_CORRELATIONID = "fs.azure.client.correlationid";
   public static final String FS_AZURE_TRACINGHEADER_FORMAT = "fs.azure.tracingheader.format";
+  public static final String FS_AZURE_METRIC_FORMAT = "fs.azure.metric.format";
   public static final String FS_AZURE_CLUSTER_NAME = "fs.azure.cluster.name";
   public static final String FS_AZURE_CLUSTER_TYPE = "fs.azure.cluster.type";
   public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 16c7d18890f..6ada0e8bcca 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -109,6 +109,8 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_ENABLE_FLUSH = true;
   public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
   public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = false;
+  public static final int DEFAULT_METRIC_IDLE_TIMEOUT_MS = 60_000;
+  public static final int DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS = 60_000;
   public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
   public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000;
   public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
index c5f4c2a855e..53020750ab3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
@@ -43,6 +43,7 @@ public final class HttpHeaderConfigurations {
   public static final String USER_AGENT = "User-Agent";
   public static final String X_HTTP_METHOD_OVERRIDE = "X-HTTP-Method-Override";
   public static final String X_MS_CLIENT_REQUEST_ID = "x-ms-client-request-id";
+  public static final String X_MS_FECLIENT_METRICS = "x-ms-feclient-metrics";
   public static final String X_MS_EXISTING_RESOURCE_TYPE = "x-ms-existing-resource-type";
   public static final String X_MS_DATE = "x-ms-date";
   public static final String X_MS_REQUEST_ID = "x-ms-request-id";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
index 539dd8f87f3..db1560d5414 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
@@ -21,7 +21,8 @@
 import java.net.HttpURLConnection;
 import java.util.ArrayList;
 import java.util.List;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -62,6 +63,9 @@ public enum AzureServiceErrorCode {
   private final String errorCode;
   private final int httpStatusCode;
   private final String errorMessage;
+
+  private static final Logger LOG1 = LoggerFactory.getLogger(AzureServiceErrorCode.class);
+
   AzureServiceErrorCode(String errorCode, int httpStatusCodes, String errorMessage) {
     this.errorCode = errorCode;
     this.httpStatusCode = httpStatusCodes;
@@ -107,7 +111,6 @@ public static AzureServiceErrorCode getAzureServiceCode(int httpStatusCode, Stri
         return azureServiceErrorCode;
       }
     }
-
     return UNKNOWN;
   }
 
@@ -115,16 +118,15 @@ public static AzureServiceErrorCode getAzureServiceCode(int httpStatusCode, Stri
     if (errorCode == null || errorCode.isEmpty() || httpStatusCode == UNKNOWN.httpStatusCode || errorMessage == null || errorMessage.isEmpty()) {
       return UNKNOWN;
     }
-
+    String[] errorMessages = errorMessage.split(System.lineSeparator(), 2);
     for (AzureServiceErrorCode azureServiceErrorCode : AzureServiceErrorCode.values()) {
-      if (azureServiceErrorCode.httpStatusCode == httpStatusCode
-          && errorCode.equalsIgnoreCase(azureServiceErrorCode.errorCode)
-          && errorMessage.equalsIgnoreCase(azureServiceErrorCode.errorMessage)
-      ) {
+      if (azureServiceErrorCode.getStatusCode() == httpStatusCode
+          && azureServiceErrorCode.getErrorCode().equalsIgnoreCase(errorCode)
+          && azureServiceErrorCode.getErrorMessage()
+              .equalsIgnoreCase(errorMessages[0])) {
         return azureServiceErrorCode;
       }
     }
-
     return UNKNOWN;
   }
 }
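
A note on the matching change above: service error bodies can span multiple
lines, so the lookup now compares only the first line of the message against
each enum entry. A distilled sketch (the sample message text is illustrative):

    // The revised rule from getAzureServiceCode(): split once on the line
    // separator and match case-insensitively on the first line only.
    String errorMessage = "Operation could not be completed within the specified time."
        + System.lineSeparator() + "RequestId:<elided>";
    String firstLine = errorMessage.split(System.lineSeparator(), 2)[0];
    // firstLine is what gets compared against azureServiceErrorCode.getErrorMessage().
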
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 9ff7165dd85..0eb23bd19ed 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -23,9 +23,11 @@
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
+import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLEncoder;
+import java.net.UnknownHostException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
@@ -33,13 +35,17 @@
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Locale;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
@@ -47,6 +53,7 @@
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema;
+import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.store.LogExactlyOnce;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
@@ -88,10 +95,12 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLIENT_VERSION;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_TIMEOUT;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILESYSTEM;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH_ENCODE;
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VENDOR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VERSION;
@@ -117,6 +126,7 @@
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_VERSION;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESOURCE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_TIMEOUT;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
 
@@ -144,6 +154,13 @@ public abstract class AbfsClient implements Closeable {
   private AccessTokenProvider tokenProvider;
   private SASTokenProvider sasTokenProvider;
   private final AbfsCounters abfsCounters;
+  private final Timer timer;
+  private final String abfsMetricUrl;
+  private boolean isMetricCollectionEnabled = false;
+  private final MetricFormat metricFormat;
+  private final AtomicBoolean isMetricCollectionStopped;
+  private final int metricAnalysisPeriod;
+  private final int metricIdlePeriod;
   private EncryptionContextProvider encryptionContextProvider = null;
   private EncryptionType encryptionType = EncryptionType.NONE;
   private final AbfsThrottlingIntercept intercept;
@@ -151,6 +168,9 @@ public abstract class AbfsClient implements Closeable {
   private final ListeningScheduledExecutorService executorService;
 
   private boolean renameResilience;
+  private TimerTask runningTimerTask;
+  private boolean isSendMetricCall;
+  private SharedKeyCredentials metricSharedkeyCredentials = null;
 
   private KeepAliveCache keepAliveCache;
 
@@ -222,6 +242,35 @@ private AbfsClient(final URL baseUrl,
         new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease Ops").setDaemon(true).build();
     this.executorService = MoreExecutors.listeningDecorator(
         HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(), tf));
+    this.metricFormat = abfsConfiguration.getMetricFormat();
+    this.isMetricCollectionStopped = new AtomicBoolean(false);
+    this.metricAnalysisPeriod = abfsConfiguration.getMetricAnalysisTimeout();
+    this.metricIdlePeriod = abfsConfiguration.getMetricIdleTimeout();
+    if (!metricFormat.toString().equals("")) {
+      isMetricCollectionEnabled = true;
+      abfsCounters.initializeMetrics(metricFormat);
+      String metricAccountName = abfsConfiguration.getMetricAccount();
+      int dotIndex = metricAccountName.indexOf(AbfsHttpConstants.DOT);
+      if (dotIndex <= 0) {
+        throw new InvalidUriException(
+                metricAccountName + " - account name is not fully qualified.");
+      }
+      String metricAccountKey = abfsConfiguration.getMetricAccountKey();
+      try {
+        metricSharedkeyCredentials = new SharedKeyCredentials(metricAccountName.substring(0, dotIndex),
+                metricAccountKey);
+      } catch (IllegalArgumentException e) {
+        throw new IOException("Exception while initializing metric credentials " + e);
+      }
+    }
+    this.timer = new Timer(
+        "abfs-timer-client", true);
+    if (isMetricCollectionEnabled) {
+      timer.schedule(new TimerTaskImpl(),
+          metricIdlePeriod,
+          metricIdlePeriod);
+    }
+    this.abfsMetricUrl = abfsConfiguration.getMetricUri();
   }
 
   public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
@@ -254,6 +303,10 @@ public void close() throws IOException {
     if (abfsApacheHttpClient != null) {
       abfsApacheHttpClient.close();
     }
+    if (runningTimerTask != null) {
+      runningTimerTask.cancel();
+      timer.purge();
+    }
     if (tokenProvider instanceof Closeable) {
       IOUtils.cleanupWithLogger(LOG,
           (Closeable) tokenProvider);
@@ -293,6 +346,10 @@ SharedKeyCredentials getSharedKeyCredentials() {
     return sharedKeyCredentials;
   }
 
+  SharedKeyCredentials getMetricSharedkeyCredentials() {
+    return metricSharedkeyCredentials;
+  }
+
   public void setEncryptionType(EncryptionType encryptionType) {
     this.encryptionType = encryptionType;
   }
@@ -1036,7 +1093,7 @@ protected URL createRequestUrl(final String query) throws AzureBlobFileSystemExc
    */
   @VisibleForTesting
   protected URL createRequestUrl(final String path, final String query)
-          throws AzureBlobFileSystemException {
+      throws AzureBlobFileSystemException {
     return createRequestUrl(baseUrl, path, query);
   }
 
@@ -1050,7 +1107,7 @@ protected URL createRequestUrl(final String path, final String query)
    */
   @VisibleForTesting
   protected URL createRequestUrl(final URL baseUrl, final String path, final String query)
-          throws AzureBlobFileSystemException {
+      throws AzureBlobFileSystemException {
     String encodedPath = path;
     try {
       encodedPath = urlEncode(path);
@@ -1060,7 +1117,10 @@ protected URL createRequestUrl(final URL baseUrl, final String path, final Strin
     }
 
     final StringBuilder sb = new StringBuilder();
-    sb.append(baseUrl);
+    if (baseUrl == null) {
+      throw new InvalidUriException("URL provided is null");
+    }
+    sb.append(baseUrl.toString());
     sb.append(encodedPath);
     sb.append(query);
 
@@ -1068,7 +1128,7 @@ protected URL createRequestUrl(final URL baseUrl, final String path, final Strin
     try {
       url = new URL(sb.toString());
     } catch (MalformedURLException ex) {
-      throw new InvalidUriException(sb.toString());
+      throw new InvalidUriException("URL is malformed" + sb.toString());
     }
     return url;
   }
@@ -1294,7 +1354,7 @@ void setEncryptionContextProvider(EncryptionContextProvider provider) {
    * Getter for abfsCounters from AbfsClient.
    * @return AbfsCounters instance.
    */
-  protected AbfsCounters getAbfsCounters() {
+  public AbfsCounters getAbfsCounters() {
     return abfsCounters;
   }
 
@@ -1332,6 +1392,128 @@ protected AccessTokenProvider getTokenProvider() {
     return tokenProvider;
   }
 
+  /**
+   * Retrieves a TracingContext object configured for metric tracking.
+   * This method creates a TracingContext object with the validated client correlation ID,
+   * the host name of the local machine (or "UnknownHost" if unable to determine),
+   * the file system operation type set to GET_ATTR, and additional configuration parameters
+   * for metric tracking.
+   * The TracingContext is intended for use in tracking metrics related to Azure Blob FileSystem (ABFS) operations.
+   *
+   * @return A TracingContext object configured for metric tracking.
+   */
+  private TracingContext getMetricTracingContext() {
+    String hostName;
+    try {
+      hostName = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      hostName = "UnknownHost";
+    }
+    return new TracingContext(TracingContext.validateClientCorrelationID(
+        abfsConfiguration.getClientCorrelationId()),
+        hostName, FSOperationType.GET_ATTR, true,
+        abfsConfiguration.getTracingHeaderFormat(),
+        null, abfsCounters.toString());
+  }
+
+  /**
+   * Suspends or resumes the metrics timer; the state check and update are synchronized internally.
+   * @param timerFunctionality resume or suspend.
+   * @param timerTask the TimerTask to cancel when suspending.
+   * @return true if the running task was suspended, false otherwise.
+   */
+  boolean timerOrchestrator(TimerFunctionality timerFunctionality, TimerTask timerTask) {
+    switch (timerFunctionality) {
+      case RESUME:
+        if (isMetricCollectionStopped.get()) {
+          synchronized (this) {
+            if (isMetricCollectionStopped.get()) {
+              resumeTimer();
+            }
+          }
+        }
+        break;
+      case SUSPEND:
+        long now = System.currentTimeMillis();
+        long lastExecutionTime = abfsCounters.getLastExecutionTime().get();
+        if (isMetricCollectionEnabled && (now - lastExecutionTime >= metricAnalysisPeriod)) {
+          synchronized (this) {
+            if (!isMetricCollectionStopped.get()) {
+              timerTask.cancel();
+              timer.purge();
+              isMetricCollectionStopped.set(true);
+              return true;
+            }
+          }
+        }
+        break;
+      default:
+        break;
+    }
+    return false;
+  }
+
+  private void resumeTimer() {
+    isMetricCollectionStopped.set(false);
+    timer.schedule(new TimerTaskImpl(),
+        metricIdlePeriod,
+        metricIdlePeriod);
+  }
+
+  /**
+   * Initiates a metric call to the Azure Blob FileSystem (ABFS) for retrieving file system properties.
+   * This method performs a HEAD request to the specified metric URL, using default headers and query parameters.
+   *
+   * @param tracingContext The tracing context to be used for capturing tracing information.
+   * @throws IOException if building or executing the metric request fails.
+   */
+  public void getMetricCall(TracingContext tracingContext) throws IOException {
+    this.isSendMetricCall = true;
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
+
+    final URL url = createRequestUrl(new URL(abfsMetricUrl), EMPTY_STRING, abfsUriQueryBuilder.toString());
+
+    final AbfsRestOperation op = getAbfsRestOperation(
+            AbfsRestOperationType.GetFileSystemProperties,
+            HTTP_METHOD_HEAD,
+            url,
+            requestHeaders);
+    try {
+      op.execute(tracingContext);
+    } finally {
+      this.isSendMetricCall = false;
+    }
+  }
+
+  public boolean isSendMetricCall() {
+    return isSendMetricCall;
+  }
+
+  public boolean isMetricCollectionEnabled() {
+    return isMetricCollectionEnabled;
+  }
+
+  class TimerTaskImpl extends TimerTask {
+    TimerTaskImpl() {
+      runningTimerTask = this;
+    }
+    @Override
+    public void run() {
+      try {
+        if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
+            try {
+              getMetricCall(getMetricTracingContext());
+            } finally {
+              abfsCounters.initializeMetrics(metricFormat);
+            }
+        }
+      } catch (IOException e) {
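+        // Best-effort: a failed metric push is dropped so the timer thread keeps running.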
+      }
+    }
+  }
+
   /**
   * Creates an AbfsRestOperation with additional parameters for buffer and SAS token.
    *
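
The suspend/resume handling in timerOrchestrator() above pairs a lock-free
check with a re-check under a lock, so steady-state calls never contend. The
idiom, distilled (illustrative only, not ABFS API):

    import java.util.concurrent.atomic.AtomicBoolean;

    class SuspendResumeGate {
      private final AtomicBoolean stopped = new AtomicBoolean(false);

      void resumeIfStopped() {
        if (stopped.get()) {          // cheap lock-free fast path
          synchronized (this) {
            if (stopped.get()) {      // re-check: another thread may have resumed already
              stopped.set(false);
              // reschedule the metrics TimerTask here
            }
          }
        }
      }
    }
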
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java
index 0a5182a6991..baf79e7dd8b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java
@@ -53,7 +53,7 @@ public AbfsPerfTracker getAbfsPerfTracker() {
     return abfsPerfTracker;
   }
 
-  public AbfsCounters getAbfsCounters() {
+  AbfsCounters getAbfsCounters() {
     return abfsCounters;
   }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
index d01a3598afc..65e5fa29a13 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.classification.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.azurebfs.AbfsBackoffMetrics;
 import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
+import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
@@ -74,4 +77,12 @@ String formString(String prefix, String separator, String suffix,
    */
   @Override
   DurationTracker trackDuration(String key);
+
+  void initializeMetrics(MetricFormat metricFormat);
+
+  AbfsBackoffMetrics getAbfsBackoffMetrics();
+
+  AbfsReadFooterMetrics getAbfsReadFooterMetrics();
+
+  AtomicLong getLastExecutionTime();
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 92d19fdf743..12bc471b50d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -113,15 +113,15 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private int bCursorBkp;
   private long fCursorBkp;
   private long fCursorAfterLastReadBkp;
-
+  private final AbfsReadFooterMetrics abfsReadFooterMetrics;
   /** Stream statistics. */
   private final AbfsInputStreamStatistics streamStatistics;
   private long bytesFromReadAhead; // bytes read from readAhead; for testing
   private long bytesFromRemoteRead; // bytes read remotely; for testing
   private Listener listener;
-
   private final AbfsInputStreamContext context;
   private IOStatistics ioStatistics;
+  private String filePathIdentifier;
   /**
    * This is the actual position within the object, used by
    * lazy seek to decide whether to seek on the next read or not.
@@ -144,9 +144,6 @@ public AbfsInputStream(
     this.path = path;
     this.contentLength = contentLength;
     this.bufferSize = abfsInputStreamContext.getReadBufferSize();
-    /*
-    * FooterReadSize should not be more than bufferSize.
-    */
     this.footerReadSize = Math.min(bufferSize, abfsInputStreamContext.getFooterReadBufferSize());
     this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
@@ -160,12 +157,19 @@ public AbfsInputStream(
     this.cachedSasToken = new CachedSASToken(
         abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
     this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
+    this.abfsReadFooterMetrics = client.getAbfsCounters().getAbfsReadFooterMetrics();
     this.inputStreamId = createInputStreamId();
     this.tracingContext = new TracingContext(tracingContext);
     this.tracingContext.setOperation(FSOperationType.READ);
     this.tracingContext.setStreamID(inputStreamId);
     this.context = abfsInputStreamContext;
     readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
+    if (abfsReadFooterMetrics != null) {
+      this.filePathIdentifier = eTag + path;
+      synchronized (this) {
+        abfsReadFooterMetrics.updateMap(filePathIdentifier);
+      }
+    }
     this.fsBackRef = abfsInputStreamContext.getFsBackRef();
     contextEncryptionAdapter = abfsInputStreamContext.getEncryptionAdapter();
 
@@ -256,6 +260,9 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
       // go back and read from buffer is fCursor - limit.
       // There maybe case that we read less than requested data.
       long filePosAtStartOfBuffer = fCursor - limit;
+      if (abfsReadFooterMetrics != null) {
+        abfsReadFooterMetrics.checkMetricUpdate(filePathIdentifier, len, contentLength, nextReadPos);
+      }
       if (nextReadPos >= filePosAtStartOfBuffer && nextReadPos <= fCursor) {
         // Determining position in buffer from where data is to be read.
         bCursor = (int) (nextReadPos - filePosAtStartOfBuffer);
@@ -342,7 +349,6 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
       if (firstRead) {
         firstRead = false;
       }
-
       if (bytesRead == -1) {
         return -1;
       }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java
new file mode 100644
index 00000000000..5abb97cd9ce
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java
@@ -0,0 +1,549 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+
+public class AbfsReadFooterMetrics {
+  private final AtomicBoolean isParquetFile;
+  private final AtomicBoolean isParquetEvaluated;
+  private final AtomicBoolean isLenUpdated;
+  private String sizeReadByFirstRead;
+  private String offsetDiffBetweenFirstAndSecondRead;
+  private final AtomicLong fileLength;
+  private double avgFileLength;
+  private double avgReadLenRequested;
+  private final AtomicBoolean collectMetrics;
+  private final AtomicBoolean collectMetricsForNextRead;
+  private final AtomicBoolean collectLenMetrics;
+  private final AtomicLong dataLenRequested;
+  private final AtomicLong offsetOfFirstRead;
+  private final AtomicInteger readCount;
+  private final ConcurrentSkipListMap<String, AbfsReadFooterMetrics> metricsMap;
+  private static final String FOOTER_LENGTH = "20";
+
+  public AbfsReadFooterMetrics() {
+    this.isParquetFile = new AtomicBoolean(false);
+    this.isParquetEvaluated = new AtomicBoolean(false);
+    this.isLenUpdated = new AtomicBoolean(false);
+    this.fileLength = new AtomicLong();
+    this.readCount = new AtomicInteger(0);
+    this.offsetOfFirstRead = new AtomicLong();
+    this.collectMetrics = new AtomicBoolean(false);
+    this.collectMetricsForNextRead = new AtomicBoolean(false);
+    this.collectLenMetrics = new AtomicBoolean(false);
+    this.dataLenRequested = new AtomicLong(0);
+    this.metricsMap = new ConcurrentSkipListMap<>();
+  }
+
+  public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
+    return metricsMap;
+  }
+
+  private boolean getIsParquetFile() {
+    return isParquetFile.get();
+  }
+
+  public void setIsParquetFile(boolean isParquetFile) {
+    this.isParquetFile.set(isParquetFile);
+  }
+
+  private String getSizeReadByFirstRead() {
+    return sizeReadByFirstRead;
+  }
+
+  public void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
+    this.sizeReadByFirstRead = sizeReadByFirstRead;
+  }
+
+  private String getOffsetDiffBetweenFirstAndSecondRead() {
+    return offsetDiffBetweenFirstAndSecondRead;
+  }
+
+  public void setOffsetDiffBetweenFirstAndSecondRead(final String offsetDiffBetweenFirstAndSecondRead) {
+    this.offsetDiffBetweenFirstAndSecondRead
+        = offsetDiffBetweenFirstAndSecondRead;
+  }
+
+  private long getFileLength() {
+    return fileLength.get();
+  }
+
+  private void setFileLength(long fileLength) {
+    this.fileLength.set(fileLength);
+  }
+
+  private double getAvgFileLength() {
+    return avgFileLength;
+  }
+
+  public void setAvgFileLength(final double avgFileLength) {
+    this.avgFileLength = avgFileLength;
+  }
+
+  private double getAvgReadLenRequested() {
+    return avgReadLenRequested;
+  }
+
+  public void setAvgReadLenRequested(final double avgReadLenRequested) {
+    this.avgReadLenRequested = avgReadLenRequested;
+  }
+
+  private boolean getCollectMetricsForNextRead() {
+    return collectMetricsForNextRead.get();
+  }
+
+  private void setCollectMetricsForNextRead(boolean collectMetricsForNextRead) {
+    this.collectMetricsForNextRead.set(collectMetricsForNextRead);
+  }
+
+  private long getOffsetOfFirstRead() {
+    return offsetOfFirstRead.get();
+  }
+
+  private void setOffsetOfFirstRead(long offsetOfFirstRead) {
+    this.offsetOfFirstRead.set(offsetOfFirstRead);
+  }
+
+  private int getReadCount() {
+    return readCount.get();
+  }
+
+  private void setReadCount(int readCount) {
+    this.readCount.set(readCount);
+  }
+
+  private int incrementReadCount() {
+    this.readCount.incrementAndGet();
+    return getReadCount();
+  }
+
+  private boolean getCollectLenMetrics() {
+    return collectLenMetrics.get();
+  }
+
+  private void setCollectLenMetrics(boolean collectLenMetrics) {
+    this.collectLenMetrics.set(collectLenMetrics);
+  }
+
+  private long getDataLenRequested() {
+    return dataLenRequested.get();
+  }
+
+  private void setDataLenRequested(long dataLenRequested) {
+    this.dataLenRequested.set(dataLenRequested);
+  }
+
+  private void updateDataLenRequested(long dataLenRequested) {
+    this.dataLenRequested.addAndGet(dataLenRequested);
+  }
+
+  private boolean getCollectMetrics() {
+    return collectMetrics.get();
+  }
+
+  private void setCollectMetrics(boolean collectMetrics) {
+    this.collectMetrics.set(collectMetrics);
+  }
+
+  private boolean getIsParquetEvaluated() {
+    return isParquetEvaluated.get();
+  }
+
+  private void setIsParquetEvaluated(boolean isParquetEvaluated) {
+    this.isParquetEvaluated.set(isParquetEvaluated);
+  }
+
+  private boolean getIsLenUpdated() {
+    return isLenUpdated.get();
+  }
+
+  private void setIsLenUpdated(boolean isLenUpdated) {
+    this.isLenUpdated.set(isLenUpdated);
+  }
+
+  /**
+   * Updates the metrics map with an entry for the specified file if it doesn't already exist.
+   *
+   * @param filePathIdentifier The unique identifier for the file.
+   */
+  public void updateMap(String filePathIdentifier) {
+    // If the file is not already in the metrics map, add it with a new AbfsReadFooterMetrics object.
+    metricsMap.computeIfAbsent(filePathIdentifier, key -> new AbfsReadFooterMetrics());
+  }
+
+  /**
+   * Checks and updates metrics for a specific file identified by filePathIdentifier.
+   * If the metrics do not exist for the file, they are initialized.
+   *
+   * @param filePathIdentifier The unique identifier for the file.
+   * @param len               The length of the read operation.
+   * @param contentLength     The total content length of the file.
+   * @param nextReadPos       The position of the next read operation.
+   */
+  public void checkMetricUpdate(final String filePathIdentifier, final int len, final long contentLength,
+      final long nextReadPos) {
+    AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
+            filePathIdentifier, key -> new AbfsReadFooterMetrics());
+    if (readFooterMetrics.getReadCount() == 0
+        || (readFooterMetrics.getReadCount() >= 1
+        && readFooterMetrics.getCollectMetrics())) {
+      updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
+    }
+  }
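
For orientation, here is a minimal sketch of how a read path can drive checkMetricUpdate. The wiring below is hypothetical (the file identifier and sizes are invented for illustration); in this patch the real call site lives in AbfsInputStream, which is modified elsewhere in this commit.

```java
// Hypothetical driver for AbfsReadFooterMetrics; fileId and sizes are
// illustrative only. A read close to the end of the file flips the
// collection flags via updateMetricsOnFirstRead/-SecondRead below.
AbfsReadFooterMetrics footerMetrics = new AbfsReadFooterMetrics();
String fileId = "filesystem-id/path/to/file";  // unique per file
long contentLength = 8L * 1024 * 1024;         // 8 MB file
long nextReadPos = contentLength - 1024;       // read close to EOF
int len = 1024;                                // bytes requested
footerMetrics.checkMetricUpdate(fileId, len, contentLength, nextReadPos);
```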
+
+  /**
+   * Updates metrics for a specific file identified by filePathIdentifier.
+   *
+   * @param filePathIdentifier  The unique identifier for the file.
+   * @param len                The length of the read operation.
+   * @param contentLength      The total content length of the file.
+   * @param nextReadPos        The position of the next read operation.
+   */
+  private void updateMetrics(final String filePathIdentifier, final int len, final long contentLength,
+                             final long nextReadPos) {
+    AbfsReadFooterMetrics readFooterMetrics = metricsMap.get(filePathIdentifier);
+
+    // Create a new AbfsReadFooterMetrics object if it doesn't exist in the metricsMap.
+    if (readFooterMetrics == null) {
+      readFooterMetrics = new AbfsReadFooterMetrics();
+      metricsMap.put(filePathIdentifier, readFooterMetrics);
+    }
+
+    int readCount;
+    synchronized (this) {
+      readCount = readFooterMetrics.incrementReadCount();
+    }
+
+    if (readCount == 1) {
+      // Update metrics for the first read.
+      updateMetricsOnFirstRead(readFooterMetrics, nextReadPos, len, contentLength);
+    }
+
+    synchronized (this) {
+      if (readFooterMetrics.getCollectLenMetrics()) {
+        readFooterMetrics.updateDataLenRequested(len);
+      }
+    }
+
+    if (readCount == 2) {
+      // Update metrics for the second read.
+      updateMetricsOnSecondRead(readFooterMetrics, nextReadPos, len);
+    }
+  }
+
+  /**
+   * Updates metrics for the first read operation.
+   *
+   * @param readFooterMetrics The metrics object to update.
+   * @param nextReadPos       The position of the next read operation.
+   * @param len               The length of the read operation.
+   * @param contentLength     The total content length of the file.
+   */
+  private void updateMetricsOnFirstRead(AbfsReadFooterMetrics readFooterMetrics, long nextReadPos, int len, long contentLength) {
+    if (nextReadPos >= contentLength - (long) Integer.parseInt(FOOTER_LENGTH) * ONE_KB) {
+      readFooterMetrics.setCollectMetrics(true);
+      readFooterMetrics.setCollectMetricsForNextRead(true);
+      readFooterMetrics.setOffsetOfFirstRead(nextReadPos);
+      readFooterMetrics.setSizeReadByFirstRead(len + "_" + Math.abs(contentLength - nextReadPos));
+      readFooterMetrics.setFileLength(contentLength);
+    }
+  }
+
+  /**
+   * Updates metrics for the second read operation.
+   *
+   * @param readFooterMetrics The metrics object to update.
+   * @param nextReadPos       The position of the next read operation.
+   * @param len               The length of the read operation.
+   */
+  private void updateMetricsOnSecondRead(AbfsReadFooterMetrics readFooterMetrics, long nextReadPos, int len) {
+    if (readFooterMetrics.getCollectMetricsForNextRead()) {
+      long offsetDiff = Math.abs(nextReadPos - readFooterMetrics.getOffsetOfFirstRead());
+      readFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" + offsetDiff);
+      readFooterMetrics.setCollectLenMetrics(true);
+    }
+  }
+
+  /**
+   * Check if the given file should be marked as a Parquet file.
+   *
+   * @param metrics The metrics to evaluate.
+   * @return True if the file meets the criteria for being marked as a Parquet file, false otherwise.
+   */
+  private boolean shouldMarkAsParquet(AbfsReadFooterMetrics metrics) {
+    return metrics.getCollectMetrics()
+            && metrics.getReadCount() >= 2
+            && !metrics.getIsParquetEvaluated()
+            && haveEqualValues(metrics.getSizeReadByFirstRead())
+            && haveEqualValues(metrics.getOffsetDiffBetweenFirstAndSecondRead());
+  }
+
+  /**
+   * Check if two values are equal, considering they are in the format "value1_value2".
+   *
+   * @param value The value to check.
+   * @return True if the two parts of the value are equal, false otherwise.
+   */
+  private boolean haveEqualValues(String value) {
+    String[] parts = value.split("_");
+    return parts.length == 2 && parts[0].equals(parts[1]);
+  }
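
The "value1_value2" encoding is easiest to see with concrete inputs. A standalone replica of the check above, with invented values:

```java
// Equal halves of the "_"-delimited pair mean the first read requested
// exactly as many bytes as its distance from the tracked offset, which
// is treated as one signal of a Parquet-style footer read.
static boolean equalHalves(String value) {
  String[] parts = value.split("_");
  return parts.length == 2 && parts[0].equals(parts[1]);
}
// equalHalves("1024_1024") -> true  (Parquet-like access pattern)
// equalHalves("1024_4096") -> false (non-Parquet access pattern)
```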
+
+  /**
+   * Mark the given metrics as a Parquet file and update related values.
+   *
+   * @param metrics The metrics to mark as Parquet.
+   */
+  private void markAsParquet(AbfsReadFooterMetrics metrics) {
+    metrics.setIsParquetFile(true);
+    String[] parts = metrics.getSizeReadByFirstRead().split("_");
+    metrics.setSizeReadByFirstRead(parts[0]);
+    parts = metrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
+    metrics.setOffsetDiffBetweenFirstAndSecondRead(parts[0]);
+    metrics.setIsParquetEvaluated(true);
+  }
+
+  /**
+   * Check each metric in the provided map and mark them as Parquet files if they meet the criteria.
+   *
+   * @param metricsMap The map containing metrics to evaluate.
+   */
+  public void checkIsParquet(Map<String, AbfsReadFooterMetrics> metricsMap) {
+    for (Map.Entry<String, AbfsReadFooterMetrics> entry : metricsMap.entrySet()) {
+      AbfsReadFooterMetrics readFooterMetrics = entry.getValue();
+      if (shouldMarkAsParquet(readFooterMetrics)) {
+        markAsParquet(readFooterMetrics);
+        metricsMap.replace(entry.getKey(), readFooterMetrics);
+      }
+    }
+  }
+
+  /**
+   * Updates the average read length requested for metrics of all files in the metrics map.
+   * If the metrics indicate that the update is needed, it calculates the average read length and updates the metrics.
+   *
+   * @param metricsMap A map containing metrics for different files with unique identifiers.
+   */
+  private void updateLenRequested(Map<String, AbfsReadFooterMetrics> metricsMap) {
+    for (AbfsReadFooterMetrics readFooterMetrics : metricsMap.values()) {
+      if (shouldUpdateLenRequested(readFooterMetrics)) {
+        int readReqCount = readFooterMetrics.getReadCount() - 2;
+        readFooterMetrics.setAvgReadLenRequested(
+                (double) readFooterMetrics.getDataLenRequested() / readReqCount);
+        readFooterMetrics.setIsLenUpdated(true);
+      }
+    }
+  }
+
+  /**
+   * Checks whether the average read length requested should be updated for the given metrics.
+   *
+   * The method returns true if the following conditions are met:
+   * - Metrics collection is enabled.
+   * - The number of read counts is greater than 2.
+   * - The average read length has not been updated previously.
+   *
+   * @param readFooterMetrics The metrics object to evaluate.
+   * @return True if the average read length should be updated, false otherwise.
+   */
+  private boolean shouldUpdateLenRequested(AbfsReadFooterMetrics readFooterMetrics) {
+    return readFooterMetrics.getCollectMetrics()
+            && readFooterMetrics.getReadCount() > 2
+            && !readFooterMetrics.getIsLenUpdated();
+  }
+
+  /**
+   * Calculates the average metrics from a list of AbfsReadFooterMetrics and sets the values in the provided 'avgParquetReadFooterMetrics' object.
+   *
+   * @param isParquetList The list of AbfsReadFooterMetrics to compute the averages from.
+   * @param avgParquetReadFooterMetrics The target AbfsReadFooterMetrics object to store the computed average values.
+   *
+   * This method calculates various average metrics from the provided list and sets them in the 'avgParquetReadFooterMetrics' object.
+   * The metrics include:
+   * - Size read by the first read
+   * - Offset difference between the first and second read
+   * - Average file length
+   * - Average requested read length
+   */
+  private void getParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> isParquetList,
+      AbfsReadFooterMetrics avgParquetReadFooterMetrics) {
+    avgParquetReadFooterMetrics.setSizeReadByFirstRead(
+        String.format("%.3f", isParquetList.stream()
+            .map(AbfsReadFooterMetrics::getSizeReadByFirstRead).mapToDouble(
+                Double::parseDouble).average().orElse(0.0)));
+    avgParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(
+        String.format("%.3f", isParquetList.stream()
+            .map(AbfsReadFooterMetrics::getOffsetDiffBetweenFirstAndSecondRead)
+            .mapToDouble(Double::parseDouble).average().orElse(0.0)));
+    avgParquetReadFooterMetrics.setAvgFileLength(isParquetList.stream()
+        .mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
+    avgParquetReadFooterMetrics.setAvgReadLenRequested(isParquetList.stream().
+        map(AbfsReadFooterMetrics::getAvgReadLenRequested).
+        mapToDouble(Double::doubleValue).average().orElse(0.0));
+  }
+
+  /**
+   * Calculates the average metrics from a list of non-Parquet AbfsReadFooterMetrics instances.
+   *
+   * This method takes a list of AbfsReadFooterMetrics representing non-Parquet reads and calculates
+   * the average values for the size read by the first read and the offset difference between the first
+   * and second read. The averages are then set in the provided AbfsReadFooterMetrics instance.
+   *
+   * @param isNonParquetList A list of AbfsReadFooterMetrics instances representing non-Parquet reads.
+   * @param avgNonParquetReadFooterMetrics The AbfsReadFooterMetrics instance to store the calculated averages.
+   *                                       It is assumed that the size of the list is at least 1, and the first
+   *                                       element of the list is used to determine the size of arrays.
+   *                                       The instance is modified in-place with the calculated averages.
+   **/
+  private void getNonParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> isNonParquetList,
+                                                     AbfsReadFooterMetrics avgNonParquetReadFooterMetrics) {
+    int size = isNonParquetList.get(0).getSizeReadByFirstRead().split("_").length;
+    double[] store = new double[2 * size];
+    // Calculating sum of individual values
+    isNonParquetList.forEach(abfsReadFooterMetrics -> {
+      String[] firstReadSize = abfsReadFooterMetrics.getSizeReadByFirstRead().split("_");
+      String[] offDiffFirstSecondRead = abfsReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
+
+      for (int i = 0; i < firstReadSize.length; i++) {
+        store[i] += Long.parseLong(firstReadSize[i]);
+        store[i + size] += Long.parseLong(offDiffFirstSecondRead[i]);
+      }
+    });
+
+    // Calculating averages and creating formatted strings
+    StringJoiner firstReadSize = new StringJoiner("_");
+    StringJoiner offDiffFirstSecondRead = new StringJoiner("_");
+
+    for (int j = 0; j < size; j++) {
+      firstReadSize.add(String.format("%.3f", store[j] / isNonParquetList.size()));
+      offDiffFirstSecondRead.add(String.format("%.3f", store[j + size] / isNonParquetList.size()));
+    }
+
+    avgNonParquetReadFooterMetrics.setSizeReadByFirstRead(firstReadSize.toString());
+    avgNonParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(offDiffFirstSecondRead.toString());
+    avgNonParquetReadFooterMetrics.setAvgFileLength(isNonParquetList.stream()
+            .mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
+    avgNonParquetReadFooterMetrics.setAvgReadLenRequested(isNonParquetList.stream()
+            .mapToDouble(AbfsReadFooterMetrics::getAvgReadLenRequested).average().orElse(0.0));
+  }
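
A standalone sketch of the element-wise averaging performed above, under assumed inputs (two non-Parquet files with invented "len_distance" pairs):

```java
import java.util.Arrays;
import java.util.List;

public class PairAverageDemo {
  public static void main(String[] args) {
    // Hypothetical FR pairs from two files: "100_200" and "300_400".
    List<String> pairs = Arrays.asList("100_200", "300_400");
    double[] sums = new double[2];
    for (String pair : pairs) {
      String[] parts = pair.split("_");
      for (int i = 0; i < parts.length; i++) {
        sums[i] += Long.parseLong(parts[i]);
      }
    }
    // The averages keep the same "_" layout: prints "200.000_300.000".
    System.out.println(String.format("%.3f_%.3f",
        sums[0] / pairs.size(), sums[1] / pairs.size()));
  }
}
```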
+
+  /*
+  Acronyms:
+  1.FR :- First Read (In case of parquet we only maintain the size requested by application for
+  the first read, in case of non parquet we maintain a string separated by "_" delimiter where the first
+  substring represents the len requested for first read and the second substring represents the seek pointer difference from the
+  end of the file.)
+  2.SR :- Second Read (In case of parquet we only maintain the size requested by application for
+  the second read, in case of non parquet we maintain a string separated by "_" delimiter where the first
+  substring represents the len requested for second read and the second substring represents the seek pointer difference from the
+  offset of the first read.)
+  3.FL :- Total length of the file requested for read
+   */
+  public String getReadFooterMetrics(AbfsReadFooterMetrics avgReadFooterMetrics) {
+    String readFooterMetric = "";
+    if (avgReadFooterMetrics.getIsParquetFile()) {
+      readFooterMetric += "$Parquet:";
+    } else {
+      readFooterMetric += "$NonParquet:";
+    }
+    readFooterMetric += "$FR=" + avgReadFooterMetrics.getSizeReadByFirstRead()
+        + "$SR="
+        + avgReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead()
+        + "$FL=" + String.format("%.3f",
+        avgReadFooterMetrics.getAvgFileLength())
+        + "$RL=" + String.format("%.3f",
+        avgReadFooterMetrics.getAvgReadLenRequested());
+    return readFooterMetric;
+  }
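
For reference, with the averaged non-Parquet values that the integration test added below expects, the string assembled here comes out as follows (a sketch; the setter values are taken from that test):

```java
AbfsReadFooterMetrics avg = new AbfsReadFooterMetrics();
avg.setIsParquetFile(false);
avg.setSizeReadByFirstRead("16384.000_16384.000");
avg.setOffsetDiffBetweenFirstAndSecondRead("1.000_16384.000");
avg.setAvgFileLength(32768.000);
avg.setAvgReadLenRequested(16384.000);
// avg.getReadFooterMetrics(avg) then returns:
// $NonParquet:$FR=16384.000_16384.000$SR=1.000_16384.000$FL=32768.000$RL=16384.000
```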
+
+  /**
+   * Retrieves and aggregates read footer metrics for both Parquet and non-Parquet files from a list
+   * of AbfsReadFooterMetrics instances. The function calculates the average metrics separately for
+   * Parquet and non-Parquet files and returns a formatted string containing the aggregated metrics.
+   *
+   * @param readFooterMetricsList A list of AbfsReadFooterMetrics instances containing read footer metrics
+   *                              for both Parquet and non-Parquet files.
+   *
+   * @return A formatted string containing the aggregated read footer metrics for both Parquet and non-Parquet files.
+   **/
+  private String getFooterMetrics(List<AbfsReadFooterMetrics> readFooterMetricsList) {
+    List<AbfsReadFooterMetrics> isParquetList = new ArrayList<>();
+    List<AbfsReadFooterMetrics> isNonParquetList = new ArrayList<>();
+    for (AbfsReadFooterMetrics abfsReadFooterMetrics : readFooterMetricsList) {
+      if (abfsReadFooterMetrics.getIsParquetFile()) {
+        isParquetList.add(abfsReadFooterMetrics);
+      } else if (abfsReadFooterMetrics.getReadCount() >= 2) {
+        isNonParquetList.add(abfsReadFooterMetrics);
+      }
+    }
+    AbfsReadFooterMetrics avgParquetReadFooterMetrics = new AbfsReadFooterMetrics();
+    AbfsReadFooterMetrics avgNonparquetReadFooterMetrics = new AbfsReadFooterMetrics();
+    String readFooterMetric = "";
+    if (!isParquetList.isEmpty()) {
+      avgParquetReadFooterMetrics.setIsParquetFile(true);
+      getParquetReadFooterMetricsAverage(isParquetList, avgParquetReadFooterMetrics);
+      readFooterMetric += getReadFooterMetrics(avgParquetReadFooterMetrics);
+    }
+    if (!isNonParquetList.isEmpty()) {
+      avgNonparquetReadFooterMetrics.setIsParquetFile(false);
+      getNonParquetReadFooterMetricsAverage(isNonParquetList, avgNonparquetReadFooterMetrics);
+      readFooterMetric += getReadFooterMetrics(avgNonparquetReadFooterMetrics);
+    }
+    return readFooterMetric;
+  }
+
+  @Override
+  public String toString() {
+    Map<String, AbfsReadFooterMetrics> metricsMap = getMetricsMap();
+    List<AbfsReadFooterMetrics> readFooterMetricsList = new ArrayList<>();
+    if (metricsMap != null && !(metricsMap.isEmpty())) {
+      checkIsParquet(metricsMap);
+      updateLenRequested(metricsMap);
+      for (Map.Entry<String, AbfsReadFooterMetrics> entry : metricsMap.entrySet()) {
+        AbfsReadFooterMetrics abfsReadFooterMetrics = entry.getValue();
+        if (abfsReadFooterMetrics.getCollectMetrics()) {
+          readFooterMetricsList.add(entry.getValue());
+        }
+      }
+    }
+    String readFooterMetrics = "";
+    if (!readFooterMetricsList.isEmpty()) {
+      readFooterMetrics = getFooterMetrics(readFooterMetricsList);
+    }
+    return readFooterMetrics;
+  }
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index d1dcd4c7748..3ed3420a906 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -43,10 +43,16 @@
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import java.util.Map;
+import org.apache.hadoop.fs.azurebfs.AbfsBackoffMetrics;
 import org.apache.http.impl.execchain.RequestAbortedException;
 
-import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.PUT_BLOCK_LIST;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static org.apache.hadoop.util.Time.now;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION;
@@ -76,17 +82,20 @@ public class AbfsRestOperation {
   private final String sasToken;
 
   private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
-
+  private static final Logger LOG1 = LoggerFactory.getLogger(AbfsRestOperation.class);
   // For uploads, this is the request entity body.  For downloads,
   // this will hold the response entity body.
   private byte[] buffer;
   private int bufferOffset;
   private int bufferLength;
   private int retryCount = 0;
-
+  private boolean isThrottledRequest = false;
+  private long maxRetryCount = 0L;
+  private final int maxIoRetries;
   private AbfsHttpOperation result;
-  private AbfsCounters abfsCounters;
-
+  private final AbfsCounters abfsCounters;
+  private AbfsBackoffMetrics abfsBackoffMetrics;
+  private Map<String, AbfsBackoffMetrics> metricsMap;
   /**
    * This variable contains the reason of last API call within the same
    * AbfsRestOperation object.
@@ -158,6 +167,11 @@ String getSasToken() {
     return sasToken;
   }
 
+  private static final int MIN_FIRST_RANGE = 1;
+  private static final int MAX_FIRST_RANGE = 5;
+  private static final int MAX_SECOND_RANGE = 15;
+  private static final int MAX_THIRD_RANGE = 25;
+
   /**
    * Initializes a new REST operation.
    *
@@ -202,6 +216,13 @@ String getSasToken() {
             || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
     this.sasToken = sasToken;
     this.abfsCounters = client.getAbfsCounters();
+    if (abfsCounters != null) {
+      this.abfsBackoffMetrics = abfsCounters.getAbfsBackoffMetrics();
+    }
+    if (abfsBackoffMetrics != null) {
+      this.metricsMap = abfsBackoffMetrics.getMetricsMap();
+    }
+    this.maxIoRetries = abfsConfiguration.getMaxIoRetries();
     this.intercept = client.getIntercept();
     this.abfsConfiguration = abfsConfiguration;
     this.retryPolicy = client.getExponentialRetryPolicy();
@@ -236,7 +257,6 @@ String getSasToken() {
     this.buffer = buffer;
     this.bufferOffset = bufferOffset;
     this.bufferLength = bufferLength;
-    this.abfsCounters = client.getAbfsCounters();
   }
 
   /**
@@ -246,11 +266,14 @@ String getSasToken() {
    */
   public void execute(TracingContext tracingContext)
       throws AzureBlobFileSystemException {
-
     // Since this might be a sub-sequential or parallel rest operation
     // triggered by a single file system call, using a new tracing context.
     lastUsedTracingContext = createNewTracingContext(tracingContext);
     try {
+      if (abfsCounters != null) {
+        abfsCounters.getLastExecutionTime().set(now());
+      }
+      client.timerOrchestrator(TimerFunctionality.RESUME, null);
       IOStatisticsBinding.trackDurationOfInvocation(abfsCounters,
           AbfsStatistic.getStatNameFromHttpCall(method),
           () -> completeExecute(lastUsedTracingContext));
@@ -281,6 +304,12 @@ void completeExecute(TracingContext tracingContext)
     retryCount = 0;
     retryPolicy = client.getExponentialRetryPolicy();
     LOG.debug("First execution of REST operation - {}", operationType);
+    long sleepDuration = 0L;
+    if (abfsBackoffMetrics != null) {
+      synchronized (this) {
+        abfsBackoffMetrics.incrementTotalNumberOfRequests();
+      }
+    }
     while (!executeHttpOperation(retryCount, tracingContext)) {
       try {
         ++retryCount;
@@ -288,12 +317,17 @@ void completeExecute(TracingContext tracingContext)
         long retryInterval = retryPolicy.getRetryInterval(retryCount);
         LOG.debug("Rest operation {} failed with failureReason: {}. Retrying 
with retryCount = {}, retryPolicy: {} and sleepInterval: {}",
             operationType, failureReason, retryCount, 
retryPolicy.getAbbreviation(), retryInterval);
+        if (abfsBackoffMetrics != null) {
+          updateBackoffTimeMetrics(retryCount, sleepDuration);
+        }
         Thread.sleep(retryInterval);
       } catch (InterruptedException ex) {
         Thread.currentThread().interrupt();
       }
     }
-
+    if (abfsBackoffMetrics != null) {
+      updateBackoffMetrics(retryCount, result.getStatusCode());
+    }
     int status = result.getStatusCode();
     /*
       If even after exhausting all retries, the http status code has an
@@ -312,6 +346,30 @@ void completeExecute(TracingContext tracingContext)
     LOG.trace("{} REST operation complete", operationType);
   }
 
+  @VisibleForTesting
+  void updateBackoffMetrics(int retryCount, int statusCode) {
+    if (abfsBackoffMetrics != null) {
+      if (statusCode < HttpURLConnection.HTTP_OK
+              || statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) {
+        synchronized (this) {
+          if (retryCount >= maxIoRetries) {
+            abfsBackoffMetrics.incrementNumberOfRequestsFailed();
+          }
+        }
+      } else {
+        synchronized (this) {
+          if (retryCount > ZERO && retryCount <= maxIoRetries) {
+            maxRetryCount = Math.max(abfsBackoffMetrics.getMaxRetryCount(), retryCount);
+            abfsBackoffMetrics.setMaxRetryCount(maxRetryCount);
+            updateCount(retryCount);
+          } else {
+            abfsBackoffMetrics.incrementNumberOfRequestsSucceededWithoutRetrying();
+          }
+        }
+      }
+    }
+  }
+
   @VisibleForTesting
   String getClientLatency() {
     return client.getAbfsPerfTracker().getClientLatency();
@@ -355,7 +413,35 @@ private boolean executeHttpOperation(final int retryCount,
         }
       }
       httpOperation.processResponse(buffer, bufferOffset, bufferLength);
-      incrementCounter(AbfsStatistic.GET_RESPONSES, 1);
+      if (!isThrottledRequest && httpOperation.getStatusCode()
+          >= HttpURLConnection.HTTP_INTERNAL_ERROR) {
+        isThrottledRequest = true;
+        AzureServiceErrorCode serviceErrorCode =
+            AzureServiceErrorCode.getAzureServiceCode(
+                httpOperation.getStatusCode(),
+                httpOperation.getStorageErrorCode(),
+                httpOperation.getStorageErrorMessage());
+        LOG1.trace("Service code is " + serviceErrorCode + " status code is "
+            + httpOperation.getStatusCode() + " error code is "
+            + httpOperation.getStorageErrorCode()
+            + " error message is " + httpOperation.getStorageErrorMessage());
+        if (abfsBackoffMetrics != null) {
+          synchronized (this) {
+            if (serviceErrorCode.equals(
+                    AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT)
+                    || serviceErrorCode.equals(
+                    AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT)) {
+              abfsBackoffMetrics.incrementNumberOfBandwidthThrottledRequests();
+            } else if (serviceErrorCode.equals(
+                    AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT)) {
+              abfsBackoffMetrics.incrementNumberOfIOPSThrottledRequests();
+            } else {
+              abfsBackoffMetrics.incrementNumberOfOtherThrottledRequests();
+            }
+          }
+        }
+      }
+      incrementCounter(AbfsStatistic.GET_RESPONSES, 1);
       //Only increment bytesReceived counter when the status code is 2XX.
       if (httpOperation.getStatusCode() >= HttpURLConnection.HTTP_OK
           && httpOperation.getStatusCode() <= HttpURLConnection.HTTP_PARTIAL) {
@@ -394,7 +480,13 @@ private boolean executeHttpOperation(final int retryCount,
       if (httpOperation instanceof AbfsAHCHttpOperation) {
         registerApacheHttpClientIoException();
       }
+      if (abfsBackoffMetrics != null) {
+        synchronized (this) {
+          abfsBackoffMetrics.incrementNumberOfNetworkFailedRequests();
+        }
+      }
       if (!retryPolicy.shouldRetry(retryCount, -1)) {
+        updateBackoffMetrics(retryCount, httpOperation.getStatusCode());
         throw new InvalidAbfsRestOperationException(ex, retryCount);
       }
       return false;
@@ -403,7 +495,11 @@ private boolean executeHttpOperation(final int retryCount,
       if (LOG.isDebugEnabled()) {
         LOG.debug("HttpRequestFailure: {}, {}", httpOperation, ex);
       }
-
+      if (abfsBackoffMetrics != null) {
+        synchronized (this) {
+          abfsBackoffMetrics.incrementNumberOfNetworkFailedRequests();
+        }
+      }
       failureReason = RetryReason.getAbbreviation(ex, -1, "");
       retryPolicy = client.getRetryPolicy(failureReason);
       if (httpOperation instanceof AbfsAHCHttpOperation) {
@@ -414,9 +510,9 @@ private boolean executeHttpOperation(final int retryCount,
         }
       }
       if (!retryPolicy.shouldRetry(retryCount, -1)) {
+        updateBackoffMetrics(retryCount, httpOperation.getStatusCode());
         throw new InvalidAbfsRestOperationException(ex, retryCount);
       }
-
       return false;
     } finally {
       int statusCode = httpOperation.getStatusCode();
@@ -450,7 +546,10 @@ private void registerApacheHttpClientIoException() {
    */
   @VisibleForTesting
   public void signRequest(final AbfsHttpOperation httpOperation, int bytesToSign) throws IOException {
-    switch (client.getAuthType()) {
+    if (client.isSendMetricCall()) {
+      client.getMetricSharedkeyCredentials().signRequest(httpOperation, bytesToSign);
+    } else {
+      switch (client.getAuthType()) {
       case Custom:
       case OAuth:
         LOG.debug("Authenticating request with OAuth2 access token");
@@ -470,6 +569,7 @@ public void signRequest(final AbfsHttpOperation httpOperation, int bytesToSign)
             httpOperation,
             bytesToSign);
         break;
+      }
     }
   }
 
@@ -519,6 +619,60 @@ private void incrementCounter(AbfsStatistic statistic, long value) {
     }
   }
 
+  /**
+   * Updates the count metrics based on the provided retry count.
+   * @param retryCount The retry count used to determine the metrics category.
+   *
+   * This method increments the number of succeeded requests for the specified retry count.
+   */
+  private void updateCount(int retryCount) {
+    String retryCounter = getKey(retryCount);
+    metricsMap.get(retryCounter).incrementNumberOfRequestsSucceeded();
+  }
+
+  /**
+   * Updates backoff time metrics based on the provided retry count and sleep duration.
+   * @param retryCount    The retry count used to determine the metrics category.
+   * @param sleepDuration The duration of sleep during backoff.
+   *
+   * This method calculates and updates various backoff time metrics, including minimum, maximum,
+   * and total backoff time, as well as the total number of requests for the specified retry count.
+   */
+  private void updateBackoffTimeMetrics(int retryCount, long sleepDuration) {
+    synchronized (this) {
+      String retryCounter = getKey(retryCount);
+      AbfsBackoffMetrics abfsBackoffMetrics = metricsMap.get(retryCounter);
+      long minBackoffTime = Math.min(abfsBackoffMetrics.getMinBackoff(), sleepDuration);
+      long maxBackoffForTime = Math.max(abfsBackoffMetrics.getMaxBackoff(), sleepDuration);
+      long totalBackoffTime = abfsBackoffMetrics.getTotalBackoff() + sleepDuration;
+      abfsBackoffMetrics.incrementTotalRequests();
+      abfsBackoffMetrics.setMinBackoff(minBackoffTime);
+      abfsBackoffMetrics.setMaxBackoff(maxBackoffForTime);
+      abfsBackoffMetrics.setTotalBackoff(totalBackoffTime);
+      metricsMap.put(retryCounter, abfsBackoffMetrics);
+    }
+  }
+
+  /**
+   * Generates a key based on the provided retry count to categorize metrics.
+   *
+   * @param retryCount The retry count used to determine the key.
+   * @return A string key representing the metrics category for the given retry count.
+   *
+   * This method categorizes retry counts into different ranges and assigns a corresponding key.
+   */
+  private String getKey(int retryCount) {
+    if (retryCount >= MIN_FIRST_RANGE && retryCount < MAX_FIRST_RANGE) {
+      return Integer.toString(retryCount);
+    } else if (retryCount >= MAX_FIRST_RANGE && retryCount < MAX_SECOND_RANGE) {
+      return "5_15";
+    } else if (retryCount >= MAX_SECOND_RANGE && retryCount < MAX_THIRD_RANGE) {
+      return "15_25";
+    } else {
+      return "25AndAbove";
+    }
+  }
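
The bucketing is easiest to read off from sample inputs. A standalone replica of the ranges above, for illustration:

```java
// Standalone replica of the retry-count bucketing, with the same
// boundaries (1, 5, 15, 25) as the constants above.
static String keyFor(int retryCount) {
  if (retryCount >= 1 && retryCount < 5) {
    return Integer.toString(retryCount);  // each count in 1..4 is its own bucket
  } else if (retryCount < 15) {
    return "5_15";
  } else if (retryCount < 25) {
    return "15_25";
  }
  return "25AndAbove";
}
// keyFor(1) -> "1", keyFor(7) -> "5_15", keyFor(20) -> "15_25", keyFor(30) -> "25AndAbove"
```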
+
   /**
    * Updating Client Side Throttling Metrics for relevant response status codes.
    * Following criteria is used to decide based on status code and failure reason.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java
index bf7da69ec49..ca94c7f86ba 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+/**
+ * Enum for timer functionality (resume and suspend).
+ */
 public enum TimerFunctionality {
   RESUME,
-
   SUSPEND
 }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/MetricFormat.java
similarity index 59%
copy from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java
copy to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/MetricFormat.java
index bf7da69ec49..48c216ff6e5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/MetricFormat.java
@@ -15,12 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.hadoop.fs.azurebfs.utils;
 
-package org.apache.hadoop.fs.azurebfs.services;
+public enum MetricFormat {
+  INTERNAL_BACKOFF_METRIC_FORMAT, // <client-correlation-id>:<client-req-id>:<filesystem-id>
+  // :<backoff-metric-results>
 
-public enum TimerFunctionality {
-  RESUME,
+  INTERNAL_FOOTER_METRIC_FORMAT,  // <client-correlation-id>:<client-req-id>:<filesystem-id>
+  // :<footer-metric-results>
 
-  SUSPEND
-}
+  INTERNAL_METRIC_FORMAT, // <client-correlation-id>:<client-req-id>:<filesystem-id>
+  // :<backoff-metric-results>:<footer-metric-results>
+
+  EMPTY;
 
+  @Override
+  public String toString() {
+    return this == EMPTY ? "" : this.name();
+  }
+}
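
A minimal sketch of selecting a format from client code, mirroring what the new integration tests do via FS_AZURE_METRIC_FORMAT (the backing key is `fs.azure.metric.format`, documented in abfs.md below); this assumes fs.defaultFS already points at an ABFS URI:

```java
// Sketch: choose a metric format before instantiating the filesystem.
// EMPTY.toString() returns "", so the default adds no metric header.
Configuration conf = new Configuration();
conf.set("fs.azure.metric.format",
    String.valueOf(MetricFormat.INTERNAL_FOOTER_METRIC_FORMAT));
FileSystem fs = FileSystem.newInstance(conf);
```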
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
index a9729945835..a349894cd4f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
@@ -63,6 +63,8 @@ public class TracingContext {
   private Listener listener = null;  // null except when testing
   //final concatenated ID list set into x-ms-client-request-id header
   private String header = EMPTY_STRING;
+  private String metricResults = EMPTY_STRING;
+  private String metricHeader = EMPTY_STRING;
 
   /**
    * If {@link #primaryRequestId} is null, this field shall be set equal
@@ -112,6 +114,15 @@ public TracingContext(String clientCorrelationID, String fileSystemID,
     }
   }
 
+  public TracingContext(String clientCorrelationID, String fileSystemID,
+      FSOperationType opType, boolean needsPrimaryReqId,
+      TracingHeaderFormat tracingHeaderFormat, Listener listener, String metricResults) {
+    this(clientCorrelationID, fileSystemID, opType, needsPrimaryReqId, tracingHeaderFormat,
+        listener);
+    this.metricResults = metricResults;
+  }
+
+
   public TracingContext(TracingContext originalTracingContext) {
     this.fileSystemID = originalTracingContext.fileSystemID;
     this.streamID = originalTracingContext.streamID;
@@ -123,8 +134,8 @@ public TracingContext(TracingContext originalTracingContext) {
     if (originalTracingContext.listener != null) {
       this.listener = originalTracingContext.listener.getClone();
     }
+    this.metricResults = originalTracingContext.metricResults;
   }
-
   public static String validateClientCorrelationID(String clientCorrelationID) {
     if ((clientCorrelationID.length() > MAX_CLIENT_CORRELATION_ID_LENGTH)
         || (!clientCorrelationID.matches(CLIENT_CORRELATION_ID_PATTERN))) {
@@ -182,17 +193,24 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail
               + ":" + opType + ":" + retryCount;
       header = addFailureReasons(header, previousFailure, retryPolicyAbbreviation);
       header += (":" + httpOperation.getTracingContextSuffix());
+      metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : "";
       break;
     case TWO_ID_FORMAT:
       header = clientCorrelationID + ":" + clientRequestId;
+      metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : "";
       break;
     default:
-      header = clientRequestId; //case SINGLE_ID_FORMAT
+      //case SINGLE_ID_FORMAT
+      header = clientRequestId;
+      metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : "";
     }
     if (listener != null) { //for testing
       listener.callTracingHeaderValidator(header, format);
     }
     httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, header);
+    if (!metricHeader.equals(EMPTY_STRING)) {
+      httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_FECLIENT_METRICS, metricHeader);
+    }
     /*
    * In case the primaryRequestId is an empty-string and if it is the first try to
    * API call (previousFailure shall be null), maintain the last part of clientRequestId's
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 36d909623b9..fdf366f95d3 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -1237,6 +1237,49 @@ Note that these performance numbers are also sent back to the ADLS Gen 2 API end
 in the `x-ms-abfs-client-latency` HTTP headers in subsequent requests. Azure uses these
 settings to track their end-to-end latency.
 
+### <a name="drivermetricoptions"></a> Driver Metric Options
+
+Config `fs.azure.metric.format` provides an option to select the format of IDs included in the `header` for metrics.
+This config accepts a String value corresponding to the following enum options:
+`INTERNAL_METRIC_FORMAT` : backoff + footer metrics
+`INTERNAL_BACKOFF_METRIC_FORMAT` : backoff metrics
+`INTERNAL_FOOTER_METRIC_FORMAT` : footer metrics
+`EMPTY` : default
+
+`fs.azure.metric.account.name`: This configuration parameter is used to specify the name of the account which will be
+used to push the metrics to the backend. We can configure a separate account to push metrics to the store or use the
+same account as the one on which other requests are made.
+
+```xml
+
+<property>
+    <name>fs.azure.metric.account.name</name>
+    <value>METRICACCOUNTNAME.dfs.core.windows.net</value>
+</property>
+```
+
+`fs.azure.metric.account.key`: This is the access key for the storage account used for pushing metrics to the store.
+
+```xml
+
+<property>
+    <name>fs.azure.metric.account.key</name>
+    <value>ACCOUNTKEY</value>
+</property>
+```
+
+`fs.azure.metric.uri`: This configuration provides the URI in the format of 'https://`<accountname>`
+.dfs.core.windows.net/`<containername>`'. This should be a part of the config in order to prevent extra calls to create
+the filesystem. We use an existing filesystem to push the metrics.
+
+```xml
+
+<property>
+    <name>fs.azure.metric.uri</name>
+    <value>https://METRICACCOUNTNAME.dfs.core.windows.net/CONTAINERNAME</value>
+</property>
+```
+
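
Putting the four settings together, a hedged end-to-end sketch in Java, reusing the placeholder account, key, and container values from the XML snippets above:

```java
// Sketch only: all metric-related settings in one place.
Configuration conf = new Configuration();
conf.set("fs.azure.metric.format", "INTERNAL_METRIC_FORMAT");
conf.set("fs.azure.metric.account.name", "METRICACCOUNTNAME.dfs.core.windows.net");
conf.set("fs.azure.metric.account.key", "ACCOUNTKEY");
conf.set("fs.azure.metric.uri",
    "https://METRICACCOUNTNAME.dfs.core.windows.net/CONTAINERNAME");
```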
 ## <a name="troubleshooting"></a> Troubleshooting
 
 The problems associated with the connector usually come down to, in order
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
index e8cbeb12552..afc92c111a9 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -34,7 +34,6 @@
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.StoreStatisticNames;
 import org.apache.hadoop.io.IOUtils;
-
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadFooterMetrics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadFooterMetrics.java
new file mode 100644
index 00000000000..0071b90771c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadFooterMetrics.java
@@ -0,0 +1,385 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+
+public class ITestAbfsReadFooterMetrics extends AbstractAbfsScaleTest {
+
+  public ITestAbfsReadFooterMetrics() throws Exception {
+  }
+
+  private static final String TEST_PATH = "/testfile";
+  private static final String SLEEP_PERIOD = "90000";
+
+  /**
+   * Integration test for reading footer metrics with both Parquet and non-Parquet reads.
+   */
+  @Test
+  public void testReadFooterMetricsWithParquetAndNonParquet() throws Exception {
+    testReadWriteAndSeek(8 * ONE_MB, DEFAULT_READ_BUFFER_SIZE, ONE_KB, 4 * ONE_KB);
+  }
+
+  /**
+   * Builds the raw Configuration with the footer metric format and the given buffer size.
+   *
+   * @param bufferSize Buffer size to set for write and read operations.
+   * @return Configuration used to create the filesystem.
+   */
+  private Configuration getConfiguration(int bufferSize) {
+    final Configuration configuration = getRawConfiguration();
+    configuration.set(FS_AZURE_METRIC_FORMAT, String.valueOf(MetricFormat.INTERNAL_FOOTER_METRIC_FORMAT));
+    configuration.setInt(AZURE_READ_BUFFER_SIZE, bufferSize);
+    configuration.setInt(AZURE_WRITE_BUFFER_SIZE, bufferSize);
+    return configuration;
+  }
+
+  /**
+   * Writes data to the specified file path in the AzureBlobFileSystem.
+   *
+   * @param fs      AzureBlobFileSystem instance.
+   * @param testPath Path to the file.
+   * @param data    Data to write to the file.
+   */
+  private void writeDataToFile(AzureBlobFileSystem fs, Path testPath, byte[] data) throws IOException {
+    FSDataOutputStream stream = fs.create(testPath);
+    try {
+      stream.write(data);
+    } finally {
+      stream.close();
+    }
+    IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
+  }
+
+  /**
+   * Asserts that the actual metrics obtained from the AzureBlobFileSystem match the expected metrics string.
+   *
+   * @param fs               AzureBlobFileSystem instance.
+   * @param expectedMetrics  Expected metrics string.
+   */
+  private void assertMetricsEquality(AzureBlobFileSystem fs, String expectedMetrics) {
+    AbfsReadFooterMetrics actualMetrics = fs.getAbfsClient().getAbfsCounters().getAbfsReadFooterMetrics();
+    assertNotNull("AbfsReadFooterMetrics is null", actualMetrics);
+    assertEquals("The computed metrics differ from the actual metrics", expectedMetrics, actualMetrics.toString());
+  }
+
+  /**
+   * Test for reading footer metrics with a non-Parquet file.
+   */
+  @Test
+  public void testReadFooterMetrics() throws Exception {
+    // Initialize AzureBlobFileSystem and set buffer size for configuration.
+    int bufferSize = MIN_BUFFER_SIZE;
+    Configuration configuration = getConfiguration(bufferSize);
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
+    AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+
+    // Generate random data to write to the test file.
+    final byte[] b = new byte[2 * bufferSize];
+    new Random().nextBytes(b);
+
+    // Set up the test file path.
+    Path testPath = path(TEST_PATH);
+
+    // Write random data to the test file.
+    writeDataToFile(fs, testPath, b);
+
+    // Initialize a buffer for reading data.
+    final byte[] readBuffer = new byte[2 * bufferSize];
+    int result;
+
+    // Initialize statistics source for logging.
+    IOStatisticsSource statisticsSource = null;
+
+    try (FSDataInputStream inputStream = fs.open(testPath)) {
+      // Register a listener for tracing header validation.
+      statisticsSource = inputStream;
+      ((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
+              new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
+                      fs.getFileSystemId(), FSOperationType.READ, true, 0,
+                      ((AbfsInputStream) inputStream.getWrappedStream())
+                              .getStreamID()));
+
+      // Perform the first read operation with seek.
+      inputStream.seek(bufferSize);
+      result = inputStream.read(readBuffer, bufferSize, bufferSize);
+      assertNotEquals(-1, result);
+
+      // To test tracingHeader for case with bypassReadAhead == true
+      inputStream.seek(0);
+      byte[] temp = new byte[5];
+      int t = inputStream.read(temp, 0, 1);
+
+      // Seek back to the beginning and perform another read operation.
+      inputStream.seek(0);
+      result = inputStream.read(readBuffer, 0, bufferSize);
+    }
+
+    // Log IO statistics at the INFO level.
+    IOStatisticsLogging.logIOStatisticsAtLevel(LOG,
+            IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
+
+    // Ensure data is read successfully and matches the written data.
+    assertNotEquals("data read in final read()", -1, result);
+    assertArrayEquals(readBuffer, b);
+
+    // Get non-Parquet metrics and assert metrics equality.
+    AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
+    String metrics = nonParquetMetrics.getReadFooterMetrics(nonParquetMetrics);
+    assertMetricsEquality(fs, metrics);
+
+    // Close the AzureBlobFileSystem.
+    fs.close();
+  }
+
+  /**
+   * Generates and returns an instance of AbfsReadFooterMetrics for non-Parquet files.
+   */
+  private AbfsReadFooterMetrics getNonParquetMetrics() {
+    AbfsReadFooterMetrics nonParquetMetrics = new AbfsReadFooterMetrics();
+    nonParquetMetrics.setIsParquetFile(false);
+    nonParquetMetrics.setSizeReadByFirstRead("16384.000_16384.000");
+    nonParquetMetrics.setOffsetDiffBetweenFirstAndSecondRead("1.000_16384.000");
+    nonParquetMetrics.setAvgFileLength(Double.parseDouble("32768.000"));
+    nonParquetMetrics.setAvgReadLenRequested(Double.parseDouble("16384.000"));
+    return nonParquetMetrics;
+  }
+
+  /**
+   * Generates and returns an instance of AbfsReadFooterMetrics for Parquet files.
+   */
+  private AbfsReadFooterMetrics getParquetMetrics() {
+    AbfsReadFooterMetrics parquetMetrics = new AbfsReadFooterMetrics();
+    parquetMetrics.setIsParquetFile(true);
+    parquetMetrics.setSizeReadByFirstRead("1024.000");
+    parquetMetrics.setOffsetDiffBetweenFirstAndSecondRead("4096.000");
+    parquetMetrics.setAvgFileLength(Double.parseDouble("8388608.000"));
+    parquetMetrics.setAvgReadLenRequested(0.000);
+    return parquetMetrics;
+  }
+
+  /**
+   * Test for reading, writing, and seeking with footer metrics.
+   *
+   * This method performs the integration test for reading, writing, and seeking operations
+   * with footer metrics. It creates an AzureBlobFileSystem, configures it, writes random data
+   * to a test file, performs read and seek operations, and checks the footer metrics for both
+   * Parquet and non-Parquet scenarios.
+   *
+   * @param fileSize Size of the test file.
+   * @param bufferSize Size of the buffer used for read and write operations.
+   * @param seek1 The position to seek to in the test file.
+   * @param seek2 Additional position to seek to in the test file (if not 0).
+   */
+  private void testReadWriteAndSeek(int fileSize, int bufferSize, Integer seek1, Integer seek2) throws Exception {
+    // Create an AzureBlobFileSystem instance.
+    Configuration configuration = getConfiguration(bufferSize);
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
+    AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+
+    // Generate random data to write to the test file.
+    final byte[] b = new byte[fileSize];
+    new Random().nextBytes(b);
+
+    // Define the path for the test file.
+    Path testPath = path("/testfile");
+
+    // Write the random data to the test file.
+    writeDataToFile(fs, testPath, b);
+
+    // Initialize a buffer for reading.
+    final byte[] readBuffer = new byte[fileSize];
+
+    // Initialize a source for IO statistics.
+    IOStatisticsSource statisticsSource = null;
+
+    // Open an input stream for the test file.
+    FSDataInputStream inputStream = fs.open(testPath);
+    statisticsSource = inputStream;
+
+    // Register a listener for tracing headers.
+    ((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
+            new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
+                    fs.getFileSystemId(), FSOperationType.READ, true, 0,
+                    ((AbfsInputStream) inputStream.getWrappedStream())
+                            .getStreamID()));
+
+    // Seek to the specified position in the test file and read data.
+    inputStream.seek(fileSize - seek1);
+    inputStream.read(readBuffer, 0, seek1);
+
+    // If seek2 is non-zero, perform an additional seek and read.
+    if (seek2 != 0) {
+      inputStream.seek(fileSize - seek1 - seek2);
+      inputStream.read(readBuffer, 0, seek2);
+    }
+
+    // Close the input stream.
+    inputStream.close();
+
+    // Set a new buffer size for read and write operations.
+    int bufferSize1 = MIN_BUFFER_SIZE;
+    abfsConfiguration.setWriteBufferSize(bufferSize1);
+    abfsConfiguration.setReadBufferSize(bufferSize1);
+
+    // Generate new random data for a second test file.
+    final byte[] b1 = new byte[2 * bufferSize1];
+    new Random().nextBytes(b1);
+
+    // Define the path for the second test file.
+    Path testPath1 = path("/testfile1");
+
+    // Write the new random data to the second test file.
+    writeDataToFile(fs, testPath1, b1);
+
+    // Initialize a buffer for reading from the second test file.
+    final byte[] readBuffer1 = new byte[2 * bufferSize1];
+
+    // Open an input stream for the second test file.
+    FSDataInputStream inputStream1 = fs.open(testPath1);
+    statisticsSource = inputStream1;
+
+    // Register a listener for tracing headers.
+    ((AbfsInputStream) inputStream1.getWrappedStream()).registerListener(
+            new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
+                    fs.getFileSystemId(), FSOperationType.READ, true, 0,
+                    ((AbfsInputStream) inputStream1.getWrappedStream())
+                            .getStreamID()));
+
+    // Seek to a position in the second test file and read data.
+    inputStream1.seek(bufferSize1);
+    inputStream1.read(readBuffer1, bufferSize1, bufferSize1);
+
+    // To test tracingHeader for case with bypassReadAhead == true.
+    inputStream1.seek(0);
+    byte[] temp = new byte[5];
+    int t = inputStream1.read(temp, 0, 1);
+
+    // Seek to the beginning of the second test file and read data.
+    inputStream1.seek(0);
+    inputStream1.read(readBuffer1, 0, bufferSize1);
+
+    // Close the input stream for the second test file.
+    inputStream1.close();
+
+    // Get footer metrics for both Parquet and non-Parquet scenarios.
+    AbfsReadFooterMetrics parquetMetrics = getParquetMetrics();
+    AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
+
+    // Concatenate and assert the metrics equality.
+    String metrics = parquetMetrics.getReadFooterMetrics(parquetMetrics);
+    metrics += nonParquetMetrics.getReadFooterMetrics(nonParquetMetrics);
+    assertMetricsEquality(fs, metrics);
+
+    // Close the AzureBlobFileSystem instance.
+    fs.close();
+  }
+
+  /**
+   * Test for reading footer metrics with an idle period.
+   *
+   * This method tests reading footer metrics with an idle period. It creates an AzureBlobFileSystem,
+   * configures it, writes random data to a test file, performs read operations, introduces an idle
+   * period, and checks the footer metrics for non-Parquet scenarios.
+   *
+   */
+  @Test
+  public void testMetricWithIdlePeriod() throws Exception {
+    // Set the buffer size for the test.
+    int bufferSize = MIN_BUFFER_SIZE;
+    Configuration configuration = getConfiguration(bufferSize);
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
+    AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+
+    // Generate random data to write to the test file.
+    final byte[] b = new byte[2 * bufferSize];
+    new Random().nextBytes(b);
+
+    // Define the path for the test file.
+    Path testPath = path(TEST_PATH);
+
+    // Write the random data to the test file.
+    writeDataToFile(fs, testPath, b);
+
+    // Initialize a buffer for reading.
+    final byte[] readBuffer = new byte[2 * bufferSize];
+
+    // Initialize a source for IO statistics.
+    IOStatisticsSource statisticsSource = null;
+
+    // Open an input stream for the test file.
+    try (FSDataInputStream inputStream = fs.open(testPath)) {
+      // Register a listener for tracing headers.
+      ((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
+              new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
+                      fs.getFileSystemId(), FSOperationType.READ, true, 0,
+                      ((AbfsInputStream) inputStream.getWrappedStream())
+                              .getStreamID()));
+
+      // Seek to the specified position in the test file and read data.
+      inputStream.seek(bufferSize);
+      inputStream.read(readBuffer, bufferSize, bufferSize);
+
+      // Introduce an idle period by sleeping.
+      int sleepPeriod = Integer.parseInt(SLEEP_PERIOD);
+      Thread.sleep(sleepPeriod);
+
+      // Exercise the tracing header for the bypassReadAhead == true case.
+      inputStream.seek(0);
+      byte[] temp = new byte[5];
+      int t = inputStream.read(temp, 0, 1);
+
+      // Seek to the beginning of the test file and read data.
+      inputStream.seek(0);
+      inputStream.read(readBuffer, 0, bufferSize);
+
+      // Get and assert the footer metrics for non-Parquet scenarios.
+      AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
+      String metrics = nonParquetMetrics.getReadFooterMetrics(nonParquetMetrics);
+      assertMetricsEquality(fs, metrics);
+
+      // Introduce an additional idle period by sleeping.
+      Thread.sleep(sleepPeriod);
+    }
+  }
+}
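
For context, the access pattern these tests exercise -- a small read at the tail
of the file followed by sequential reads from the start -- is what the footer
metrics classify as Parquet-like. A minimal sketch of that pattern against any
Hadoop FileSystem (the path and footer size below are illustrative, not part of
the patch):

    // Sketch: footer-first read pattern tracked by AbfsReadFooterMetrics.
    // Assumes an already-configured FileSystem "fs"; names are illustrative.
    Path file = new Path("/data/sample.parquet");
    long len = fs.getFileStatus(file).getLen();
    byte[] footer = new byte[8];
    try (FSDataInputStream in = fs.open(file)) {
      in.seek(len - footer.length);   // jump to the file tail first...
      in.readFully(footer);           // ...read a small footer block...
      in.seek(0);                     // ...then come back for the data pages
    }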
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
index a4392f37284..29eb05ef978 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
@@ -82,8 +82,8 @@ public ITestAzureBlobFileSystemListStatus() throws Exception {
   public void testListPath() throws Exception {
     Configuration config = new Configuration(this.getRawConfiguration());
     config.set(AZURE_LIST_MAX_RESULTS, "5000");
-    try (final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
-        .newInstance(getFileSystem().getUri(), config)) {
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
+        .newInstance(getFileSystem().getUri(), config);
       final List<Future<Void>> tasks = new ArrayList<>();
 
       ExecutorService es = Executors.newFixedThreadPool(10);
@@ -110,7 +110,10 @@ public Void call() throws Exception {
                      fs.getFileSystemId(), FSOperationType.LISTSTATUS, true, 0));
       FileStatus[] files = fs.listStatus(new Path("/"));
       assertEquals(TEST_FILES_NUMBER, files.length /* user directory */);
-    }
+    fs.registerListener(
+            new TracingHeaderValidator(getConfiguration().getClientCorrelationId(),
+                    fs.getFileSystemId(), FSOperationType.GET_ATTR, true, 0));
+    fs.close();
   }
 
   /**
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
index 2f537b4442e..a153d9f3027 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -27,6 +29,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
 import org.assertj.core.api.Assertions;
 import org.mockito.AdditionalMatchers;
 import org.mockito.Mockito;
@@ -112,9 +115,13 @@ public static void addGeneralMockBehaviourToRestOpAndHttpOp(final AbfsRestOperat
   public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClient,
                                                          final ExponentialRetryPolicy exponentialRetryPolicy,
                                                          final StaticRetryPolicy staticRetryPolicy,
-                                                         final AbfsThrottlingIntercept intercept) throws IOException {
+                                                         final AbfsThrottlingIntercept intercept) throws IOException, URISyntaxException {
     Mockito.doReturn(OAuth).when(abfsClient).getAuthType();
     Mockito.doReturn("").when(abfsClient).getAccessToken();
+    AbfsConfiguration abfsConfiguration = Mockito.mock(AbfsConfiguration.class);
+    Mockito.doReturn(abfsConfiguration).when(abfsClient).getAbfsConfiguration();
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
+    Mockito.doReturn(abfsCounters).when(abfsClient).getAbfsCounters();
 
     Mockito.doReturn(intercept).when(abfsClient).getIntercept();
     Mockito.doNothing()
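
The pattern above -- a fully mocked AbfsClient handing out a Mockito spy wrapped
around a real AbfsCountersImpl -- keeps the metric arithmetic real while the
client behaviour stays stubbed. The same idiom in isolation, with hypothetical
names (Service/Counters are placeholders, not types from this patch):

    // Mocked service, spied real collaborator (hypothetical names).
    Service service = Mockito.mock(Service.class);          // behaviour fully stubbed
    Counters counters = Mockito.spy(new CountersImpl());    // real logic, observable
    Mockito.doReturn(counters).when(service).getCounters(); // the mock hands out the spy
    // Code under test that calls service.getCounters() now updates real state,
    // and interactions stay verifiable, e.g. Mockito.verify(counters).increment("x");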
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index d32627d679c..7a05bd4129d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -28,7 +28,6 @@
 import java.util.Random;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
 import org.assertj.core.api.Assertions;
 import org.junit.Assume;
 import org.junit.Test;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index 9027e56c9cd..e4ed9881ffa 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -19,11 +19,14 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
 import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Test;
@@ -98,9 +101,11 @@ private AbfsRestOperation getMockRestOp() {
     return op;
   }
 
-  private AbfsClient getMockAbfsClient() {
+  private AbfsClient getMockAbfsClient() throws URISyntaxException {
     // Mock failure for client.read()
     AbfsClient client = mock(AbfsClient.class);
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
+    Mockito.doReturn(abfsCounters).when(client).getAbfsCounters();
     AbfsPerfTracker tracker = new AbfsPerfTracker(
         "test",
         this.getAccountName(),
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java
new file mode 100644
index 00000000000..1c6ee6e7dc9
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
+import org.junit.Test;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.DeletePath;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.junit.Assert;
+import java.net.HttpURLConnection;
+
+public class TestAbfsRestOperation extends
+    AbstractAbfsIntegrationTest {
+
+  public TestAbfsRestOperation() throws Exception {
+  }
+
+  /**
+   * Test for backoff retry metrics.
+   *
+   * This method tests backoff retry metrics by creating an AzureBlobFileSystem, initializing an
+   * AbfsClient, and performing mock operations on an AbfsRestOperation. The method then updates
+   * backoff metrics using the AbfsRestOperation.
+   *
+   */
+  @Test
+  public void testBackoffRetryMetrics() throws Exception {
+    // Create an AzureBlobFileSystem instance.
+    final Configuration configuration = getRawConfiguration();
+    configuration.set(FS_AZURE_METRIC_FORMAT, String.valueOf(MetricFormat.INTERNAL_BACKOFF_METRIC_FORMAT));
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
+    AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+
+    // Get an instance of AbfsClient and AbfsRestOperation.
+    AbfsClient testClient = super.getAbfsClient(super.getAbfsStore(fs));
+    AbfsRestOperation op = ITestAbfsClient.getRestOp(
+        DeletePath, testClient, HTTP_METHOD_DELETE,
+        ITestAbfsClient.getTestUrl(testClient, "/NonExistingPath"),
+        ITestAbfsClient.getTestRequestHeaders(testClient), getConfiguration());
+
+    // Mock retry counts and status code.
+    ArrayList<String> retryCounts = new ArrayList<>(Arrays.asList("35", "28", "31", "45", "10", "2", "9"));
+    int statusCode = HttpURLConnection.HTTP_UNAVAILABLE;
+
+    // Update backoff metrics.
+    for (String retryCount : retryCounts) {
+      op.updateBackoffMetrics(Integer.parseInt(retryCount), statusCode);
+    }
+
+    // For retry count greater than the max configured value, the request should fail.
+    Assert.assertEquals("Number of failed requests does not match expected value.",
+            "3", String.valueOf(testClient.getAbfsCounters().getAbfsBackoffMetrics().getNumberOfRequestsFailed()));
+
+    // Close the AzureBlobFileSystem.
+    fs.close();
+  }
+
+}
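
The expected value of "3" follows from the retry limit: assuming the default
ABFS maximum retry count of 30 (fs.azure.io.retry.max.retries), only the mocked
retry counts above that limit -- 35, 31 and 45 -- register as failed requests.
A quick sketch of that arithmetic (the limit of 30 is an assumption here, not
read from the patch):

    // Sketch: counting which mocked retry values exceed an assumed limit of 30.
    int maxRetries = 30;
    long failed = java.util.stream.Stream
        .of("35", "28", "31", "45", "10", "2", "9")
        .mapToInt(Integer::parseInt)
        .filter(retries -> retries > maxRetries)  // 35, 31 and 45 exceed it
        .count();                                 // -> 3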
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
index 70cafc45478..8ee3a71f358 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
@@ -23,7 +23,6 @@
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -314,7 +313,7 @@ private void testClientRequestIdForStatusRetry(int status,
     int[] statusCount = new int[1];
     statusCount[0] = 0;
     Mockito.doAnswer(answer -> {
-      if (statusCount[0] <= 5) {
+      if (statusCount[0] <= 10) {
         statusCount[0]++;
         return status;
       }
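
The threshold change from 5 to 10 simply keeps the mocked connection returning
the failing status for more consecutive attempts before recovering, in line with
the higher retry counts exercised elsewhere in this patch. The counting-answer
idiom in isolation, against a hypothetical mock (httpOperation is a placeholder):

    // Sketch: return 503 for the first ten calls, then 200 (hypothetical mock).
    int[] calls = {0};
    Mockito.doAnswer(invocation -> {
      if (calls[0]++ < 10) {
        return HttpURLConnection.HTTP_UNAVAILABLE;  // keep the retry loop going
      }
      return HttpURLConnection.HTTP_OK;             // then let the request succeed
    }).when(httpOperation).getStatusCode();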

