This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a0fec75  MINOR; Preserve ThrottlingQuotaExceededException when request 
timeouts after being retried due to a quota violation (KIP-599) (#9344)
a0fec75 is described below

commit a0fec75d3cee3d23bd517fe0acc65270a6cb0f88
Author: David Jacot <dja...@confluent.io>
AuthorDate: Tue Sep 29 16:17:37 2020 +0200

    MINOR; Preserve ThrottlingQuotaExceededException when request timeouts 
after being retried due to a quota violation (KIP-599) (#9344)
    
    This PR adds the logic to preserve the ThrottlingQuotaExceededException 
when topics are retried. The throttleTimeMs is also adjusted accordingly as the 
request could remain pending or in-flight for quite a long time.
    
    I have run various tests on clusters with quotas enabled and, indeed, find 
it better to preserve the exception. Otherwise, the caller does not really 
understand what is going on. Preserving it allows the caller to take 
appropriate measures and to take the throttleTimeMs into consideration.
    
    Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 95 +++++++++++++++++-----
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 12 ++-
 2 files changed, 85 insertions(+), 22 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 9acd341..3f39bbe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1445,6 +1445,24 @@ public class KafkaAdminClient extends AdminClient {
                 entry.getValue().completeExceptionally(new 
ApiException(messageFormatter.apply(entry.getKey()))));
     }
 
+    /**
+     * Fail futures in the given Map which were retried due to exceeding 
quota. We propagate
+     * the initial error back to the caller if the request timed out.
+     */
+    private static <K, V> void maybeCompleteQuotaExceededException(
+            boolean shouldRetryOnQuotaViolation,
+            Throwable throwable,
+            Map<K, KafkaFutureImpl<V>> futures,
+            Map<K, ThrottlingQuotaExceededException> quotaExceededExceptions,
+            int throttleTimeDelta) {
+        if (shouldRetryOnQuotaViolation && throwable instanceof 
TimeoutException) {
+            quotaExceededExceptions.forEach((key, value) -> 
futures.get(key).completeExceptionally(
+                new ThrottlingQuotaExceededException(
+                    Math.max(0, value.throttleTimeMs() - throttleTimeDelta),
+                    value.getMessage())));
+        }
+    }
+
     @Override
     public CreateTopicsResult createTopics(final Collection<NewTopic> 
newTopics,
                                            final CreateTopicsOptions options) {
@@ -1464,7 +1482,8 @@ public class KafkaAdminClient extends AdminClient {
         if (!topics.isEmpty()) {
             final long now = time.milliseconds();
             final long deadline = calcDeadlineMs(now, options.timeoutMs());
-            final Call call = getCreateTopicsCall(options, topicFutures, 
topics, deadline);
+            final Call call = getCreateTopicsCall(options, topicFutures, 
topics,
+                Collections.emptyMap(), now, deadline);
             runnable.call(call, now);
         }
         return new CreateTopicsResult(new HashMap<>(topicFutures));
@@ -1473,6 +1492,8 @@ public class KafkaAdminClient extends AdminClient {
     private Call getCreateTopicsCall(final CreateTopicsOptions options,
                                      final Map<String, 
KafkaFutureImpl<TopicMetadataAndConfig>> futures,
                                      final CreatableTopicCollection topics,
+                                     final Map<String, 
ThrottlingQuotaExceededException> quotaExceededExceptions,
+                                     final long now,
                                      final long deadline) {
         return new Call("createTopics", deadline, new 
ControllerNodeProvider()) {
             @Override
@@ -1491,6 +1512,7 @@ public class KafkaAdminClient extends AdminClient {
                 // Handle server responses for particular topics.
                 final CreateTopicsResponse response = (CreateTopicsResponse) 
abstractResponse;
                 final CreatableTopicCollection retryTopics = new 
CreatableTopicCollection();
+                final Map<String, ThrottlingQuotaExceededException> 
retryTopicQuotaExceededExceptions = new HashMap<>();
                 for (CreatableTopicResult result : response.data().topics()) {
                     KafkaFutureImpl<TopicMetadataAndConfig> future = 
futures.get(result.name());
                     if (future == null) {
@@ -1499,11 +1521,13 @@ public class KafkaAdminClient extends AdminClient {
                         ApiError error = new ApiError(result.errorCode(), 
result.errorMessage());
                         if (error.isFailure()) {
                             if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
+                                ThrottlingQuotaExceededException 
quotaExceededException = new ThrottlingQuotaExceededException(
+                                    response.throttleTimeMs(), 
error.messageWithFallback());
                                 if (options.shouldRetryOnQuotaViolation()) {
                                     
retryTopics.add(topics.find(result.name()).duplicate());
+                                    
retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException);
                                 } else {
-                                    future.completeExceptionally(new 
ThrottlingQuotaExceededException(
-                                        response.throttleTimeMs(), 
error.messageWithFallback()));
+                                    
future.completeExceptionally(quotaExceededException);
                                 }
                             } else {
                                 
future.completeExceptionally(error.exception());
@@ -1535,8 +1559,10 @@ public class KafkaAdminClient extends AdminClient {
                     completeUnrealizedFutures(futures.entrySet().stream(),
                         topic -> "The controller response did not contain a 
result for topic " + topic);
                 } else {
-                    final Call call = getCreateTopicsCall(options, futures, 
retryTopics, deadline);
-                    runnable.call(call, time.milliseconds());
+                    final long now = time.milliseconds();
+                    final Call call = getCreateTopicsCall(options, futures, 
retryTopics,
+                        retryTopicQuotaExceededExceptions, now, deadline);
+                    runnable.call(call, now);
                 }
             }
 
@@ -1554,6 +1580,11 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             void handleFailure(Throwable throwable) {
+                // If there were any topics retries due to a quota exceeded 
exception, we propagate
+                // the initial error back to the caller if the request timed 
out.
+                
maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
+                    throwable, futures, quotaExceededExceptions, (int) 
(time.milliseconds() - now));
+                // Fail all the other remaining futures
                 completeAllExceptionally(futures.values(), throwable);
             }
         };
@@ -1578,7 +1609,8 @@ public class KafkaAdminClient extends AdminClient {
         if (!validTopicNames.isEmpty()) {
             final long now = time.milliseconds();
             final long deadline = calcDeadlineMs(now, options.timeoutMs());
-            final Call call = getDeleteTopicsCall(options, topicFutures, 
validTopicNames, deadline);
+            final Call call = getDeleteTopicsCall(options, topicFutures, 
validTopicNames,
+                Collections.emptyMap(), now, deadline);
             runnable.call(call, now);
         }
         return new DeleteTopicsResult(new HashMap<>(topicFutures));
@@ -1587,6 +1619,8 @@ public class KafkaAdminClient extends AdminClient {
     private Call getDeleteTopicsCall(final DeleteTopicsOptions options,
                                      final Map<String, KafkaFutureImpl<Void>> 
futures,
                                      final List<String> topics,
+                                     final Map<String, 
ThrottlingQuotaExceededException> quotaExceededExceptions,
+                                     final long now,
                                      final long deadline) {
         return new Call("deleteTopics", deadline, new 
ControllerNodeProvider()) {
             @Override
@@ -1604,6 +1638,7 @@ public class KafkaAdminClient extends AdminClient {
                 // Handle server responses for particular topics.
                 final DeleteTopicsResponse response = (DeleteTopicsResponse) 
abstractResponse;
                 final List<String> retryTopics = new ArrayList<>();
+                final Map<String, ThrottlingQuotaExceededException> 
retryTopicQuotaExceededExceptions = new HashMap<>();
                 for (DeletableTopicResult result : 
response.data().responses()) {
                     KafkaFutureImpl<Void> future = futures.get(result.name());
                     if (future == null) {
@@ -1612,11 +1647,13 @@ public class KafkaAdminClient extends AdminClient {
                         ApiError error = new ApiError(result.errorCode(), 
result.errorMessage());
                         if (error.isFailure()) {
                             if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
+                                ThrottlingQuotaExceededException 
quotaExceededException = new ThrottlingQuotaExceededException(
+                                    response.throttleTimeMs(), 
error.messageWithFallback());
                                 if (options.shouldRetryOnQuotaViolation()) {
                                     retryTopics.add(result.name());
+                                    
retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException);
                                 } else {
-                                    future.completeExceptionally(new 
ThrottlingQuotaExceededException(
-                                        response.throttleTimeMs(), 
error.messageWithFallback()));
+                                    
future.completeExceptionally(quotaExceededException);
                                 }
                             } else {
                                 
future.completeExceptionally(error.exception());
@@ -1632,13 +1669,20 @@ public class KafkaAdminClient extends AdminClient {
                     completeUnrealizedFutures(futures.entrySet().stream(),
                         topic -> "The controller response did not contain a 
result for topic " + topic);
                 } else {
-                    final Call call = getDeleteTopicsCall(options, futures, 
retryTopics, deadline);
-                    runnable.call(call, time.milliseconds());
+                    final long now = time.milliseconds();
+                    final Call call = getDeleteTopicsCall(options, futures, 
retryTopics,
+                        retryTopicQuotaExceededExceptions, now, deadline);
+                    runnable.call(call, now);
                 }
             }
 
             @Override
             void handleFailure(Throwable throwable) {
+                // If there were any topics retries due to a quota exceeded 
exception, we propagate
+                // the initial error back to the caller if the request timed 
out.
+                
maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
+                    throwable, futures, quotaExceededExceptions, (int) 
(time.milliseconds() - now));
+                // Fail all the other remaining futures
                 completeAllExceptionally(futures.values(), throwable);
             }
         };
@@ -2478,7 +2522,7 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public CreatePartitionsResult createPartitions(final Map<String, 
NewPartitions> newPartitions,
-        final CreatePartitionsOptions options) {
+                                                   final 
CreatePartitionsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> futures = new 
HashMap<>(newPartitions.size());
         final CreatePartitionsTopicCollection topics = new 
CreatePartitionsTopicCollection(newPartitions.size());
         for (Map.Entry<String, NewPartitions> entry : 
newPartitions.entrySet()) {
@@ -2498,16 +2542,19 @@ public class KafkaAdminClient extends AdminClient {
         if (!topics.isEmpty()) {
             final long now = time.milliseconds();
             final long deadline = calcDeadlineMs(now, options.timeoutMs());
-            final Call call = getCreatePartitionsCall(options, futures, 
topics, deadline);
+            final Call call = getCreatePartitionsCall(options, futures, topics,
+                Collections.emptyMap(), now, deadline);
             runnable.call(call, now);
         }
         return new CreatePartitionsResult(new HashMap<>(futures));
     }
 
     private Call getCreatePartitionsCall(final CreatePartitionsOptions options,
-        final Map<String, KafkaFutureImpl<Void>> futures,
-        final CreatePartitionsTopicCollection topics,
-        final long deadline) {
+                                         final Map<String, 
KafkaFutureImpl<Void>> futures,
+                                         final CreatePartitionsTopicCollection 
topics,
+                                         final Map<String, 
ThrottlingQuotaExceededException> quotaExceededExceptions,
+                                         final long now,
+                                         final long deadline) {
         return new Call("createPartitions", deadline, new 
ControllerNodeProvider()) {
             @Override
             public CreatePartitionsRequest.Builder createRequest(int 
timeoutMs) {
@@ -2525,6 +2572,7 @@ public class KafkaAdminClient extends AdminClient {
                 // Handle server responses for particular topics.
                 final CreatePartitionsResponse response = 
(CreatePartitionsResponse) abstractResponse;
                 final CreatePartitionsTopicCollection retryTopics = new 
CreatePartitionsTopicCollection();
+                final Map<String, ThrottlingQuotaExceededException> 
retryTopicQuotaExceededExceptions = new HashMap<>();
                 for (CreatePartitionsTopicResult result : 
response.data().results()) {
                     KafkaFutureImpl<Void> future = futures.get(result.name());
                     if (future == null) {
@@ -2533,11 +2581,13 @@ public class KafkaAdminClient extends AdminClient {
                         ApiError error = new ApiError(result.errorCode(), 
result.errorMessage());
                         if (error.isFailure()) {
                             if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
+                                ThrottlingQuotaExceededException 
quotaExceededException = new ThrottlingQuotaExceededException(
+                                    response.throttleTimeMs(), 
error.messageWithFallback());
                                 if (options.shouldRetryOnQuotaViolation()) {
                                     
retryTopics.add(topics.find(result.name()).duplicate());
+                                    
retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException);
                                 } else {
-                                    future.completeExceptionally(new 
ThrottlingQuotaExceededException(
-                                        response.throttleTimeMs(), 
error.messageWithFallback()));
+                                    
future.completeExceptionally(quotaExceededException);
                                 }
                             } else {
                                 
future.completeExceptionally(error.exception());
@@ -2553,13 +2603,20 @@ public class KafkaAdminClient extends AdminClient {
                     completeUnrealizedFutures(futures.entrySet().stream(),
                         topic -> "The controller response did not contain a 
result for topic " + topic);
                 } else {
-                    final Call call = getCreatePartitionsCall(options, 
futures, retryTopics, deadline);
-                    runnable.call(call, time.milliseconds());
+                    final long now = time.milliseconds();
+                    final Call call = getCreatePartitionsCall(options, 
futures, retryTopics,
+                        retryTopicQuotaExceededExceptions, now, deadline);
+                    runnable.call(call, now);
                 }
             }
 
             @Override
             void handleFailure(Throwable throwable) {
+                // If there were any topics retries due to a quota exceeded 
exception, we propagate
+                // the initial error back to the caller if the request timed 
out.
+                
maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
+                    throwable, futures, quotaExceededExceptions, (int) 
(time.milliseconds() - now));
+                // Fail all the other remaining futures
                 completeAllExceptionally(futures.values(), throwable);
             }
         };
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 80d5294..a4ae6f4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -733,7 +733,9 @@ public class KafkaAdminClientTest {
             time.sleep(defaultApiTimeout + 1);
 
             assertNull(result.values().get("topic1").get());
-            TestUtils.assertFutureThrows(result.values().get("topic2"), 
TimeoutException.class);
+            ThrottlingQuotaExceededException e = 
TestUtils.assertFutureThrows(result.values().get("topic2"),
+                ThrottlingQuotaExceededException.class);
+            assertEquals(0, e.throttleTimeMs());
             TestUtils.assertFutureThrows(result.values().get("topic3"), 
TopicExistsException.class);
         }
     }
@@ -895,7 +897,9 @@ public class KafkaAdminClientTest {
             time.sleep(defaultApiTimeout + 1);
 
             assertNull(result.values().get("topic1").get());
-            TestUtils.assertFutureThrows(result.values().get("topic2"), 
TimeoutException.class);
+            ThrottlingQuotaExceededException e = 
TestUtils.assertFutureThrows(result.values().get("topic2"),
+                ThrottlingQuotaExceededException.class);
+            assertEquals(0, e.throttleTimeMs());
             TestUtils.assertFutureThrows(result.values().get("topic3"), 
TopicExistsException.class);
         }
     }
@@ -1727,7 +1731,9 @@ public class KafkaAdminClientTest {
             time.sleep(defaultApiTimeout + 1);
 
             assertNull(result.values().get("topic1").get());
-            TestUtils.assertFutureThrows(result.values().get("topic2"), 
TimeoutException.class);
+            ThrottlingQuotaExceededException e = 
TestUtils.assertFutureThrows(result.values().get("topic2"),
+                ThrottlingQuotaExceededException.class);
+            assertEquals(0, e.throttleTimeMs());
             TestUtils.assertFutureThrows(result.values().get("topic3"), 
TopicExistsException.class);
         }
     }

Reply via email to