[ https://issues.apache.org/jira/browse/HADOOP-17377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760518#comment-17760518 ]
ASF GitHub Bot commented on HADOOP-17377:
-----------------------------------------
steveloughran commented on code in PR #5273:
URL: https://github.com/apache/hadoop/pull/5273#discussion_r1310534958
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsMsiTokenProvider.java:
##########
@@ -90,4 +99,55 @@ private String getTrimmedPasswordString(AbfsConfiguration conf, String key,
     return value.trim();
   }
+  /**
+   * Test to verify that token fetch is retried for throttling errors
+   * (HTTP 429, too many requests).
+   */
+  @Test
+  public void testRetryForThrottling() throws Exception {
+    AbfsConfiguration conf = getConfiguration();
+
+    // Exception to be thrown with throttling error code 429.
+    AzureADAuthenticator.HttpException httpException =
+        new AzureADAuthenticator.HttpException(HTTP_TOO_MANY_REQUESTS,
+            "abc", "abc", "abc", "abc", "abc");
+
+    String tenantGuid = "abcd";
+    String clientId = "abcd";
+    String authEndpoint = getTrimmedPasswordString(conf,
+        FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT,
+        DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT);
+    String authority = getTrimmedPasswordString(conf,
+        FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY,
+        DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY);
+
+    // Stub getTokenSingleCall to throw the exception so the retry logic
+    // comes into play.
+    try (MockedStatic<AzureADAuthenticator> adAuthenticator =
+        Mockito.mockStatic(AzureADAuthenticator.class,
+            Mockito.CALLS_REAL_METHODS)) {
+      adAuthenticator.when(
+          () -> AzureADAuthenticator.getTokenSingleCall(Mockito.anyString(),
+              Mockito.anyString(), Mockito.any(), Mockito.anyString(),
+              Mockito.anyBoolean())).thenThrow(httpException);
+
+      // Spy on the tokenFetchRetryPolicy to verify retries.
+      ExponentialRetryPolicy exponentialRetryPolicy = Mockito.spy(
+          conf.getOauthTokenFetchRetryPolicy());
+      Field tokenFetchRetryPolicy =
+          AzureADAuthenticator.class.getDeclaredField("tokenFetchRetryPolicy");
+      tokenFetchRetryPolicy.setAccessible(true);
+      // Static field: the instance argument is ignored, so pass null.
+      tokenFetchRetryPolicy.set(null, exponentialRetryPolicy);
+
+      AccessTokenProvider tokenProvider = new MsiTokenProvider(authEndpoint,
+          tenantGuid, clientId, authority);
+      intercept(AzureADAuthenticator.HttpException.class,
+          tokenProvider::getToken);
+
+      // If the status code doesn't qualify for retry, shouldRetry returns
+      // false and the loop ends; shouldRetry being called multiple times
+      // verifies that the throttling status code 429 was retried.
+      Mockito.verify(exponentialRetryPolicy,
Review Comment:
So ExponentialRetryPolicy.getRetryCount() is there to let you pass a
non-mocked policy in and then assert on it; how about using that here? It
probably needs making the accessors public rather than package-scoped, but
that's all. The less use we make of Mockito, the fewer things will break with
every Mockito upgrade.
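
For illustration, a minimal sketch of that suggestion, assuming
ExponentialRetryPolicy.getRetryCount() is made public and counts the
shouldRetry() calls as described above; the setup lines are unchanged from
the test in the diff:

{noformat}
// Sketch only: inject a real (non-mocked) policy, then assert on its own
// counter instead of verifying a Mockito spy.
ExponentialRetryPolicy retryPolicy = conf.getOauthTokenFetchRetryPolicy();
Field policyField = AzureADAuthenticator.class.getDeclaredField(
    "tokenFetchRetryPolicy");
policyField.setAccessible(true);
policyField.set(null, retryPolicy);

AccessTokenProvider tokenProvider = new MsiTokenProvider(authEndpoint,
    tenantGuid, clientId, authority);
intercept(AzureADAuthenticator.HttpException.class, tokenProvider::getToken);

// More than one recorded attempt means the HTTP 429 was actually retried.
assertTrue("expected retries for HTTP 429", retryPolicy.getRetryCount() > 1);
{noformat}

This keeps the MockedStatic stub for getTokenSingleCall but drops the spy and
the verify, so a Mockito upgrade can only break the stubbing, not the
assertion.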
> ABFS: MsiTokenProvider doesn't retry HTTP 429 from the Instance Metadata Service
> --------------------------------------------------------------------------------
>
> Key: HADOOP-17377
> URL: https://issues.apache.org/jira/browse/HADOOP-17377
> Project: Hadoop Common
> Issue Type: Bug
> Components: fs/azure
> Affects Versions: 3.2.1
> Reporter: Brandon
> Priority: Major
> Labels: pull-request-available
>
> *Summary*
> The instance metadata service has its own guidance for error handling and
> retry, which differs from the Blob store's:
> [https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#error-handling]
> In particular, it responds with HTTP 429 if the request rate is too high,
> whereas the Blob store responds with HTTP 503. The retry policy used only
> accounts for the latter, as it retries any status >= 500. This can result
> in job instability when running multiple processes on the same host.
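> To make the gap concrete, here is a hedged sketch (illustrative names, not
> the actual Hadoop source) of a retry predicate behaving as described, with
> the missing 429 branch added:
> {noformat}
> // Today's behaviour retries any status >= 500; the 429 clause is the
> // missing piece this issue asks for.
> boolean shouldRetry(int retryCount, int statusCode) {
>   return retryCount < maxRetryCount
>       && (statusCode == 429       // IMDS throttling, not retried today
>           || statusCode >= 500);  // server errors, e.g. Blob 503
> }
> {noformat}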
> *Environment*
> * Spark talking to an ABFS store
> * Hadoop 3.2.1
> * Running on an Azure VM with user-assigned identity, ABFS configured to use MsiTokenProvider
> * 6 executor processes on each VM
> *Example*
> Here's an example error message and stack trace. It's always the same stack
> trace. This appears in logs a few hundred to low thousands of times a day.
> It's luckily skating by since the download operation is wrapped in 3 retries.
> {noformat}
> AADToken: HTTP connection failed for getting token from AzureAD. Http response: 429 null
> Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  Proxies: none
> First 1K of Body: {"error":"invalid_request","error_description":"Temporarily throttled, too many requests"}
>     at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
>     at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
>     at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
>     at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:489)
>     at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:208)
>     at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:473)
>     at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:437)
>     at org.apache.hadoop.fs.FileSystem.isFile(FileSystem.java:1717)
>     at org.apache.spark.util.Utils$.fetchHcfsFile(Utils.scala:747)
>     at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:724)
>     at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
>     at org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:812)
>     at org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:803)
>     at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:792)
>     at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
>     at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>     at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:791)
>     at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:803)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748){noformat}
> CC [~mackrorysd], [[email protected]]