This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 8c12f28af2b3 HADOOP-19096. [ABFS] [CST Optimization] Enhance Client-Side Throttling Metrics Logic (#6276)
8c12f28af2b3 is described below

commit 8c12f28af2b3c29a3c3863e669520d3643ab6d36
Author: Anuj Modi <128447756+anujmodi2...@users.noreply.github.com>
AuthorDate: Thu Apr 11 18:41:33 2024 +0530

    HADOOP-19096. [ABFS] [CST Optimization] Enhance Client-Side Throttling Metrics Logic (#6276)
    
    
    ABFS has a client-side throttling mechanism which works on the metrics
    collected from past requests.
    
    When requests fail due to server-side throttling, it updates its
    metrics and recalculates any client-side backoff.
    
    The choice of which requests should be used to compute the client-side
    backoff interval is based on the HTTP status code (a sketch of this
    decision logic follows the commit message):
    
    - Status code in 2xx range: Successful Operations should contribute.
    - Status code in 3xx range: Redirection Operations should not contribute.
    - Status code in 4xx range: User Errors should not contribute.
    - Status code is 503: Throttling errors should contribute only if they
      are due to a client limits breach, as follows:
      * 503, Ingress Over Account Limit: Should Contribute
      * 503, Egress Over Account Limit: Should Contribute
      * 503, TPS Over Account Limit: Should Contribute
      * 503, Other Server Throttling: Should not Contribute.
    - Status code in 5xx range other than 503: Should not Contribute.
    - IOException and UnknownHostExceptions: Should not Contribute.
    
    Contributed by Anuj Modi
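
    As a rough illustration of the rule set above, here is a minimal,
    self-contained Java sketch. The class and method names here are
    illustrative only; the actual patch implements this check as the private
    AbfsRestOperation#shouldUpdateCSTMetrics shown in the diff below, keyed
    on the RetryReasonConstants abbreviations (ING, EGR, OPR).

        import java.net.HttpURLConnection;

        final class CstMetricsRule {
          // Mirrors the patch: anything below 300 (a success) contributes, and
          // a 503 contributes only when the failure reason marks an ingress,
          // egress, or operations-per-second account-limit breach.
          static boolean shouldContribute(int statusCode, String failureReason) {
            return statusCode < HttpURLConnection.HTTP_MULT_CHOICE  // 2xx success
                || "ING".equals(failureReason)                      // ingress limit
                || "EGR".equals(failureReason)                      // egress limit
                || "OPR".equals(failureReason);                     // TPS limit
          }
        }

    Requests that die with an IOException or UnknownHostException never reach
    this check in the patch; a separate flag (wasKnownExceptionThrown) skips
    the metrics update for them.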
---
 .../contracts/services/AzureServiceErrorCode.java  | 10 ++-
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |  2 +-
 .../fs/azurebfs/services/AbfsRestOperation.java    | 95 +++++++++++++---------
 .../fs/azurebfs/services/RetryReasonConstants.java |  4 +-
 .../ServerErrorRetryReason.java                    | 14 +++-
 .../azurebfs/services/ITestAbfsRestOperation.java  |  6 ++
 .../TestAbfsRestOperationMockFailures.java         | 72 ++++++++++------
 .../fs/azurebfs/services/TestRetryReason.java      |  8 +-
 8 files changed, 134 insertions(+), 77 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
index 6c0ecfcdf862..12e687c15bb4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
@@ -42,8 +42,14 @@ public enum AzureServiceErrorCode {
   INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE("InvalidSourceOrDestinationResourceType", HttpURLConnection.HTTP_CONFLICT, null),
   RENAME_DESTINATION_PARENT_PATH_NOT_FOUND("RenameDestinationParentPathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
   INVALID_RENAME_SOURCE_PATH("InvalidRenameSourcePath", HttpURLConnection.HTTP_CONFLICT, null),
-  INGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Ingress is over the account limit."),
-  EGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Egress is over the account limit."),
+  INGRESS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
+          "Ingress is over the account limit."),
+  EGRESS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
+          "Egress is over the account limit."),
+  TPS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
+          "Operations per second is over the account limit."),
+  OTHER_SERVER_THROTTLING("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
+          "The server is currently unable to receive requests. Please retry your request."),
   INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null),
   AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null),
   ACCOUNT_REQUIRES_HTTPS("AccountRequiresHttps", HttpURLConnection.HTTP_BAD_REQUEST, null),
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 45da438a91bc..acbb0392fc97 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -168,7 +168,7 @@ public class AbfsClient implements Closeable {
         DelegatingSSLSocketFactory.initializeDefaultFactory(this.abfsConfiguration.getPreferredSSLFactoryOption());
         sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory().getProviderName();
       } catch (IOException e) {
-        // Suppress exception. Failure to init DelegatingSSLSocketFactory would have only performance impact.
+        // Suppress exception, failure to init DelegatingSSLSocketFactory would have only performance impact.
         LOG.trace("NonCritFailure: DelegatingSSLSocketFactory Init failed : "
             + "{}", e.getMessage());
       }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index e901196bcc2e..4abe9a574a87 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -39,7 +39,9 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
-import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION;
 
 /**
  * The AbfsRestOperation for Rest AbfsClient.
@@ -283,7 +285,8 @@ public class AbfsRestOperation {
   private boolean executeHttpOperation(final int retryCount,
     TracingContext tracingContext) throws AzureBlobFileSystemException {
     AbfsHttpOperation httpOperation;
-    boolean wasIOExceptionThrown = false;
+    // Used to avoid CST Metric Update in Case of UnknownHost/IO Exception.
+    boolean wasKnownExceptionThrown = false;
 
     try {
       // initialize the HTTP request and open the connection
@@ -321,7 +324,27 @@ public class AbfsRestOperation {
      } else if (httpOperation.getStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
        incrementCounter(AbfsStatistic.SERVER_UNAVAILABLE, 1);
      }
+
+      // If no exception occurred till here it means http operation was successfully complete and
+      // a response from server has been received which might be failure or success.
+      // If any kind of exception has occurred it will be caught below.
+      // If request failed to determine failure reason and retry policy here.
+      // else simply return with success after saving the result.
+      LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);
+
+      int status = httpOperation.getStatusCode();
+      failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage());
+      retryPolicy = client.getRetryPolicy(failureReason);
+
+      if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) {
+        return false;
+      }
+
+      // If the request has succeeded or failed with non-retrial error, save the operation and return.
+      result = httpOperation;
+
     } catch (UnknownHostException ex) {
+      wasKnownExceptionThrown = true;
       String hostname = null;
       hostname = httpOperation.getHost();
       failureReason = RetryReason.getAbbreviation(ex, null, null);
@@ -333,57 +356,27 @@ public class AbfsRestOperation {
       }
       return false;
     } catch (IOException ex) {
+      wasKnownExceptionThrown = true;
       if (LOG.isDebugEnabled()) {
         LOG.debug("HttpRequestFailure: {}, {}", httpOperation, ex);
       }
 
       failureReason = RetryReason.getAbbreviation(ex, -1, "");
       retryPolicy = client.getRetryPolicy(failureReason);
-      wasIOExceptionThrown = true;
       if (!retryPolicy.shouldRetry(retryCount, -1)) {
         throw new InvalidAbfsRestOperationException(ex, retryCount);
       }
 
       return false;
     } finally {
-      int status = httpOperation.getStatusCode();
-      /*
-       A status less than 300 (2xx range) or greater than or equal
-       to 500 (5xx range) should contribute to throttling metrics being updated.
-       Less than 200 or greater than or equal to 500 show failed operations. 2xx
-       range contributes to successful operations. 3xx range is for redirects
-       and 4xx range is for user errors. These should not be a part of
-       throttling backoff computation.
-       */
-      boolean updateMetricsResponseCode = (status < HttpURLConnection.HTTP_MULT_CHOICE
-              || status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
-
-      /*
-       Connection Timeout failures should not contribute to throttling
-       In case the current request fails with Connection Timeout we will have
-       ioExceptionThrown true and failure reason as CT
-       In case the current request failed with 5xx, failure reason will be
-       updated after finally block but wasIOExceptionThrown will be false;
-       */
-      boolean isCTFailure = CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason) && wasIOExceptionThrown;
-
-      if (updateMetricsResponseCode && !isCTFailure) {
+      int statusCode = httpOperation.getStatusCode();
+      // Update Metrics only if Succeeded or Throttled due to account limits.
+      // Also Update in case of any unhandled exception is thrown.
+      if (shouldUpdateCSTMetrics(statusCode) && !wasKnownExceptionThrown) {
         intercept.updateMetrics(operationType, httpOperation);
       }
     }
 
-    LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);
-
-    int status = httpOperation.getStatusCode();
-    failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage());
-    retryPolicy = client.getRetryPolicy(failureReason);
-
-    if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) {
-      return false;
-    }
-
-    result = httpOperation;
-
     return true;
   }
 
@@ -443,6 +436,34 @@ public class AbfsRestOperation {
     }
   }
 
+  /**
+   * Updating Client Side Throttling Metrics for relevant response status codes.
+   * Following criteria is used to decide based on status code and failure reason.
+   * <ol>
+   *   <li>Case 1: Status code in 2xx range: Successful Operations should contribute</li>
+   *   <li>Case 2: Status code in 3xx range: Redirection Operations should not contribute</li>
+   *   <li>Case 3: Status code in 4xx range: User Errors should not contribute</li>
+   *   <li>
+   *     Case 4: Status code is 503: Throttling Error should contribute as following:
+   *     <ol>
+   *       <li>Case 4.a: Ingress Over Account Limit: Should Contribute</li>
+   *       <li>Case 4.b: Egress Over Account Limit: Should Contribute</li>
+   *       <li>Case 4.c: TPS Over Account Limit: Should Contribute</li>
+   *       <li>Case 4.d: Other Server Throttling: Should not contribute</li>
+   *     </ol>
+   *   </li>
+   *   <li>Case 5: Status code in 5xx range other than 503: Should not contribute</li>
+   * </ol>
+   * @param statusCode
+   * @return
+   */
+  private boolean shouldUpdateCSTMetrics(final int statusCode) {
+    return statusCode <  HttpURLConnection.HTTP_MULT_CHOICE // Case 1
+        || INGRESS_LIMIT_BREACH_ABBREVIATION.equals(failureReason) // Case 4.a
+        || EGRESS_LIMIT_BREACH_ABBREVIATION.equals(failureReason) // Case 4.b
+        || TPS_LIMIT_BREACH_ABBREVIATION.equals(failureReason); // Case 4.c
+  }
+
   /**
   * Creates a new Tracing context before entering the retry loop of a rest operation.
    * This will ensure all rest operations have unique
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryReasonConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryReasonConstants.java
index 8a0af183e30a..42d8587aa6d3 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryReasonConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryReasonConstants.java
@@ -26,13 +26,13 @@ public final class RetryReasonConstants {
   public static final String CONNECTION_TIMEOUT_JDK_MESSAGE = "connect timed out";
   public static final String READ_TIMEOUT_JDK_MESSAGE = "Read timed out";
   public static final String CONNECTION_RESET_MESSAGE = "Connection reset";
-  public static final String OPERATION_BREACH_MESSAGE = "Operations per second is over the account limit.";
   public static final String CONNECTION_RESET_ABBREVIATION = "CR";
   public static final String CONNECTION_TIMEOUT_ABBREVIATION = "CT";
   public static final String READ_TIMEOUT_ABBREVIATION = "RT";
   public static final String INGRESS_LIMIT_BREACH_ABBREVIATION = "ING";
   public static final String EGRESS_LIMIT_BREACH_ABBREVIATION = "EGR";
-  public static final String OPERATION_LIMIT_BREACH_ABBREVIATION = "OPR";
+  public static final String TPS_LIMIT_BREACH_ABBREVIATION = "OPR";
+  public static final String OTHER_SERVER_THROTTLING_ABBREVIATION = "OTH";
   public static final String UNKNOWN_HOST_EXCEPTION_ABBREVIATION = "UH";
   public static final String IO_EXCEPTION_ABBREVIATION = "IOE";
   public static final String SOCKET_EXCEPTION_ABBREVIATION = "SE";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/retryReasonCategories/ServerErrorRetryReason.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/retryReasonCategories/ServerErrorRetryReason.java
index dd67a0cb8cbb..727dcfd8dce0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/retryReasonCategories/ServerErrorRetryReason.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/retryReasonCategories/ServerErrorRetryReason.java
@@ -22,10 +22,12 @@ import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_STATUS_CATEGORY_QUOTIENT;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.OTHER_SERVER_THROTTLING;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
-import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_BREACH_MESSAGE;
-import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_LIMIT_BREACH_ABBREVIATION;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OTHER_SERVER_THROTTLING_ABBREVIATION;
 
 /**
  * Category that can capture server-response errors for 5XX status-code.
@@ -56,9 +58,13 @@ public class ServerErrorRetryReason extends RetryReasonCategory {
           splitedServerErrorMessage)) {
         return EGRESS_LIMIT_BREACH_ABBREVIATION;
       }
-      if (OPERATION_BREACH_MESSAGE.equalsIgnoreCase(
+      if (TPS_OVER_ACCOUNT_LIMIT.getErrorMessage().equalsIgnoreCase(
           splitedServerErrorMessage)) {
-        return OPERATION_LIMIT_BREACH_ABBREVIATION;
+        return TPS_LIMIT_BREACH_ABBREVIATION;
+      }
+      if (OTHER_SERVER_THROTTLING.getErrorMessage().equalsIgnoreCase(
+          splitedServerErrorMessage)) {
+        return OTHER_SERVER_THROTTLING_ABBREVIATION;
       }
       return HTTP_UNAVAILABLE + "";
     }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
index 32897355f138..41cbc3be3bc0 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java
@@ -61,6 +61,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARA
 import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
 import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
 import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -233,6 +234,11 @@ public class ITestAbfsRestOperation extends AbstractAbfsIntegrationTest {
       // mocked the response code and the response message to check different
       // behaviour based on response code.
       Mockito.doReturn(responseCode).when(abfsHttpOperation).getConnResponseCode();
+      if (responseCode == HTTP_UNAVAILABLE) {
+        Mockito.doReturn(EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage())
+            .when(abfsHttpOperation)
+            .getStorageErrorMessage();
+      }
       Mockito.doReturn(responseMessage)
           .when(abfsHttpOperation)
           .getConnResponseMessage();
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
index 7f422582e7ac..078b42cf0db1 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
@@ -38,6 +38,8 @@ import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.OTHER_SERVER_THROTTLING;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT;
 import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToAbfsClient;
@@ -50,8 +52,8 @@ import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNEC
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.IO_EXCEPTION_ABBREVIATION;
-import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_BREACH_MESSAGE;
-import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_LIMIT_BREACH_ABBREVIATION;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OTHER_SERVER_THROTTLING_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_JDK_MESSAGE;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.SOCKET_EXCEPTION_ABBREVIATION;
@@ -62,6 +64,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
 public class TestAbfsRestOperationMockFailures {
+  // In these tests a request first fails with given exceptions and then succeed on retry.
+  // Client Side Throttling Metrics will be updated at least for retried request which succeeded.
+  // For original requests it will be updated only for EGR, IGR, OPR throttling.
 
   @Test
   public void testClientRequestIdForConnectTimeoutRetry() throws Exception {
@@ -131,37 +136,48 @@ public class TestAbfsRestOperationMockFailures {
 
   @Test
   public void testClientRequestIdFor400Retry() throws Exception {
-    testClientRequestIdForStatusRetry(HTTP_BAD_REQUEST, "", "400");
+    testClientRequestIdForStatusRetry(HTTP_BAD_REQUEST, "", "400", 1);
   }
 
   @Test
   public void testClientRequestIdFor500Retry() throws Exception {
-    testClientRequestIdForStatusRetry(HTTP_INTERNAL_ERROR, "", "500");
+    testClientRequestIdForStatusRetry(HTTP_INTERNAL_ERROR, "", "500", 1);
   }
 
   @Test
   public void testClientRequestIdFor503INGRetry() throws Exception {
-    testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE,
-        INGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
-        INGRESS_LIMIT_BREACH_ABBREVIATION);
+    testClientRequestIdForStatusRetry(
+            HTTP_UNAVAILABLE,
+            INGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
+            INGRESS_LIMIT_BREACH_ABBREVIATION,
+            2);
   }
 
   @Test
-  public void testClientRequestIdFor503egrRetry() throws Exception {
-    testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE,
-        EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
-        EGRESS_LIMIT_BREACH_ABBREVIATION);
+  public void testClientRequestIdFor503EGRRetry() throws Exception {
+    testClientRequestIdForStatusRetry(
+            HTTP_UNAVAILABLE,
+            EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
+            EGRESS_LIMIT_BREACH_ABBREVIATION,
+            2);
   }
 
   @Test
   public void testClientRequestIdFor503OPRRetry() throws Exception {
-    testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE,
-        OPERATION_BREACH_MESSAGE, OPERATION_LIMIT_BREACH_ABBREVIATION);
+    testClientRequestIdForStatusRetry(
+            HTTP_UNAVAILABLE,
+            TPS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
+        TPS_LIMIT_BREACH_ABBREVIATION,
+            2);
   }
 
   @Test
   public void testClientRequestIdFor503OtherRetry() throws Exception {
-    testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, "Other.", "503");
+    testClientRequestIdForStatusRetry(
+            HTTP_UNAVAILABLE,
+            OTHER_SERVER_THROTTLING.getErrorMessage(),
+            OTHER_SERVER_THROTTLING_ABBREVIATION,
+            1);
   }
 
   /**
@@ -176,7 +192,6 @@ public class TestAbfsRestOperationMockFailures {
   * 2. Tracing header construction takes place with proper arguments based on the failure reason and retry policy used
    * @throws Exception
    */
-
   @Test
   public void testRetryPolicyWithDifferentFailureReasons() throws Exception {
 
@@ -210,6 +225,7 @@ public class TestAbfsRestOperationMockFailures {
     Mockito.doReturn("").when(httpOperation).getStorageErrorMessage();
     Mockito.doReturn("").when(httpOperation).getStorageErrorCode();
     Mockito.doReturn("HEAD").when(httpOperation).getMethod();
+    Mockito.doReturn(EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage()).when(httpOperation).getStorageErrorMessage();
     Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any());
 
     try {
@@ -217,20 +233,18 @@ public class TestAbfsRestOperationMockFailures {
       abfsRestOperation.execute(tracingContext);
     } catch(AbfsRestOperationException ex) {
       Assertions.assertThat(ex.getStatusCode())
-          .describedAs("Status Code must be HTTP_UNAVAILABLE(409)")
+          .describedAs("Status Code must be HTTP_UNAVAILABLE(503)")
           .isEqualTo(HTTP_UNAVAILABLE);
     }
 
     // Assert that httpOperation.processResponse was called 3 times.
     // One for retry count 0
     // One for retry count 1 after failing with CT
-    // One for retry count 2 after failing with 50
+    // One for retry count 2 after failing with 503
     Mockito.verify(httpOperation, times(3)).processResponse(
         nullable(byte[].class), nullable(int.class), nullable(int.class));
 
-    // Assert that Static Retry Policy was used after CT failure.
-    // Iteration 1 failed with CT and shouldRetry was called with retry count 0
-    // Before iteration 2 sleep will be computed using static retry policy and retry count 1
+    // Primary Request Failed with CT. Static Retry Policy should be used.
     Mockito.verify(abfsClient, Mockito.times(1))
         .getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
     Mockito.verify(staticRetryPolicy, Mockito.times(1))
@@ -245,7 +259,7 @@ public class TestAbfsRestOperationMockFailures {
     // Before iteration 3 sleep will be computed using exponential retry policy and retry count 2
     // Should retry with retry count 2 will return false and no further requests will be made.
     Mockito.verify(abfsClient, Mockito.times(2))
-        .getRetryPolicy("503");
+        .getRetryPolicy(EGRESS_LIMIT_BREACH_ABBREVIATION);
     Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
         .shouldRetry(1, HTTP_UNAVAILABLE);
     Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
@@ -253,16 +267,17 @@ public class TestAbfsRestOperationMockFailures {
     Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
         .getRetryInterval(2);
     Mockito.verify(tracingContext, Mockito.times(1))
-        .constructHeader(httpOperation, "503", EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
+        .constructHeader(httpOperation, EGRESS_LIMIT_BREACH_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
 
-    // Assert that intercept.updateMetrics was called only once during second Iteration
+    // Assert that intercept.updateMetrics was called 2 times. Both the retried request fails with EGR.
     Mockito.verify(intercept, Mockito.times(2))
         .updateMetrics(nullable(AbfsRestOperationType.class), nullable(AbfsHttpOperation.class));
   }
 
   private void testClientRequestIdForStatusRetry(int status,
-      String serverErrorMessage,
-      String keyExpected) throws Exception {
+                                                 String serverErrorMessage,
+                                                 String keyExpected,
+                                                 int numOfTimesCSTMetricsUpdated) throws Exception {
 
     AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
     ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
@@ -322,11 +337,14 @@ public class TestAbfsRestOperationMockFailures {
     abfsRestOperation.execute(tracingContext);
     Assertions.assertThat(count[0]).isEqualTo(2);
 
+    Mockito.verify(intercept, Mockito.times(numOfTimesCSTMetricsUpdated)).updateMetrics(any(), any());
+
   }
 
   private void testClientRequestIdForTimeoutRetry(Exception[] exceptions,
-      String[] abbreviationsExpected,
-      int len, int numOfCTExceptions) throws Exception {
+                                                  String[] abbreviationsExpected,
+                                                  int len,
+                                                  int numOfCTExceptions) throws Exception {
     AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
     ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
         ExponentialRetryPolicy.class);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestRetryReason.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestRetryReason.java
index 76fcc6dc2c8a..d9d8ee51f9b3 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestRetryReason.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestRetryReason.java
@@ -31,6 +31,7 @@ import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
 import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
@@ -38,8 +39,7 @@ import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNEC
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.IO_EXCEPTION_ABBREVIATION;
-import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_BREACH_MESSAGE;
-import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_LIMIT_BREACH_ABBREVIATION;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_JDK_MESSAGE;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.SOCKET_EXCEPTION_ABBREVIATION;
@@ -92,8 +92,8 @@ public class TestRetryReason {
 
   @Test
   public void testOperationLimitRetryReason() {
-    Assertions.assertThat(RetryReason.getAbbreviation(null, HTTP_UNAVAILABLE, OPERATION_BREACH_MESSAGE)).isEqualTo(
-        OPERATION_LIMIT_BREACH_ABBREVIATION
+    Assertions.assertThat(RetryReason.getAbbreviation(null, HTTP_UNAVAILABLE, TPS_OVER_ACCOUNT_LIMIT.getErrorMessage())).isEqualTo(
+        TPS_LIMIT_BREACH_ABBREVIATION
+        TPS_LIMIT_BREACH_ABBREVIATION
     );
   }
 


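As a closing illustration, the 503 message-to-abbreviation mapping exercised by
ServerErrorRetryReason and TestRetryReason above can be summarised in a short
sketch. The Map-based helper and its names are illustrative only; the real code
compares the parsed storage error message against the AzureServiceErrorCode
entries case-insensitively rather than via an exact-match map.

    import java.util.Map;

    // Illustrative helper only: the message strings mirror AzureServiceErrorCode
    // and the abbreviations mirror RetryReasonConstants; the class name is invented.
    final class ServerBusyAbbreviations {
      private static final Map<String, String> BY_MESSAGE = Map.of(
          "Ingress is over the account limit.", "ING",
          "Egress is over the account limit.", "EGR",
          "Operations per second is over the account limit.", "OPR",
          "The server is currently unable to receive requests. Please retry your request.", "OTH");

      // Unrecognised 503 messages fall back to the bare status code, matching
      // ServerErrorRetryReason's final return of HTTP_UNAVAILABLE + "".
      static String of(String serverErrorMessage) {
        return BY_MESSAGE.getOrDefault(serverErrorMessage, "503");
      }
    }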