This is an automated email from the ASF dual-hosted git repository.

mthakur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1336c362e5b6 Hadoop-18759: [ABFS][Backoff-Optimization] Have a Static retry policy for connection timeout. (#5881)
1336c362e5b6 is described below

commit 1336c362e5b685c020b667e308f97f7bdb5b6fd8
Author: Anuj Modi <128447756+anujmodi2...@users.noreply.github.com>
AuthorDate: Tue Feb 20 09:31:42 2024 -0800

    Hadoop-18759: [ABFS][Backoff-Optimization] Have a Static retry policy for connection timeout. (#5881)
    
    
    Contributed By: Anuj Modi
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  32 +++++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   3 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  13 ++
 .../constants/FileSystemConfigurations.java        |  21 ++-
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |  27 +++-
 .../fs/azurebfs/services/AbfsClientContext.java    |   8 ++
 .../services/AbfsClientContextBuilder.java         |  12 +-
 .../fs/azurebfs/services/AbfsHttpOperation.java    |  14 +-
 .../fs/azurebfs/services/AbfsRestOperation.java    |  65 ++++++---
 .../fs/azurebfs/services/AbfsRetryPolicy.java      |  98 +++++++++++++
 .../azurebfs/services/ExponentialRetryPolicy.java  |  44 +-----
 .../fs/azurebfs/services/RetryPolicyConstants.java |  35 +++++
 .../fs/azurebfs/services/StaticRetryPolicy.java    |  52 +++++++
 .../hadoop/fs/azurebfs/utils/TracingContext.java   |  14 +-
 .../fs/azurebfs/ITestAzureBlobFileSystemE2E.java   |  47 +++++++
 .../ITestAzureBlobFileSystemListStatus.java        |   2 +-
 .../TestAbfsConfigurationFieldsValidation.java     |  94 ++++++++-----
 .../hadoop/fs/azurebfs/TestTracingContext.java     |  77 ++++++++++-
 .../extensions/MockDelegationSASTokenProvider.java |   6 +-
 .../fs/azurebfs/services/AbfsClientTestUtil.java   |  61 +++++++--
 .../fs/azurebfs/services/ITestAbfsClient.java      |   8 +-
 .../azurebfs/services/ITestAbfsRestOperation.java  |   5 +-
 ...olicy.java => ITestExponentialRetryPolicy.java} |  44 +++---
 .../azurebfs/services/ITestStaticRetryPolicy.java  | 142 +++++++++++++++++++
 .../fs/azurebfs/services/TestAbfsPerfTracker.java  |  29 ++--
 .../services/TestAbfsRenameRetryRecovery.java      |  11 +-
 .../TestAbfsRestOperationMockFailures.java         | 152 ++++++++++++++++++---
 .../services/TestAzureADAuthenticator.java         |   4 +-
 28 files changed, 935 insertions(+), 185 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index eff8c0860544..1216fe0696f7 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -152,6 +152,14 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_MAX_BACKOFF_INTERVAL)
   private int maxBackoffInterval;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = 
AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED,
+      DefaultValue = DEFAULT_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED)
+  private boolean staticRetryForConnectionTimeoutEnabled;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
AZURE_STATIC_RETRY_INTERVAL,
+      DefaultValue = DEFAULT_STATIC_RETRY_INTERVAL)
+  private int staticRetryInterval;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
AZURE_BACKOFF_INTERVAL,
       DefaultValue = DEFAULT_BACKOFF_INTERVAL)
   private int backoffInterval;
@@ -166,6 +174,14 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT)
   private int customTokenFetchRetryCount;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
AZURE_HTTP_CONNECTION_TIMEOUT,
+          DefaultValue = DEFAULT_HTTP_CONNECTION_TIMEOUT)
+  private int httpConnectionTimeout;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
AZURE_HTTP_READ_TIMEOUT,
+          DefaultValue = DEFAULT_HTTP_READ_TIMEOUT)
+  private int httpReadTimeout;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT,
       MinValue = 0,
       DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
@@ -669,6 +685,14 @@ public class AbfsConfiguration{
     return this.maxBackoffInterval;
   }
 
+  public boolean getStaticRetryForConnectionTimeoutEnabled() {
+    return staticRetryForConnectionTimeoutEnabled;
+  }
+
+  public int getStaticRetryInterval() {
+    return staticRetryInterval;
+  }
+
   public int getBackoffIntervalMilliseconds() {
     return this.backoffInterval;
   }
@@ -681,6 +705,14 @@ public class AbfsConfiguration{
     return this.customTokenFetchRetryCount;
   }
 
+  public int getHttpConnectionTimeout() {
+    return this.httpConnectionTimeout;
+  }
+
+  public int getHttpReadTimeout() {
+    return this.httpReadTimeout;
+  }
+
   public long getAzureBlockSize() {
     return this.azureBlockSize;
   }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index d9693dd7e1cd..8ece527e56a8 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -118,6 +118,7 @@ import 
org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
+import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
 import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
 import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
 import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
@@ -1781,6 +1782,8 @@ public class AzureBlobFileSystemStore implements 
Closeable, ListingSupport {
     return new AbfsClientContextBuilder()
         .withExponentialRetryPolicy(
             new ExponentialRetryPolicy(abfsConfiguration))
+        .withStaticRetryPolicy(
+            new StaticRetryPolicy(abfsConfiguration))
         .withAbfsCounters(abfsCounters)
         .withAbfsPerfTracker(abfsPerfTracker)
         .build();
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index a27c7570265d..af60ce949f50 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -48,10 +48,23 @@ public final class ConfigurationKeys {
   // Retry strategy defined by the user
   public static final String AZURE_MIN_BACKOFF_INTERVAL = 
"fs.azure.io.retry.min.backoff.interval";
   public static final String AZURE_MAX_BACKOFF_INTERVAL = 
"fs.azure.io.retry.max.backoff.interval";
+  public static final String AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED 
= "fs.azure.static.retry.for.connection.timeout.enabled";
+  public static final String AZURE_STATIC_RETRY_INTERVAL = 
"fs.azure.static.retry.interval";
   public static final String AZURE_BACKOFF_INTERVAL = 
"fs.azure.io.retry.backoff.interval";
   public static final String AZURE_MAX_IO_RETRIES = 
"fs.azure.io.retry.max.retries";
   public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 
"fs.azure.custom.token.fetch.retry.count";
 
+  /**
+   * Config to set HTTP Connection Timeout Value for Rest Operations.
+   * Value: {@value}.
+   */
+  public static final String AZURE_HTTP_CONNECTION_TIMEOUT = 
"fs.azure.http.connection.timeout";
+  /**
+   * Config to set HTTP Read Timeout Value for Rest Operations.
+   * Value: {@value}.
+   */
+  public static final String AZURE_HTTP_READ_TIMEOUT = 
"fs.azure.http.read.timeout";
+
   //  Retry strategy for getToken calls
   public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = 
"fs.azure.oauth.token.fetch.retry.max.retries";
   public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF = 
"fs.azure.oauth.token.fetch.retry.min.backoff.interval";
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index b3825b4c53ec..331c9e5684f7 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -35,15 +35,28 @@ public final class FileSystemConfigurations {
   public static final boolean 
DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
   public static final String USER_HOME_DIRECTORY_PREFIX = "/user";
 
-  private static final int SIXTY_SECONDS = 60 * 1000;
+  private static final int SIXTY_SECONDS = 60_000;
 
   // Retry parameter defaults.
-  public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3 * 1000;  // 3s
-  public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000;  // 30s
-  public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000;  // 3s
+  public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3_000;  // 3s
+  public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30_000;  // 30s
+  public static final boolean 
DEFAULT_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED = true;
+  public static final int DEFAULT_STATIC_RETRY_INTERVAL = 1_000; // 1s
+  public static final int DEFAULT_BACKOFF_INTERVAL = 3_000;  // 3s
   public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
   public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3;
 
+  /**
+   * Default value of connection timeout to be used while setting up HTTP 
Connection.
+   * Value: {@value}.
+   */
+  public static final int DEFAULT_HTTP_CONNECTION_TIMEOUT = 2_000; // 2s
+  /**
+   * Default value of read timeout to be used while setting up HTTP Connection.
+   * Value: {@value}.
+   */
+  public static final int DEFAULT_HTTP_READ_TIMEOUT = 30_000; // 30 secs
+
   // Retry parameter defaults.
   public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 
5;
   public static final int 
DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0;
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 8eeb548f500b..cb6f8e9eadc7 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -82,6 +82,7 @@ import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
 
 /**
  * AbfsClient.
@@ -93,7 +94,8 @@ public class AbfsClient implements Closeable {
   private final URL baseUrl;
   private final SharedKeyCredentials sharedKeyCredentials;
   private String xMsVersion = DECEMBER_2019_API_VERSION;
-  private final ExponentialRetryPolicy retryPolicy;
+  private final ExponentialRetryPolicy exponentialRetryPolicy;
+  private final StaticRetryPolicy staticRetryPolicy;
   private final String filesystem;
   private final AbfsConfiguration abfsConfiguration;
   private final String userAgent;
@@ -131,7 +133,8 @@ public class AbfsClient implements Closeable {
     String baseUrlString = baseUrl.toString();
     this.filesystem = 
baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
     this.abfsConfiguration = abfsConfiguration;
-    this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
+    this.exponentialRetryPolicy = 
abfsClientContext.getExponentialRetryPolicy();
+    this.staticRetryPolicy = abfsClientContext.getStaticRetryPolicy();
     this.accountName = abfsConfiguration.getAccountName().substring(0, 
abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
     this.authType = abfsConfiguration.getAuthType(accountName);
     this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, 
abfsConfiguration);
@@ -213,8 +216,24 @@ public class AbfsClient implements Closeable {
     return abfsPerfTracker;
   }
 
-  ExponentialRetryPolicy getRetryPolicy() {
-    return retryPolicy;
+  ExponentialRetryPolicy getExponentialRetryPolicy() {
+    return exponentialRetryPolicy;
+  }
+
+  StaticRetryPolicy getStaticRetryPolicy() {
+    return staticRetryPolicy;
+  }
+
+  /**
+   * Returns the retry policy to be used for Abfs Rest Operation Failure.
+   * @param failureReason helps to decide which type of retryPolicy to be used.
+   * @return retry policy to be used.
+   */
+  public AbfsRetryPolicy getRetryPolicy(final String failureReason) {
+    return CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason)
+        && getAbfsConfiguration().getStaticRetryForConnectionTimeoutEnabled()
+        ? getStaticRetryPolicy()
+        : getExponentialRetryPolicy();
   }
 
   SharedKeyCredentials getSharedKeyCredentials() {
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java
index ad20550af7c3..0a5182a69914 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java
@@ -25,14 +25,18 @@ package org.apache.hadoop.fs.azurebfs.services;
 public class AbfsClientContext {
 
   private final ExponentialRetryPolicy exponentialRetryPolicy;
+  private final StaticRetryPolicy staticRetryPolicy;
   private final AbfsPerfTracker abfsPerfTracker;
   private final AbfsCounters abfsCounters;
 
   AbfsClientContext(
       ExponentialRetryPolicy exponentialRetryPolicy,
+      StaticRetryPolicy staticRetryPolicy,
       AbfsPerfTracker abfsPerfTracker,
       AbfsCounters abfsCounters) {
     this.exponentialRetryPolicy = exponentialRetryPolicy;
+
+    this.staticRetryPolicy = staticRetryPolicy;
     this.abfsPerfTracker = abfsPerfTracker;
     this.abfsCounters = abfsCounters;
   }
@@ -41,6 +45,10 @@ public class AbfsClientContext {
     return exponentialRetryPolicy;
   }
 
+  public StaticRetryPolicy getStaticRetryPolicy() {
+    return staticRetryPolicy;
+  }
+
   public AbfsPerfTracker getAbfsPerfTracker() {
     return abfsPerfTracker;
   }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java
index 00513f7138d5..ca16de6d4f9f 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java
@@ -25,6 +25,7 @@ package org.apache.hadoop.fs.azurebfs.services;
 public class AbfsClientContextBuilder {
 
   private ExponentialRetryPolicy exponentialRetryPolicy;
+  private StaticRetryPolicy staticRetryPolicy;
   private AbfsPerfTracker abfsPerfTracker;
   private AbfsCounters abfsCounters;
 
@@ -34,6 +35,12 @@ public class AbfsClientContextBuilder {
     return this;
   }
 
+  public AbfsClientContextBuilder withStaticRetryPolicy(
+      final StaticRetryPolicy staticRetryPolicy) {
+    this.staticRetryPolicy = staticRetryPolicy;
+    return this;
+  }
+
   public AbfsClientContextBuilder withAbfsPerfTracker(
       final AbfsPerfTracker abfsPerfTracker) {
     this.abfsPerfTracker = abfsPerfTracker;
@@ -52,7 +59,10 @@ public class AbfsClientContextBuilder {
    */
   public AbfsClientContext build() {
     //validate the values
-    return new AbfsClientContext(exponentialRetryPolicy, abfsPerfTracker,
+    return new AbfsClientContext(
+        exponentialRetryPolicy,
+        staticRetryPolicy,
+        abfsPerfTracker,
         abfsCounters);
   }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
index c0b554f60702..a29eed6f4251 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -55,9 +55,6 @@ import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.E
 public class AbfsHttpOperation implements AbfsPerfLoggable {
   private static final Logger LOG = 
LoggerFactory.getLogger(AbfsHttpOperation.class);
 
-  private static final int CONNECT_TIMEOUT = 30 * 1000;
-  private static final int READ_TIMEOUT = 30 * 1000;
-
   private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024;
 
   private static final int ONE_THOUSAND = 1000;
@@ -263,10 +260,12 @@ public class AbfsHttpOperation implements 
AbfsPerfLoggable {
    * @param url The full URL including query string parameters.
    * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
    * @param requestHeaders The HTTP request headers.READ_TIMEOUT
-   *
+   * @param connectionTimeout The Connection Timeout value to be used while 
establishing http connection
+   * @param readTimeout The Read Timeout value to be used with http connection 
while making a request
    * @throws IOException if an error occurs.
    */
-  public AbfsHttpOperation(final URL url, final String method, final 
List<AbfsHttpHeader> requestHeaders)
+  public AbfsHttpOperation(final URL url, final String method, final 
List<AbfsHttpHeader> requestHeaders,
+                           final int connectionTimeout, final int readTimeout)
       throws IOException {
     this.url = url;
     this.method = method;
@@ -280,9 +279,8 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
       }
     }
 
-    this.connection.setConnectTimeout(CONNECT_TIMEOUT);
-    this.connection.setReadTimeout(READ_TIMEOUT);
-
+    this.connection.setConnectTimeout(connectionTimeout);
+    this.connection.setReadTimeout(readTimeout);
     this.connection.setRequestMethod(method);
 
     for (AbfsHttpHeader header : requestHeaders) {
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index f40cd2cea81e..e901196bcc2e 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
 
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
 
 /**
  * The AbfsRestOperation for Rest AbfsClient.
@@ -81,6 +82,7 @@ public class AbfsRestOperation {
    * AbfsRestOperation object.
    */
   private String failureReason;
+  private AbfsRetryPolicy retryPolicy;
 
   /**
    * This variable stores the tracing context used for last Rest Operation.
@@ -162,6 +164,7 @@ public class AbfsRestOperation {
     this.sasToken = sasToken;
     this.abfsCounters = client.getAbfsCounters();
     this.intercept = client.getIntercept();
+    this.retryPolicy = client.getExponentialRetryPolicy();
   }
 
   /**
@@ -232,15 +235,18 @@ public class AbfsRestOperation {
       requestHeaders.add(httpHeader);
     }
 
+    // Start with the exponential retry policy; a failure may switch it.
     retryCount = 0;
+    retryPolicy = client.getExponentialRetryPolicy();
     LOG.debug("First execution of REST operation - {}", operationType);
     while (!executeHttpOperation(retryCount, tracingContext)) {
       try {
         ++retryCount;
         tracingContext.setRetryCount(retryCount);
-        LOG.debug("Retrying REST operation {}. RetryCount = {}",
-            operationType, retryCount);
-        Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
+        long retryInterval = retryPolicy.getRetryInterval(retryCount);
+        LOG.debug("Rest operation {} failed with failureReason: {}. Retrying 
with retryCount = {}, retryPolicy: {} and sleepInterval: {}",
+            operationType, failureReason, retryCount, 
retryPolicy.getAbbreviation(), retryInterval);
+        Thread.sleep(retryInterval);
       } catch (InterruptedException ex) {
         Thread.currentThread().interrupt();
       }
@@ -277,12 +283,13 @@ public class AbfsRestOperation {
   private boolean executeHttpOperation(final int retryCount,
     TracingContext tracingContext) throws AzureBlobFileSystemException {
     AbfsHttpOperation httpOperation;
+    boolean wasIOExceptionThrown = false;
 
     try {
       // initialize the HTTP request and open the connection
       httpOperation = createHttpOperation();
       incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1);
-      tracingContext.constructHeader(httpOperation, failureReason);
+      tracingContext.constructHeader(httpOperation, failureReason, 
retryPolicy.getAbbreviation());
 
       signRequest(httpOperation, hasRequestBody ? bufferLength : 0);
 
@@ -318,9 +325,10 @@ public class AbfsRestOperation {
       String hostname = null;
       hostname = httpOperation.getHost();
       failureReason = RetryReason.getAbbreviation(ex, null, null);
+      retryPolicy = client.getRetryPolicy(failureReason);
       LOG.warn("Unknown host name: {}. Retrying to resolve the host name...",
           hostname);
-      if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
+      if (!retryPolicy.shouldRetry(retryCount, -1)) {
         throw new InvalidAbfsRestOperationException(ex, retryCount);
       }
       return false;
@@ -330,8 +338,9 @@ public class AbfsRestOperation {
       }
 
       failureReason = RetryReason.getAbbreviation(ex, -1, "");
-
-      if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
+      retryPolicy = client.getRetryPolicy(failureReason);
+      wasIOExceptionThrown = true;
+      if (!retryPolicy.shouldRetry(retryCount, -1)) {
         throw new InvalidAbfsRestOperationException(ex, retryCount);
       }
 
@@ -339,25 +348,37 @@ public class AbfsRestOperation {
     } finally {
       int status = httpOperation.getStatusCode();
       /*
-        A status less than 300 (2xx range) or greater than or equal
-        to 500 (5xx range) should contribute to throttling metrics being 
updated.
-        Less than 200 or greater than or equal to 500 show failed operations. 
2xx
-        range contributes to successful operations. 3xx range is for redirects
-        and 4xx range is for user errors. These should not be a part of
-        throttling backoff computation.
+       A status less than 300 (2xx range) or greater than or equal
+       to 500 (5xx range) should contribute to throttling metrics being 
updated.
+       Less than 200 or greater than or equal to 500 show failed operations. 
2xx
+       range contributes to successful operations. 3xx range is for redirects
+       and 4xx range is for user errors. These should not be a part of
+       throttling backoff computation.
        */
       boolean updateMetricsResponseCode = (status < 
HttpURLConnection.HTTP_MULT_CHOICE
               || status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
-      if (updateMetricsResponseCode) {
+
+      /*
+       Connection Timeout failures should not contribute to throttling.
+       If the current request fails with Connection Timeout, we will have
+       wasIOExceptionThrown as true and failureReason as CT.
+       If the current request fails with 5xx, failureReason will be updated
+       after the finally block, but wasIOExceptionThrown will be false.
+       */
+      boolean isCTFailure = 
CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason) && wasIOExceptionThrown;
+
+      if (updateMetricsResponseCode && !isCTFailure) {
         intercept.updateMetrics(operationType, httpOperation);
       }
     }
 
     LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);
 
-    if (client.getRetryPolicy().shouldRetry(retryCount, 
httpOperation.getStatusCode())) {
-      int status = httpOperation.getStatusCode();
-      failureReason = RetryReason.getAbbreviation(null, status, 
httpOperation.getStorageErrorMessage());
+    int status = httpOperation.getStatusCode();
+    failureReason = RetryReason.getAbbreviation(null, status, 
httpOperation.getStorageErrorMessage());
+    retryPolicy = client.getRetryPolicy(failureReason);
+
+    if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) {
       return false;
     }
 
@@ -398,12 +419,16 @@ public class AbfsRestOperation {
   }
 
   /**
-   * Creates new object of {@link AbfsHttpOperation} with the url, method, and
-   * requestHeaders fields of the AbfsRestOperation object.
+   * Creates new object of {@link AbfsHttpOperation} with the url, method, 
requestHeader fields and
+   * timeout values as set in configuration of the AbfsRestOperation object.
+   *
+   * @return {@link AbfsHttpOperation} to be used for sending requests
    */
   @VisibleForTesting
   AbfsHttpOperation createHttpOperation() throws IOException {
-    return new AbfsHttpOperation(url, method, requestHeaders);
+    return new AbfsHttpOperation(url, method, requestHeaders,
+            client.getAbfsConfiguration().getHttpConnectionTimeout(),
+            client.getAbfsConfiguration().getHttpReadTimeout());
   }
 
   /**
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java
new file mode 100644
index 000000000000..ffddd341ac21
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.net.HttpURLConnection;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
+
+/**
+ * Abstract class for the retry policies used by {@link AbfsClient}.
+ * The implementation to use is chosen based on the retry cause.
+ */
+public abstract class AbfsRetryPolicy {
+
+  /**
+   * The maximum number of retry attempts.
+   */
+  private final int maxRetryCount;
+
+  /**
+   * Retry Policy Abbreviation for logging purpose.
+   */
+  private final String retryPolicyAbbreviation;
+
+  protected AbfsRetryPolicy(final int maxRetryCount, final String 
retryPolicyAbbreviation) {
+    this.maxRetryCount = maxRetryCount;
+    this.retryPolicyAbbreviation = retryPolicyAbbreviation;
+  }
+
+  /**
+   * Returns if a request should be retried based on the retry count, current 
response,
+   * and the current strategy. The valid http status code lies in the range of 
1xx-5xx.
+   * But an invalid status code might be set due to network or timeout kind of 
issues.
+   * Such invalid status code also qualify for retry.
+   *
+   * @param retryCount The current retry attempt count.
+   * @param statusCode The status code of the response, or -1 for socket error.
+   * @return true if the request should be retried; false otherwise.
+   */
+  public boolean shouldRetry(final int retryCount, final int statusCode) {
+    return retryCount < maxRetryCount
+        && (statusCode < HTTP_CONTINUE
+        || statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT
+        || (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR
+        && statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED
+        && statusCode != HttpURLConnection.HTTP_VERSION));
+  }
+
+  /**
+   * Returns the backoff interval to be used for a particular retry count.
+   * Child classes define how the retry interval is calculated.
+   *
+   * @param retryCount The current retry attempt count.
+   * @return the backoff interval in milliseconds.
+   */
+  public abstract long getRetryInterval(int retryCount);
+
+  /**
+   * Returns a String value of the abbreviation
+   * denoting which type of retry policy is used
+   * @return retry policy abbreviation
+   */
+  public String getAbbreviation() {
+    return retryPolicyAbbreviation;
+  }
+
+  /**
+   * Returns maximum number of retries allowed in this retry policy
+   * @return max retry count
+   */
+  protected int getMaxRetryCount() {
+    return maxRetryCount;
+  }
+
+  @Override
+  public String toString() {
+    return "AbfsRetryPolicy of subtype: "
+        + retryPolicyAbbreviation
+        + " and max retry count: "
+        + maxRetryCount;
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
index 227bdc5fc1c9..f1f2bc8be346 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java
@@ -19,17 +19,14 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.util.Random;
-import java.net.HttpURLConnection;
 
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.classification.VisibleForTesting;
 
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
-
 /**
  * Retry policy used by AbfsClient.
  * */
-public class ExponentialRetryPolicy {
+public class ExponentialRetryPolicy extends AbfsRetryPolicy {
   /**
    * Represents the default amount of time used when calculating a random 
delta in the exponential
    * delay between retries.
@@ -78,11 +75,6 @@ public class ExponentialRetryPolicy {
    */
   private final int minBackoff;
 
-  /**
-   * The maximum number of retry attempts.
-   */
-  private final int retryCount;
-
   /**
    * Initializes a new instance of the {@link ExponentialRetryPolicy} class.
    */
@@ -105,38 +97,19 @@ public class ExponentialRetryPolicy {
   /**
    * Initializes a new instance of the {@link ExponentialRetryPolicy} class.
    *
-   * @param retryCount The maximum number of retry attempts.
+   * @param maxRetryCount The maximum number of retry attempts.
    * @param minBackoff The minimum backoff time.
    * @param maxBackoff The maximum backoff time.
    * @param deltaBackoff The value that will be used to calculate a random 
delta in the exponential delay
    *                     between retries.
    */
-  public ExponentialRetryPolicy(final int retryCount, final int minBackoff, 
final int maxBackoff, final int deltaBackoff) {
-    this.retryCount = retryCount;
+  public ExponentialRetryPolicy(final int maxRetryCount, final int minBackoff, 
final int maxBackoff, final int deltaBackoff) {
+    super(maxRetryCount, 
RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
     this.minBackoff = minBackoff;
     this.maxBackoff = maxBackoff;
     this.deltaBackoff = deltaBackoff;
   }
 
-  /**
-   * Returns if a request should be retried based on the retry count, current 
response,
-   * and the current strategy. The valid http status code lies in the range of 
1xx-5xx.
-   * But an invalid status code might be set due to network or timeout kind of 
issues.
-   * Such invalid status code also qualify for retry.
-   *
-   * @param retryCount The current retry attempt count.
-   * @param statusCode The status code of the response, or -1 for socket error.
-   * @return true if the request should be retried; false otherwise.
-   */
-  public boolean shouldRetry(final int retryCount, final int statusCode) {
-    return retryCount < this.retryCount
-        && (statusCode < HTTP_CONTINUE
-        || statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT
-        || (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR
-            && statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED
-            && statusCode != HttpURLConnection.HTTP_VERSION));
-  }
-
   /**
    * Returns backoff interval between 80% and 120% of the desired backoff,
    * multiply by 2^n-1 for exponential.
@@ -144,6 +117,7 @@ public class ExponentialRetryPolicy {
    * @param retryCount The current retry attempt count.
    * @return backoff Interval time
    */
+  @Override
   public long getRetryInterval(final int retryCount) {
     final long boundedRandDelta = (int) (this.deltaBackoff * MIN_RANDOM_RATIO)
         + this.randRef.nextInt((int) (this.deltaBackoff * MAX_RANDOM_RATIO)
@@ -151,16 +125,12 @@ public class ExponentialRetryPolicy {
 
     final double incrementDelta = (Math.pow(2, retryCount - 1)) * 
boundedRandDelta;
 
-    final long retryInterval = (int) Math.round(Math.min(this.minBackoff + 
incrementDelta, maxBackoff));
+    final long retryInterval = (int) Math.round(Math.min(
+            this.minBackoff + incrementDelta, maxBackoff));
 
     return retryInterval;
   }
 
-  @VisibleForTesting
-  int getRetryCount() {
-    return this.retryCount;
-  }
-
   @VisibleForTesting
   int getMinBackoff() {
     return this.minBackoff;
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryPolicyConstants.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryPolicyConstants.java
new file mode 100644
index 000000000000..d2df032e717c
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryPolicyConstants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+public final class RetryPolicyConstants {
+
+  private RetryPolicyConstants() {
+
+  }
+
+  /**
+   * Constant for Exponential Retry Policy Abbreviation. {@value}
+   */
+  public static final String EXPONENTIAL_RETRY_POLICY_ABBREVIATION= "E";
+  /**
+   * Constant for Static Retry Policy Abbreviation. {@value}
+   */
+  public static final String STATIC_RETRY_POLICY_ABBREVIATION = "S";
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/StaticRetryPolicy.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/StaticRetryPolicy.java
new file mode 100644
index 000000000000..7f6eb5f4dd54
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/StaticRetryPolicy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+
+/**
+ * Fixed-interval retry policy used by AbfsClient for network errors.
+ */
+public class StaticRetryPolicy extends AbfsRetryPolicy {
+
+  /**
+   * The constant retry interval returned for every retry attempt.
+   */
+  private final int retryInterval;
+
+  /**
+   * Initializes a new instance of the {@link StaticRetryPolicy} class.
+   * @param conf The {@link AbfsConfiguration} supplying the retry settings.
+   */
+  public StaticRetryPolicy(AbfsConfiguration conf) {
+    super(conf.getMaxIoRetries(), 
RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION);
+    this.retryInterval = conf.getStaticRetryInterval();
+  }
+
+  /**
+   * Returns a constant backoff interval independent of retry count.
+   *
+   * @param retryCount The current retry attempt count (ignored by this policy).
+   * @return backoff Interval time
+   */
+  @Override
+  public long getRetryInterval(final int retryCount) {
+    return retryInterval;
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
index 97864e61e0be..3c54c204dda9 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
 
 /**
  * The TracingContext class to correlate Store requests using unique
@@ -66,7 +67,7 @@ public class TracingContext {
   /**
    * If {@link #primaryRequestId} is null, this field shall be set equal
    * to the last part of the {@link #clientRequestId}'s UUID
-   * in {@link #constructHeader(AbfsHttpOperation, String)} only on the
+   * in {@link #constructHeader(AbfsHttpOperation, String, String)} only on the
    * first API call for an operation. Subsequent retries for that operation
    * will not change this field. In case {@link  #primaryRequestId} is 
non-null,
    * this field shall not be set.
@@ -168,8 +169,10 @@ public class TracingContext {
    *                      connection
    * @param previousFailure Failure seen before this API trigger on same 
operation
    * from AbfsClient.
+   * @param retryPolicyAbbreviation Abbreviation of the retry policy used to
+   * get the retry interval before this API trigger from AbfsClient
    */
-  public void constructHeader(AbfsHttpOperation httpOperation, String 
previousFailure) {
+  public void constructHeader(AbfsHttpOperation httpOperation, String 
previousFailure, String retryPolicyAbbreviation) {
     clientRequestId = UUID.randomUUID().toString();
     switch (format) {
     case ALL_ID_FORMAT: // Optional IDs (e.g. streamId) may be empty
@@ -177,7 +180,7 @@ public class TracingContext {
           clientCorrelationID + ":" + clientRequestId + ":" + fileSystemID + 
":"
               + getPrimaryRequestIdForHeader(retryCount > 0) + ":" + streamID
               + ":" + opType + ":" + retryCount;
-      header = addFailureReasons(header, previousFailure);
+      header = addFailureReasons(header, previousFailure, 
retryPolicyAbbreviation);
       break;
     case TWO_ID_FORMAT:
       header = clientCorrelationID + ":" + clientRequestId;
@@ -217,10 +220,13 @@ public class TracingContext {
   }
 
   private String addFailureReasons(final String header,
-      final String previousFailure) {
+      final String previousFailure, String retryPolicyAbbreviation) {
     if (previousFailure == null) {
       return header;
     }
+    if (CONNECTION_TIMEOUT_ABBREVIATION.equals(previousFailure) && 
retryPolicyAbbreviation != null) {
+      return String.format("%s_%s_%s", header, previousFailure, 
retryPolicyAbbreviation);
+    }
     return String.format("%s_%s", header, previousFailure);
   }
 
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
index 56016a39470e..f1673a3b38b4 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
@@ -23,6 +23,9 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +35,9 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_HTTP_CONNECTION_TIMEOUT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_HTTP_READ_TIMEOUT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND;
 import static 
org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
@@ -45,6 +51,9 @@ public class ITestAzureBlobFileSystemE2E extends 
AbstractAbfsIntegrationTest {
   private static final int TEST_OFFSET = 100;
   private static final int TEST_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
   private static final int TEST_DEFAULT_READ_BUFFER_SIZE = 1023900;
+  private static final int TEST_STABLE_DEFAULT_CONNECTION_TIMEOUT_MS = 500;
+  private static final int TEST_STABLE_DEFAULT_READ_TIMEOUT_MS = 30000;
+  private static final int TEST_UNSTABLE_READ_TIMEOUT_MS = 1;
 
   public ITestAzureBlobFileSystemE2E() throws Exception {
     super();
@@ -229,4 +238,42 @@ public class ITestAzureBlobFileSystemE2E extends 
AbstractAbfsIntegrationTest {
     FileStatus fileStatus = fs.getFileStatus(testFilePath);
     assertEquals(1, fileStatus.getLen());
   }
+
+  @Test
+  public void testHttpConnectionTimeout() throws Exception {
+    // No connection failures were seen even with a 1 ms connection timeout.
+    // On repeated TPCDS runs with the cluster and account in the same
+    // region, 10 ms was seen to be stable.
+    // 500 ms was seen to be stable for cross-region runs.
+    testHttpTimeouts(TEST_STABLE_DEFAULT_CONNECTION_TIMEOUT_MS,
+            TEST_STABLE_DEFAULT_READ_TIMEOUT_MS);
+  }
+
+  @Test(expected = InvalidAbfsRestOperationException.class)
+  public void testHttpReadTimeout() throws Exception {
+    // A read timeout this small is bound to make the request fail.
+    testHttpTimeouts(TEST_STABLE_DEFAULT_CONNECTION_TIMEOUT_MS,
+            TEST_UNSTABLE_READ_TIMEOUT_MS);
+  }
+
+  public void testHttpTimeouts(int connectionTimeoutMs, int readTimeoutMs)
+          throws Exception {
+    Configuration conf = this.getRawConfiguration();
+    // Apply the connection and read timeouts chosen by the calling test
+    conf.setInt(AZURE_HTTP_CONNECTION_TIMEOUT, connectionTimeoutMs);
+    conf.setInt(AZURE_HTTP_READ_TIMEOUT, readTimeoutMs);
+    // Limit max IO retries to 1 to reduce test run time
+    conf.setInt(AZURE_MAX_IO_RETRIES, 1);
+    final AzureBlobFileSystem fs = getFileSystem(conf);
+    Assertions.assertThat(
+                    
fs.getAbfsStore().getAbfsConfiguration().getHttpConnectionTimeout())
+            .describedAs("HTTP connection time should be picked from config")
+            .isEqualTo(connectionTimeoutMs);
+    Assertions.assertThat(
+                    
fs.getAbfsStore().getAbfsConfiguration().getHttpReadTimeout())
+            .describedAs("HTTP Read time should be picked from config")
+            .isEqualTo(readTimeoutMs);
+    Path testPath = path(methodName.getMethodName());
+    ContractTestUtils.createFile(fs, testPath, false, new byte[0]);
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
index e7f57b8af54d..b374193e9bc9 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java
@@ -178,7 +178,7 @@ public class ITestAzureBlobFileSystemListStatus extends
         TEST_CONTINUATION_TOKEN, spiedTracingContext);
 
     // Assert that none of the API calls used the same tracing header.
-    Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), 
any());
+    Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), 
any(), any());
   }
 
   /**
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java
index f041f4bccdc8..0b7645bd243b 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java
@@ -34,21 +34,11 @@ import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException;
 import org.apache.hadoop.fs.azurebfs.utils.Base64;
 
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_RANGE;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
 
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 /**
@@ -118,15 +108,15 @@ public class TestAbfsConfigurationFieldsValidation {
     for (Field field : fields) {
       field.setAccessible(true);
       if 
(field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
-        assertEquals(TEST_INT, abfsConfiguration.validateInt(field));
+        
Assertions.assertThat(abfsConfiguration.validateInt(field)).isEqualTo(TEST_INT);
       } else if 
(field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) {
-        assertEquals(DEFAULT_LONG, abfsConfiguration.validateLong(field));
+        
Assertions.assertThat(abfsConfiguration.validateLong(field)).isEqualTo(DEFAULT_LONG);
       } else if 
(field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) {
-        assertEquals("stringValue", abfsConfiguration.validateString(field));
+        
Assertions.assertThat(abfsConfiguration.validateString(field)).isEqualTo("stringValue");
       } else if 
(field.isAnnotationPresent(Base64StringConfigurationValidatorAnnotation.class)) 
{
-        assertEquals(this.encodedString, 
abfsConfiguration.validateBase64String(field));
+        
Assertions.assertThat(abfsConfiguration.validateBase64String(field)).isEqualTo(this.encodedString);
       } else if 
(field.isAnnotationPresent(BooleanConfigurationValidatorAnnotation.class)) {
-        assertEquals(true, abfsConfiguration.validateBoolean(field));
+        
Assertions.assertThat(abfsConfiguration.validateBoolean(field)).isEqualTo(true);
       }
     }
   }
@@ -134,27 +124,54 @@ public class TestAbfsConfigurationFieldsValidation {
   @Test
   public void testConfigServiceImplAnnotatedFieldsInitialized() throws 
Exception {
     // test that all the ConfigurationServiceImpl annotated fields have been 
initialized in the constructor
-    assertEquals(DEFAULT_WRITE_BUFFER_SIZE, 
abfsConfiguration.getWriteBufferSize());
-    assertEquals(DEFAULT_READ_BUFFER_SIZE, 
abfsConfiguration.getReadBufferSize());
-    assertEquals(DEFAULT_MIN_BACKOFF_INTERVAL, 
abfsConfiguration.getMinBackoffIntervalMilliseconds());
-    assertEquals(DEFAULT_MAX_BACKOFF_INTERVAL, 
abfsConfiguration.getMaxBackoffIntervalMilliseconds());
-    assertEquals(DEFAULT_BACKOFF_INTERVAL, 
abfsConfiguration.getBackoffIntervalMilliseconds());
-    assertEquals(DEFAULT_MAX_RETRY_ATTEMPTS, 
abfsConfiguration.getMaxIoRetries());
-    assertEquals(MAX_AZURE_BLOCK_SIZE, abfsConfiguration.getAzureBlockSize());
-    assertEquals(AZURE_BLOCK_LOCATION_HOST_DEFAULT, 
abfsConfiguration.getAzureBlockLocationHost());
-    assertEquals(DEFAULT_READ_AHEAD_RANGE, 
abfsConfiguration.getReadAheadRange());
+    Assertions.assertThat(abfsConfiguration.getWriteBufferSize())
+            .describedAs("Default value of write buffer size should be 
initialized")
+            .isEqualTo(DEFAULT_WRITE_BUFFER_SIZE);
+    Assertions.assertThat(abfsConfiguration.getReadBufferSize())
+            .describedAs("Default value of read buffer size should be 
initialized")
+            .isEqualTo(DEFAULT_READ_BUFFER_SIZE);
+    
Assertions.assertThat(abfsConfiguration.getMinBackoffIntervalMilliseconds())
+            .describedAs("Default value of min backoff interval should be 
initialized")
+            .isEqualTo(DEFAULT_MIN_BACKOFF_INTERVAL);
+    
Assertions.assertThat(abfsConfiguration.getMaxBackoffIntervalMilliseconds())
+            .describedAs("Default value of max backoff interval should be 
initialized")
+            .isEqualTo(DEFAULT_MAX_BACKOFF_INTERVAL);
+    Assertions.assertThat(abfsConfiguration.getBackoffIntervalMilliseconds())
+            .describedAs("Default value of backoff interval should be 
initialized")
+            .isEqualTo(DEFAULT_BACKOFF_INTERVAL);
+    Assertions.assertThat(abfsConfiguration.getMaxIoRetries())
+            .describedAs("Default value of max number of retries should be 
initialized")
+            .isEqualTo(DEFAULT_MAX_RETRY_ATTEMPTS);
+    Assertions.assertThat(abfsConfiguration.getAzureBlockSize())
+            .describedAs("Default value of azure block size should be 
initialized")
+            .isEqualTo(MAX_AZURE_BLOCK_SIZE);
+    Assertions.assertThat(abfsConfiguration.getAzureBlockLocationHost())
+            .describedAs("Default value of azure block location host should be 
initialized")
+            .isEqualTo(AZURE_BLOCK_LOCATION_HOST_DEFAULT);
+    Assertions.assertThat(abfsConfiguration.getReadAheadRange())
+            .describedAs("Default value of read ahead range should be 
initialized")
+            .isEqualTo(DEFAULT_READ_AHEAD_RANGE);
+    Assertions.assertThat(abfsConfiguration.getHttpConnectionTimeout())
+            .describedAs("Default value of http connection timeout should be 
initialized")
+            .isEqualTo(DEFAULT_HTTP_CONNECTION_TIMEOUT);
+    Assertions.assertThat(abfsConfiguration.getHttpReadTimeout())
+            .describedAs("Default value of http read timeout should be 
initialized")
+            .isEqualTo(DEFAULT_HTTP_READ_TIMEOUT);
   }
 
   @Test
   public void testConfigBlockSizeInitialized() throws Exception {
     // test the block size annotated field has been initialized in the 
constructor
-    assertEquals(MAX_AZURE_BLOCK_SIZE, abfsConfiguration.getAzureBlockSize());
+    Assertions.assertThat(abfsConfiguration.getAzureBlockSize())
+            .describedAs("Default value of max azure block size should be 
initialized")
+            .isEqualTo(MAX_AZURE_BLOCK_SIZE);
   }
 
   @Test
   public void testGetAccountKey() throws Exception {
     String accountKey = abfsConfiguration.getStorageAccountKey();
-    assertEquals(this.encodedAccountKey, accountKey);
+    Assertions.assertThat(accountKey).describedAs("Account Key should be 
initialized in configs")
+            .isEqualTo(this.encodedAccountKey);
   }
 
   @Test(expected = KeyProviderException.class)
@@ -169,19 +186,28 @@ public class TestAbfsConfigurationFieldsValidation {
   @Test
   public void testSSLSocketFactoryConfiguration()
       throws InvalidConfigurationValueException, IllegalAccessException, 
IOException {
-    assertEquals(DelegatingSSLSocketFactory.SSLChannelMode.Default, 
abfsConfiguration.getPreferredSSLFactoryOption());
-    assertNotEquals(DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE, 
abfsConfiguration.getPreferredSSLFactoryOption());
-    assertNotEquals(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL, 
abfsConfiguration.getPreferredSSLFactoryOption());
-
+    Assertions.assertThat(abfsConfiguration.getPreferredSSLFactoryOption())
+            .describedAs("By default SSL Channel Mode should be Default")
+            .isEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.Default);
+    Assertions.assertThat(abfsConfiguration.getPreferredSSLFactoryOption())
+            .describedAs("By default SSL Channel Mode should be Default")
+            
.isNotEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE);
+    Assertions.assertThat(abfsConfiguration.getPreferredSSLFactoryOption())
+            .describedAs("By default SSL Channel Mode should be Default")
+            .isNotEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL);
     Configuration configuration = new Configuration();
     configuration.setEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, 
DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE);
     AbfsConfiguration localAbfsConfiguration = new 
AbfsConfiguration(configuration, accountName);
-    assertEquals(DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE, 
localAbfsConfiguration.getPreferredSSLFactoryOption());
+    
Assertions.assertThat(localAbfsConfiguration.getPreferredSSLFactoryOption())
+            .describedAs("SSL Channel Mode should be Default_JSSE as set")
+            .isEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE);
 
     configuration = new Configuration();
     configuration.setEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, 
DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL);
     localAbfsConfiguration = new AbfsConfiguration(configuration, accountName);
-    assertEquals(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL, 
localAbfsConfiguration.getPreferredSSLFactoryOption());
+    
Assertions.assertThat(localAbfsConfiguration.getPreferredSSLFactoryOption())
+            .describedAs("SSL Channel Mode should be OpenSSL as set")
+            .isEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL);
   }
 
   public static AbfsConfiguration updateRetryConfigs(AbfsConfiguration 
abfsConfig,
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
index 0a19a24de3ee..3ffa2bd49e42 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
@@ -51,6 +51,10 @@ import org.apache.hadoop.util.Preconditions;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLIENT_CORRELATIONID;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION;
 
 public class TestTracingContext extends AbstractAbfsIntegrationTest {
   private static final String[] CLIENT_CORRELATIONID_LIST = {
@@ -213,7 +217,7 @@ public class TestTracingContext extends 
AbstractAbfsIntegrationTest {
         0));
     AbfsHttpOperation abfsHttpOperation = 
Mockito.mock(AbfsHttpOperation.class);
     
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(),
 Mockito.anyString());
-    tracingContext.constructHeader(abfsHttpOperation, null);
+    tracingContext.constructHeader(abfsHttpOperation, null, 
EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
     String header = tracingContext.getHeader();
     String clientRequestIdUsed = header.split(":")[1];
     String[] clientRequestIdUsedParts = clientRequestIdUsed.split("-");
@@ -225,7 +229,7 @@ public class TestTracingContext extends 
AbstractAbfsIntegrationTest {
         fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false,
         1));
 
-    tracingContext.constructHeader(abfsHttpOperation, "RT");
+    tracingContext.constructHeader(abfsHttpOperation, 
READ_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
     header = tracingContext.getHeader();
     String primaryRequestId = header.split(":")[3];
 
@@ -250,7 +254,7 @@ public class TestTracingContext extends 
AbstractAbfsIntegrationTest {
     tracingContext.setPrimaryRequestID();
     AbfsHttpOperation abfsHttpOperation = 
Mockito.mock(AbfsHttpOperation.class);
     
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(),
 Mockito.anyString());
-    tracingContext.constructHeader(abfsHttpOperation, null);
+    tracingContext.constructHeader(abfsHttpOperation, null, 
EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
     String header = tracingContext.getHeader();
     String assertionPrimaryId = header.split(":")[3];
 
@@ -260,7 +264,7 @@ public class TestTracingContext extends 
AbstractAbfsIntegrationTest {
         fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false,
         1));
 
-    tracingContext.constructHeader(abfsHttpOperation, "RT");
+    tracingContext.constructHeader(abfsHttpOperation, 
READ_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
     header = tracingContext.getHeader();
     String primaryRequestId = header.split(":")[3];
 
@@ -269,4 +273,69 @@ public class TestTracingContext extends 
AbstractAbfsIntegrationTest {
             + "should be equal to PrimaryRequestId in the original request.")
         .isEqualTo(assertionPrimaryId);
   }
+
+  @Test
+  public void testTracingContextHeaderForRetrypolicy() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final String fileSystemId = fs.getFileSystemId();
+    final String clientCorrelationId = fs.getClientCorrelationId();
+    final TracingHeaderFormat tracingHeaderFormat = 
TracingHeaderFormat.ALL_ID_FORMAT;
+    TracingContext tracingContext = new TracingContext(clientCorrelationId,
+        fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, 
new TracingHeaderValidator(
+        fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
+        fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false,
+        0));
+    tracingContext.setPrimaryRequestID();
+    AbfsHttpOperation abfsHttpOperation = 
Mockito.mock(AbfsHttpOperation.class);
+    
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(),
 Mockito.anyString());
+
+    tracingContext.constructHeader(abfsHttpOperation, null, null);
+    checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), null, 
null);
+
+    tracingContext.constructHeader(abfsHttpOperation, null, 
STATIC_RETRY_POLICY_ABBREVIATION);
+    checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), null, 
null);
+
+    tracingContext.constructHeader(abfsHttpOperation, null, 
EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
+    checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), null, 
null);
+
+    tracingContext.constructHeader(abfsHttpOperation, 
CONNECTION_TIMEOUT_ABBREVIATION, null);
+    checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), 
CONNECTION_TIMEOUT_ABBREVIATION, null);
+
+    tracingContext.constructHeader(abfsHttpOperation, 
CONNECTION_TIMEOUT_ABBREVIATION, STATIC_RETRY_POLICY_ABBREVIATION);
+    checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), 
CONNECTION_TIMEOUT_ABBREVIATION, STATIC_RETRY_POLICY_ABBREVIATION);
+
+    tracingContext.constructHeader(abfsHttpOperation, 
CONNECTION_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
+    checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), 
CONNECTION_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
+
+    tracingContext.constructHeader(abfsHttpOperation, "503", null);
+    checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), "503", 
null);
+
+    tracingContext.constructHeader(abfsHttpOperation, "503", 
STATIC_RETRY_POLICY_ABBREVIATION);
+    checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), "503", 
null);
+
+    tracingContext.constructHeader(abfsHttpOperation, "503", 
EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
+    checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), "503", 
null);
+  }
+
+  private void checkHeaderForRetryPolicyAbbreviation(String header, String 
expectedFailureReason, String expectedRetryPolicyAbbreviation) {
+    String[] headerContents = header.split(":");
+    String previousReqContext = headerContents[6];
+
+    if (expectedFailureReason != null) {
+      Assertions.assertThat(previousReqContext.split("_")[1]).describedAs(
+          "Failure reason Is not as 
expected").isEqualTo(expectedFailureReason);
+      if (expectedRetryPolicyAbbreviation != null) {
+        Assertions.assertThat(previousReqContext.split("_")).describedAs(
+            "Retry Count, Failure Reason and Retry Policy should be 
present").hasSize(3);
+        Assertions.assertThat(previousReqContext.split("_")[2]).describedAs(
+            "Retry policy is not as 
expected").isEqualTo(expectedRetryPolicyAbbreviation);
+      } else {
+        Assertions.assertThat(previousReqContext.split("_")).describedAs(
+            "Retry Count and Failure Reason should be present").hasSize(2);
+      }
+    } else {
+      Assertions.assertThat(previousReqContext.split("_")).describedAs(
+          "Only Retry Count should be present").hasSize(1);
+    }
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
index cf7d51da4c44..00c681fdadde 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
@@ -39,6 +39,9 @@ import 
org.apache.hadoop.fs.azurebfs.utils.DelegationSASGenerator;
 import org.apache.hadoop.fs.azurebfs.utils.SASGenerator;
 import org.apache.hadoop.security.AccessControlException;
 
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;
+
 /**
  * A mock SAS token provider implementation
  */
@@ -103,7 +106,8 @@ public class MockDelegationSASTokenProvider implements 
SASTokenProvider {
     requestBody.append(ske);
     requestBody.append("</Expiry></KeyInfo>");
 
-    AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders);
+    AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders,
+            DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);
 
     byte[] requestBuffer = 
requestBody.toString().getBytes(StandardCharsets.UTF_8.toString());
     op.sendRequest(requestBuffer, 0, requestBuffer.length);
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
index 875682fe2032..b1b093d67063 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
@@ -26,7 +26,9 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.assertj.core.api.Assertions;
+import org.mockito.AdditionalMatchers;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -35,8 +37,12 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.util.functional.FunctionRaisingIOE;
 
 import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
 import static org.apache.hadoop.fs.azurebfs.services.AuthType.OAuth;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.nullable;
@@ -46,6 +52,7 @@ import static org.mockito.ArgumentMatchers.nullable;
  * objects which are protected inside services package.
  */
 public final class AbfsClientTestUtil {
+  private static final long ONE_SEC = 1000;
 
   private AbfsClientTestUtil() {
 
@@ -55,7 +62,9 @@ public final class AbfsClientTestUtil {
       final AbfsClient spiedClient,
       FunctionRaisingIOE<AbfsHttpOperation, AbfsHttpOperation> 
functionRaisingIOE)
       throws Exception {
-    ExponentialRetryPolicy retryPolicy = 
Mockito.mock(ExponentialRetryPolicy.class);
+    ExponentialRetryPolicy exponentialRetryPolicy = 
Mockito.mock(ExponentialRetryPolicy.class);
+    StaticRetryPolicy staticRetryPolicy = 
Mockito.mock(StaticRetryPolicy.class);
+    AbfsThrottlingIntercept intercept = 
Mockito.mock(AbfsThrottlingIntercept.class);
     AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class);
     AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
         AbfsRestOperationType.ListPaths,
@@ -68,7 +77,7 @@ public final class AbfsClientTestUtil {
     Mockito.doReturn(abfsRestOperation).when(spiedClient).getAbfsRestOperation(
         eq(AbfsRestOperationType.ListPaths), any(), any(), any());
 
-    addGeneralMockBehaviourToAbfsClient(spiedClient, retryPolicy);
+    addGeneralMockBehaviourToAbfsClient(spiedClient, exponentialRetryPolicy, 
staticRetryPolicy, intercept);
     addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);
 
     functionRaisingIOE.apply(httpOperation);
@@ -96,28 +105,54 @@ public final class AbfsClientTestUtil {
    * Adding general mock behaviour to AbfsClient to avoid any NPE occurring.
    * These will avoid any network call made and will return the relevant 
exception or return value directly.
    * @param abfsClient to be mocked
-   * @param retryPolicy to be mocked
+   * @param exponentialRetryPolicy to be mocked, used for all other failures
+   * @param staticRetryPolicy to be mocked, used for connection timeouts
    * @throws IOException
    */
   public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient 
abfsClient,
-                                                         final 
ExponentialRetryPolicy retryPolicy) throws IOException {
+                                                         final 
ExponentialRetryPolicy exponentialRetryPolicy,
+                                                         final 
StaticRetryPolicy staticRetryPolicy,
+                                                         final 
AbfsThrottlingIntercept intercept) throws IOException {
     Mockito.doReturn(OAuth).when(abfsClient).getAuthType();
     Mockito.doReturn("").when(abfsClient).getAccessToken();
-    AbfsThrottlingIntercept intercept = Mockito.mock(
-        AbfsThrottlingIntercept.class);
+
     Mockito.doReturn(intercept).when(abfsClient).getIntercept();
     Mockito.doNothing()
         .when(intercept)
         .sendingRequest(any(), nullable(AbfsCounters.class));
     Mockito.doNothing().when(intercept).updateMetrics(any(), any());
 
-    Mockito.doReturn(retryPolicy).when(abfsClient).getRetryPolicy();
-    Mockito.doReturn(true)
-        .when(retryPolicy)
-        .shouldRetry(nullable(Integer.class), nullable(Integer.class));
-    Mockito.doReturn(false).when(retryPolicy).shouldRetry(0, HTTP_OK);
-    Mockito.doReturn(false).when(retryPolicy).shouldRetry(1, HTTP_OK);
-    Mockito.doReturn(false).when(retryPolicy).shouldRetry(2, HTTP_OK);
+    // Returning correct retry policy based on failure reason
+    
Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getExponentialRetryPolicy();
+    
Mockito.doReturn(staticRetryPolicy).when(abfsClient).getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
+    Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getRetryPolicy(
+            AdditionalMatchers.not(eq(CONNECTION_TIMEOUT_ABBREVIATION)));
+
+    // Defining behavior of static retry policy
+    Mockito.doReturn(true).when(staticRetryPolicy)
+            .shouldRetry(nullable(Integer.class), nullable(Integer.class));
+    Mockito.doReturn(false).when(staticRetryPolicy).shouldRetry(1, HTTP_OK);
+    Mockito.doReturn(false).when(staticRetryPolicy).shouldRetry(2, HTTP_OK);
+    Mockito.doReturn(true).when(staticRetryPolicy).shouldRetry(1, 
HTTP_UNAVAILABLE);
+    // We want only two retries to occur
+    Mockito.doReturn(false).when(staticRetryPolicy).shouldRetry(2, 
HTTP_UNAVAILABLE);
+    
Mockito.doReturn(STATIC_RETRY_POLICY_ABBREVIATION).when(staticRetryPolicy).getAbbreviation();
+    
Mockito.doReturn(ONE_SEC).when(staticRetryPolicy).getRetryInterval(nullable(Integer.class));
+
+    // Defining behavior of exponential retry policy
+    Mockito.doReturn(true).when(exponentialRetryPolicy)
+            .shouldRetry(nullable(Integer.class), nullable(Integer.class));
+    Mockito.doReturn(false).when(exponentialRetryPolicy).shouldRetry(1, 
HTTP_OK);
+    Mockito.doReturn(false).when(exponentialRetryPolicy).shouldRetry(2, 
HTTP_OK);
+    Mockito.doReturn(true).when(exponentialRetryPolicy).shouldRetry(1, 
HTTP_UNAVAILABLE);
+    // We want only two retries to occur
+    Mockito.doReturn(false).when(exponentialRetryPolicy).shouldRetry(2, 
HTTP_UNAVAILABLE);
+    
Mockito.doReturn(EXPONENTIAL_RETRY_POLICY_ABBREVIATION).when(exponentialRetryPolicy).getAbbreviation();
+    Mockito.doReturn(2 * 
ONE_SEC).when(exponentialRetryPolicy).getRetryInterval(nullable(Integer.class));
+
+    AbfsConfiguration configurations = Mockito.mock(AbfsConfiguration.class);
+    Mockito.doReturn(configurations).when(abfsClient).getAbfsConfiguration();
+    
Mockito.doReturn(true).when(configurations).getStaticRetryForConnectionTimeoutEnabled();
   }
 
   public static void hookOnRestOpsForTracingContextSingularity(AbfsClient 
client) {
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index 08af6375a1ae..b031271a51f2 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -52,6 +52,7 @@ import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_1
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
@@ -77,7 +78,6 @@ import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SEMICOLO
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_NAME;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_TYPE;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_VALUE_UNKNOWN;
 import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
 
 /**
@@ -365,7 +365,9 @@ public final class ITestAbfsClient extends 
AbstractAbfsIntegrationTest {
 
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.getAuthType()).thenReturn(currentAuthType);
-    when(client.getRetryPolicy()).thenReturn(
+    when(client.getExponentialRetryPolicy()).thenReturn(
+        new ExponentialRetryPolicy(1));
+    when(client.getRetryPolicy(any())).thenReturn(
         new ExponentialRetryPolicy(1));
 
     when(client.createDefaultUriQueryBuilder()).thenCallRealMethod();
@@ -560,7 +562,7 @@ public final class ITestAbfsClient extends 
AbstractAbfsIntegrationTest {
         appendRequestParameters.getLength(), null));
 
     AbfsHttpOperation abfsHttpOperation = Mockito.spy(new 
AbfsHttpOperation(url,
-        HTTP_METHOD_PUT, requestHeaders));
+        HTTP_METHOD_PUT, requestHeaders, DEFAULT_HTTP_CONNECTION_TIMEOUT, 
DEFAULT_HTTP_READ_TIMEOUT));
 
     // Sets the expect request property if expect header is enabled.
     if (appendRequestParameters.isExpectHeaderEnabled()) {
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
index 16a47d15f523..32897355f138 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
@@ -53,6 +53,8 @@ import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_1
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
@@ -202,7 +204,8 @@ public class ITestAbfsRestOperation extends 
AbstractAbfsIntegrationTest {
         appendRequestParameters.getoffset(),
         appendRequestParameters.getLength(), null));
 
-    AbfsHttpOperation abfsHttpOperation = Mockito.spy(new 
AbfsHttpOperation(url, HTTP_METHOD_PUT, requestHeaders));
+    AbfsHttpOperation abfsHttpOperation = Mockito.spy(new 
AbfsHttpOperation(url, HTTP_METHOD_PUT, requestHeaders,
+            DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT));
 
     // Sets the expect request property if expect header is enabled.
     if (expectHeaderEnabled) {
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java
similarity index 89%
rename from 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java
rename to 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java
index 12ab4e9ead68..13323eb2a209 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java
@@ -47,7 +47,6 @@ import java.util.Random;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
-import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
@@ -56,9 +55,9 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 
 /**
- * Unit test TestExponentialRetryPolicy.
+ * Integration tests for {@link ExponentialRetryPolicy}.
  */
-public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
+public class ITestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
   private final int maxRetryCount = 30;
   private final int noRetryCount = 0;
   private final int retryCount = new Random().nextInt(maxRetryCount);
@@ -68,7 +67,7 @@ public class TestExponentialRetryPolicy extends 
AbstractAbfsIntegrationTest {
   private static final int ANALYSIS_PERIOD = 10000;
 
 
-  public TestExponentialRetryPolicy() throws Exception {
+  public ITestExponentialRetryPolicy() throws Exception {
     super();
   }
 
@@ -86,9 +85,10 @@ public class TestExponentialRetryPolicy extends 
AbstractAbfsIntegrationTest {
   @Test
   public void testDefaultMaxIORetryCount() throws Exception {
     AbfsConfiguration abfsConfig = getAbfsConfig();
-    Assert.assertEquals(
-        String.format("default maxIORetry count is %s.", maxRetryCount),
-        maxRetryCount, abfsConfig.getMaxIoRetries());
+    Assertions.assertThat(abfsConfig.getMaxIoRetries())
+        .describedAs("Max retry count should be %s", maxRetryCount)
+        .isEqualTo(maxRetryCount);
+
     testMaxIOConfig(abfsConfig);
   }
 
@@ -265,7 +265,7 @@ public class TestExponentialRetryPolicy extends 
AbstractAbfsIntegrationTest {
     ExponentialRetryPolicy template = new ExponentialRetryPolicy(
         getAbfsConfig().getMaxIoRetries());
     int testModifier = 1;
-    int expectedMaxRetries = template.getRetryCount() + testModifier;
+    int expectedMaxRetries = template.getMaxRetryCount() + testModifier;
     int expectedMinBackoff = template.getMinBackoff() + testModifier;
     int expectedMaxBackoff = template.getMaxBackoff() + testModifier;
     int expectedDeltaBackoff = template.getDeltaBackoff() + testModifier;
@@ -279,10 +279,18 @@ public class TestExponentialRetryPolicy extends 
AbstractAbfsIntegrationTest {
     ExponentialRetryPolicy policy = new ExponentialRetryPolicy(
         new AbfsConfiguration(config, "dummyAccountName"));
 
-    Assert.assertEquals("Max retry count was not set as expected.", 
expectedMaxRetries, policy.getRetryCount());
-    Assert.assertEquals("Min backoff interval was not set as expected.", 
expectedMinBackoff, policy.getMinBackoff());
-    Assert.assertEquals("Max backoff interval was not set as expected.", 
expectedMaxBackoff, policy.getMaxBackoff());
-    Assert.assertEquals("Delta backoff interval was not set as expected.", 
expectedDeltaBackoff, policy.getDeltaBackoff());
+    Assertions.assertThat(policy.getMaxRetryCount())
+        .describedAs("Max retry count was not set as expected.")
+        .isEqualTo(expectedMaxRetries);
+    Assertions.assertThat(policy.getMinBackoff())
+        .describedAs("Min backoff interval was not set as expected.")
+        .isEqualTo(expectedMinBackoff);
+    Assertions.assertThat(policy.getMaxBackoff())
+        .describedAs("Max backoff interval was not set as expected")
+        .isEqualTo(expectedMaxBackoff);
+    Assertions.assertThat(policy.getDeltaBackoff())
+        .describedAs("Delta backoff interval was not set as expected.")
+        .isEqualTo(expectedDeltaBackoff);
   }
 
   private AbfsConfiguration getAbfsConfig() throws Exception {
@@ -297,14 +305,14 @@ public class TestExponentialRetryPolicy extends 
AbstractAbfsIntegrationTest {
     int localRetryCount = 0;
 
     while (localRetryCount < abfsConfig.getMaxIoRetries()) {
-      Assert.assertTrue(
-          "Retry should be allowed when retryCount less than max count 
configured.",
-          retryPolicy.shouldRetry(localRetryCount, -1));
+      Assertions.assertThat(retryPolicy.shouldRetry(localRetryCount, -1))
+          .describedAs("Retry should be allowed when retryCount less than max 
count configured.")
+          .isTrue();
       localRetryCount++;
     }
 
-    Assert.assertEquals(
-        "When all retries are exhausted, the retryCount will be same as max 
configured",
-        abfsConfig.getMaxIoRetries(), localRetryCount);
+    Assertions.assertThat(localRetryCount)
+        .describedAs("When all retries are exhausted, the retryCount will be 
same as max configured.")
+        .isEqualTo(abfsConfig.getMaxIoRetries());
   }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestStaticRetryPolicy.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestStaticRetryPolicy.java
new file mode 100644
index 000000000000..9b4467c1dbd3
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestStaticRetryPolicy.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_STATIC_RETRY_INTERVAL;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_ABBREVIATION;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
+
+/**
+ * Class to test the behavior of Static Retry policy as well as the inheritance
+ * between {@link AbfsRetryPolicy}, {@link ExponentialRetryPolicy}, {@link 
StaticRetryPolicy}
+ */
+public class ITestStaticRetryPolicy extends AbstractAbfsIntegrationTest {
+
+  public ITestStaticRetryPolicy() throws Exception {
+    super();
+  }
+
+  /**
+   * Tests for retry policy related configurations.
+   * Asserting that the correct retry policy is used for a given set of
+   * configurations including default ones
+   * @throws Exception
+   */
+  @Test
+  public void testStaticRetryPolicyInitializationDefault() throws Exception {
+    Configuration config = new Configuration(this.getRawConfiguration());
+    assertInitialization(config, StaticRetryPolicy.class);
+  }
+
+  @Test
+  public void testStaticRetryPolicyInitialization1() throws Exception {
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set(AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED, "true");
+    assertInitialization(config, StaticRetryPolicy.class);
+  }
+
+  @Test
+  public void testStaticRetryPolicyInitialization2() throws Exception {
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set(AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED, "false");
+    assertInitialization(config, ExponentialRetryPolicy.class);
+  }
+
+  private void assertInitialization(Configuration config, Class 
retryPolicyClass) throws Exception{
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
+        .newInstance(getFileSystem().getUri(), config);
+    AbfsClient client = fs.getAbfsStore().getClient();
+
+    // Assert that static retry policy will be used only for CT Failures
+    AbfsRetryPolicy retryPolicy = 
client.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
+    Assertions.assertThat(retryPolicy)
+        .describedAs("RetryPolicy Type is Not As Expected")
+        .isInstanceOf(retryPolicyClass);
+
+    // For all other possible values of failureReason, Exponential retry is 
used
+    retryPolicy = client.getRetryPolicy("");
+    assertIsExponentialRetryPolicy(retryPolicy);
+    retryPolicy = client.getRetryPolicy(null);
+    assertIsExponentialRetryPolicy(retryPolicy);
+    retryPolicy = client.getRetryPolicy(CONNECTION_RESET_ABBREVIATION);
+    assertIsExponentialRetryPolicy(retryPolicy);
+  }
+
+  /**
+   * Test to assert that static retry policy returns the same retry interval
+   * independent of retry count
+   * @throws Exception
+   */
+  @Test
+  public void testStaticRetryInterval() throws Exception {
+    Configuration config = new Configuration(this.getRawConfiguration());
+    long retryInterval = 1000;
+    int maxIoRetry = 5;
+    config.set(AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED, "true");
+    config.set(AZURE_STATIC_RETRY_INTERVAL, "1000");
+    config.set(AZURE_MAX_IO_RETRIES, "5");
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
+        .newInstance(getFileSystem().getUri(), config);
+    AbfsClient client = fs.getAbfsStore().getClient();
+
+    AbfsRetryPolicy retryPolicy = 
client.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
+    assertIsStaticRetryPolicy(retryPolicy);
+
+    Assertions.assertThat(retryPolicy.shouldRetry(0, -1))
+        .describedAs("Should retry should be true")
+        .isEqualTo(true);
+    Assertions.assertThat(retryPolicy.getRetryInterval(0))
+        .describedAs("Retry Interval Value Not as expected")
+        .isEqualTo(retryInterval);
+    Assertions.assertThat(retryPolicy.getRetryInterval(1))
+        .describedAs("Retry Interval Value Not as expected")
+        .isEqualTo(retryInterval);
+    Assertions.assertThat(retryPolicy.getRetryInterval(2))
+        .describedAs("Retry Interval Value Not as expected")
+        .isEqualTo(retryInterval);
+    Assertions.assertThat(retryPolicy.getRetryInterval(3))
+        .describedAs("Retry Interval Value Not as expected")
+        .isEqualTo(retryInterval);
+    Assertions.assertThat(retryPolicy.shouldRetry(maxIoRetry, -1))
+        .describedAs("Should retry for maxretrycount should be false")
+        .isEqualTo(false);
+  }
+
+  private void assertIsExponentialRetryPolicy(AbfsRetryPolicy retryPolicy) {
+    Assertions.assertThat(retryPolicy)
+        .describedAs("Exponential Retry policy must be used")
+        .isInstanceOf(ExponentialRetryPolicy.class);
+  }
+
+  private void assertIsStaticRetryPolicy(AbfsRetryPolicy retryPolicy) {
+    Assertions.assertThat(retryPolicy)
+        .describedAs("Static Retry policy must be used")
+        .isInstanceOf(StaticRetryPolicy.class);
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java
index ef52f244f7e4..b7fb892362b4 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java
@@ -36,6 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -74,7 +76,8 @@ public final class TestAbfsPerfTracker {
 
     try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, 
"disablingCaller",
             "disablingCallee")) {
-      AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new 
ArrayList<>());
+      AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new 
ArrayList<>(),
+              DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);
       tracker.registerResult(op).registerSuccess(true);
     }
 
@@ -92,7 +95,8 @@ public final class TestAbfsPerfTracker {
     assertThat(latencyDetails).describedAs("AbfsPerfTracker should be 
empty").isNull();
 
     List<Callable<Integer>> tasks = new ArrayList<>();
-    AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new 
ArrayList<>());
+    AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new 
ArrayList<>(),
+            DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);
 
     for (int i = 0; i < numTasks; i++) {
       tasks.add(() -> {
@@ -131,7 +135,8 @@ public final class TestAbfsPerfTracker {
     assertThat(latencyDetails).describedAs("AbfsPerfTracker should be 
empty").isNull();
 
     List<Callable<Integer>> tasks = new ArrayList<>();
-    AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new 
ArrayList<>());
+    AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new 
ArrayList<>(),
+            DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);
 
     for (int i = 0; i < numTasks; i++) {
       tasks.add(() -> {
@@ -170,7 +175,8 @@ public final class TestAbfsPerfTracker {
     long aggregateLatency = 0;
     AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, false);
     List<Callable<Long>> tasks = new ArrayList<>();
-    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<>());
+    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<>(),
+            DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);
 
     for (int i = 0; i < numTasks; i++) {
       tasks.add(() -> {
@@ -205,7 +211,8 @@ public final class TestAbfsPerfTracker {
     long aggregateLatency = 0;
     AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, false);
     List<Callable<Long>> tasks = new ArrayList<>();
-    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<>());
+    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<>(),
+            DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);
 
     for (int i = 0; i < numTasks; i++) {
       tasks.add(() -> {
@@ -269,7 +276,8 @@ public final class TestAbfsPerfTracker {
     long aggregateLatency = 0;
     AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, true);
     List<Callable<Long>> tasks = new ArrayList<>();
-    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<>());
+    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<>(),
+            DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);
 
     for (int i = 0; i < numTasks; i++) {
       tasks.add(() -> {
@@ -303,7 +311,8 @@ public final class TestAbfsPerfTracker {
     long aggregateLatency = 0;
     AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, 
filesystemName, true);
     List<Callable<Long>> tasks = new ArrayList<>();
-    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<>());
+    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<>(),
+            DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);
 
     for (int i = 0; i < numTasks; i++) {
       tasks.add(() -> {
@@ -363,7 +372,8 @@ public final class TestAbfsPerfTracker {
     Instant testInstant = Instant.now();
     AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, 
filesystemName, false);
     AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, 
filesystemName, true);
-    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<AbfsHttpHeader>());
+    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<AbfsHttpHeader>(),
+            DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);
 
     verifyNoException(abfsPerfTrackerDisabled);
     verifyNoException(abfsPerfTrackerEnabled);
@@ -371,7 +381,8 @@ public final class TestAbfsPerfTracker {
 
   private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws 
Exception {
     Instant testInstant = Instant.now();
-    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<AbfsHttpHeader>());
+    final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", 
new ArrayList<AbfsHttpHeader>(),
+            DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);
 
     try (
             AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, 
null);
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java
index cef1c9ae5a1e..1c53e62dd58b 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java
@@ -25,6 +25,7 @@ import java.net.URL;
 import java.time.Duration;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.assertj.core.api.Assertions;
@@ -47,6 +48,8 @@ import 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_ALREADY_EXISTS;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
@@ -158,6 +161,10 @@ public class TestAbfsRenameRetryRecovery extends 
AbstractAbfsIntegrationTest {
 
     // adding mock objects to current AbfsClient
     AbfsClient spyClient = Mockito.spy(fs.getAbfsStore().getClient());
+    AbfsConfiguration spiedConf = 
Mockito.spy(fs.getAbfsStore().getAbfsConfiguration());
+    
Mockito.doReturn(DEFAULT_HTTP_CONNECTION_TIMEOUT).when(spiedConf).getHttpConnectionTimeout();
+    
Mockito.doReturn(DEFAULT_HTTP_READ_TIMEOUT).when(spiedConf).getHttpReadTimeout();
+    Mockito.doReturn(spiedConf).when(spyClient).getAbfsConfiguration();
 
     Mockito.doAnswer(answer -> {
       AbfsRestOperation op = new 
AbfsRestOperation(AbfsRestOperationType.RenamePath,
@@ -191,9 +198,7 @@ public class TestAbfsRenameRetryRecovery extends 
AbstractAbfsIntegrationTest {
     
normalOp2.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
             client.getAccessToken());
 
-    when(spiedRestOp.createHttpOperation())
-            .thenReturn(failingOperation)
-            .thenReturn(normalOp2);
+    
Mockito.doReturn(failingOperation).doReturn(normalOp2).when(spiedRestOp).createHttpOperation();
   }
 
   /**
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
index b302a1fa939e..7f422582e7ac 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
@@ -29,6 +29,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Stubber;
 
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
 import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
@@ -37,8 +38,11 @@ import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
+import static 
org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION;
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToAbfsClient;
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToRestOpAndHttpOp;
+
 import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_ABBREVIATION;
 import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE;
 import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
@@ -54,6 +58,8 @@ import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.SOCKET
 import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.UNKNOWN_HOST_EXCEPTION_ABBREVIATION;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
 
 public class TestAbfsRestOperationMockFailures {
 
@@ -63,7 +69,7 @@ public class TestAbfsRestOperationMockFailures {
     String[] abbreviations = new String[1];
     exceptions[0] = new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE);
     abbreviations[0] = CONNECTION_TIMEOUT_ABBREVIATION;
-    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
+    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 1);
   }
 
   @Test
@@ -75,7 +81,7 @@ public class TestAbfsRestOperationMockFailures {
     abbreviations[0] = CONNECTION_TIMEOUT_ABBREVIATION;
     exceptions[1] = new SocketTimeoutException(READ_TIMEOUT_JDK_MESSAGE);
     abbreviations[1] = READ_TIMEOUT_ABBREVIATION;
-    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
+    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 1);
   }
 
   @Test
@@ -84,7 +90,7 @@ public class TestAbfsRestOperationMockFailures {
     String[] abbreviations = new String[1];
     exceptions[0] = new SocketTimeoutException(READ_TIMEOUT_JDK_MESSAGE);
     abbreviations[0] = READ_TIMEOUT_ABBREVIATION;
-    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
+    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
   }
 
   @Test
@@ -93,7 +99,7 @@ public class TestAbfsRestOperationMockFailures {
     String[] abbreviations = new String[1];
     exceptions[0] = new UnknownHostException();
     abbreviations[0] = UNKNOWN_HOST_EXCEPTION_ABBREVIATION;
-    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
+    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
   }
 
   @Test
@@ -102,7 +108,7 @@ public class TestAbfsRestOperationMockFailures {
     String[] abbreviations = new String[1];
     exceptions[0] = new SocketTimeoutException(CONNECTION_RESET_MESSAGE + " by 
peer");
     abbreviations[0] = CONNECTION_RESET_ABBREVIATION;
-    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
+    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
   }
 
   @Test
@@ -111,7 +117,7 @@ public class TestAbfsRestOperationMockFailures {
     String[] abbreviations = new String[1];
     exceptions[0] = new SocketException("unknown");
     abbreviations[0] = SOCKET_EXCEPTION_ABBREVIATION;
-    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
+    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
   }
 
   @Test
@@ -120,7 +126,7 @@ public class TestAbfsRestOperationMockFailures {
     String[] abbreviations = new String[1];
     exceptions[0] = new InterruptedIOException();
     abbreviations[0] = IO_EXCEPTION_ABBREVIATION;
-    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
+    testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
   }
 
   @Test
@@ -158,16 +164,115 @@ public class TestAbfsRestOperationMockFailures {
     testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, "Other.", "503");
   }
 
+  /**
+   * Test for mocking the failure scenario with retry policy assertions.
+   * Here we will try to create a request with following life cycle:
+   * 1. Primary request fails with Connection Timeout and falls into the retry 
loop
+   * 2. Retried request fails with 503 and goes for retry again
+   * 3. Retried request fails with 503 and does not go for retry.
+   *
+   * We will try to assert that:
+   * 1. Correct retry policy is used to get the retry interval for each failed 
request
+   * 2. Tracing header construction takes place with proper arguments based on 
the failure reason and retry policy used
+   * @throws Exception
+   */
+
+  @Test
+  public void testRetryPolicyWithDifferentFailureReasons() throws Exception {
+
+    AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
+    ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
+        ExponentialRetryPolicy.class);
+    StaticRetryPolicy staticRetryPolicy = 
Mockito.mock(StaticRetryPolicy.class);
+    AbfsThrottlingIntercept intercept = Mockito.mock(
+        AbfsThrottlingIntercept.class);
+    addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, 
staticRetryPolicy, intercept);
+
+    AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
+        AbfsRestOperationType.ReadFile,
+        abfsClient,
+        "PUT",
+        null,
+        new ArrayList<>()
+    ));
+
+    AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class);
+    addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);
+
+    Stubber stubber = Mockito.doThrow(new 
SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE));
+    stubber.doNothing().when(httpOperation).processResponse(
+        nullable(byte[].class), nullable(int.class), nullable(int.class));
+
+    
when(httpOperation.getStatusCode()).thenReturn(-1).thenReturn(HTTP_UNAVAILABLE);
+
+    TracingContext tracingContext = Mockito.mock(TracingContext.class);
+    
Mockito.doNothing().when(tracingContext).setRetryCount(nullable(int.class));
+    Mockito.doReturn("").when(httpOperation).getStorageErrorMessage();
+    Mockito.doReturn("").when(httpOperation).getStorageErrorCode();
+    Mockito.doReturn("HEAD").when(httpOperation).getMethod();
+    
Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any());
+
+    try {
+      // Operation will fail with CT first and then 503 thereafter.
+      abfsRestOperation.execute(tracingContext);
+    } catch(AbfsRestOperationException ex) {
+      Assertions.assertThat(ex.getStatusCode())
+          .describedAs("Status Code must be HTTP_UNAVAILABLE(409)")
+          .isEqualTo(HTTP_UNAVAILABLE);
+    }
+
+    // Assert that httpOperation.processResponse was called 3 times.
+    // One for retry count 0
+    // One for retry count 1 after failing with CT
+    // One for retry count 2 after failing with 503
+    Mockito.verify(httpOperation, times(3)).processResponse(
+        nullable(byte[].class), nullable(int.class), nullable(int.class));
+
+    // Assert that Static Retry Policy was used after CT failure.
+    // Iteration 1 failed with CT and shouldRetry was called with retry count 0
+    // Before iteration 2 sleep will be computed using static retry policy and 
retry count 1
+    Mockito.verify(abfsClient, Mockito.times(1))
+        .getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
+    Mockito.verify(staticRetryPolicy, Mockito.times(1))
+        .shouldRetry(0, -1);
+    Mockito.verify(staticRetryPolicy, Mockito.times(1))
+        .getRetryInterval(1);
+    Mockito.verify(tracingContext, Mockito.times(1))
+        .constructHeader(httpOperation, CONNECTION_TIMEOUT_ABBREVIATION, 
STATIC_RETRY_POLICY_ABBREVIATION);
+
+    // Assert that exponential Retry Policy was used during second and third 
Iteration.
+    // Iteration 2 and 3 failed with 503 and should retry was called with 
retry count 1 and 2
+    // Before iteration 3 sleep will be computed using exponential retry 
policy and retry count 2
+    // Should retry with retry count 2 will return false and no further 
requests will be made.
+    Mockito.verify(abfsClient, Mockito.times(2))
+        .getRetryPolicy("503");
+    Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
+        .shouldRetry(1, HTTP_UNAVAILABLE);
+    Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
+        .shouldRetry(2, HTTP_UNAVAILABLE);
+    Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
+        .getRetryInterval(2);
+    Mockito.verify(tracingContext, Mockito.times(1))
+        .constructHeader(httpOperation, "503", 
EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
+
+    // Assert that intercept.updateMetrics was called exactly twice 
(presumably once per request that failed with 503 - TODO confirm)
+    Mockito.verify(intercept, Mockito.times(2))
+        .updateMetrics(nullable(AbfsRestOperationType.class), 
nullable(AbfsHttpOperation.class));
+  }
+
   private void testClientRequestIdForStatusRetry(int status,
       String serverErrorMessage,
       String keyExpected) throws Exception {
 
     AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
-    ExponentialRetryPolicy retryPolicy = Mockito.mock(
+    ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
         ExponentialRetryPolicy.class);
-    addGeneralMockBehaviourToAbfsClient(abfsClient, retryPolicy);
-
+    StaticRetryPolicy staticRetryPolicy = 
Mockito.mock(StaticRetryPolicy.class);
+    AbfsThrottlingIntercept intercept = Mockito.mock(
+        AbfsThrottlingIntercept.class);
+    addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, 
staticRetryPolicy, intercept);
 
+    // Create a readfile operation that will fail
     AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
         AbfsRestOperationType.ReadFile,
         abfsClient,
@@ -201,8 +306,7 @@ public class TestAbfsRestOperationMockFailures {
 
     TracingContext tracingContext = Mockito.mock(TracingContext.class);
     
Mockito.doNothing().when(tracingContext).setRetryCount(nullable(int.class));
-    Mockito.doReturn(tracingContext)
-        .when(abfsRestOperation).createNewTracingContext(any());
+    
Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any());
 
     int[] count = new int[1];
     count[0] = 0;
@@ -213,7 +317,7 @@ public class TestAbfsRestOperationMockFailures {
       }
       count[0]++;
       return null;
-    }).when(tracingContext).constructHeader(any(), any());
+    }).when(tracingContext).constructHeader(any(), any(), any());
 
     abfsRestOperation.execute(tracingContext);
     Assertions.assertThat(count[0]).isEqualTo(2);
@@ -222,12 +326,14 @@ public class TestAbfsRestOperationMockFailures {
 
   private void testClientRequestIdForTimeoutRetry(Exception[] exceptions,
       String[] abbreviationsExpected,
-      int len) throws Exception {
+      int len, int numOfCTExceptions) throws Exception {
     AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
-    ExponentialRetryPolicy retryPolicy = Mockito.mock(
+    ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
         ExponentialRetryPolicy.class);
-    addGeneralMockBehaviourToAbfsClient(abfsClient, retryPolicy);
-
+    StaticRetryPolicy staticRetryPolicy = 
Mockito.mock(StaticRetryPolicy.class);
+    AbfsThrottlingIntercept intercept = Mockito.mock(
+        AbfsThrottlingIntercept.class);
+    addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, 
staticRetryPolicy, intercept);
 
     AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
         AbfsRestOperationType.ReadFile,
@@ -265,9 +371,19 @@ public class TestAbfsRestOperationMockFailures {
       }
       count[0]++;
       return null;
-    }).when(tracingContext).constructHeader(any(), any());
+    }).when(tracingContext).constructHeader(any(), any(), any());
 
     abfsRestOperation.execute(tracingContext);
     Assertions.assertThat(count[0]).isEqualTo(len + 1);
+
+    /**
+     * Assert that getRetryPolicy was called with
+     * failureReason CT only for Connection Timeout Cases.
+     * The verification expects one such call per request failing with CT,
+     * i.e. numOfCTExceptions calls in total, and none for requests failing 
for other reasons.
+     */
+    Mockito.verify(abfsClient, Mockito.times(
+        numOfCTExceptions))
+        .getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
   }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAzureADAuthenticator.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAzureADAuthenticator.java
index 8e79288cf6e7..37a7a986e114 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAzureADAuthenticator.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAzureADAuthenticator.java
@@ -60,7 +60,7 @@ public class TestAzureADAuthenticator extends 
AbstractAbfsIntegrationTest {
     ExponentialRetryPolicy retryPolicy = abfsConfig
         .getOauthTokenFetchRetryPolicy();
 
-    Assertions.assertThat(retryPolicy.getRetryCount()).describedAs(
+    Assertions.assertThat(retryPolicy.getMaxRetryCount()).describedAs(
         "retryCount should be the default value {} as the same "
             + "is not configured",
         DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
@@ -103,7 +103,7 @@ public class TestAzureADAuthenticator extends 
AbstractAbfsIntegrationTest {
     ExponentialRetryPolicy retryPolicy = abfsConfig
         .getOauthTokenFetchRetryPolicy();
 
-    Assertions.assertThat(retryPolicy.getRetryCount())
+    Assertions.assertThat(retryPolicy.getMaxRetryCount())
         .describedAs("retryCount should be {}", TEST_RETRY_COUNT)
         .isEqualTo(TEST_RETRY_COUNT);
     Assertions.assertThat(retryPolicy.getMinBackoff())


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to