This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2caef07  MINOR: Improve confusing admin client shutdown logging 
(#10107)
2caef07 is described below

commit 2caef070d4199877337f59399563354e41b203f7
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Feb 11 15:12:25 2021 -0800

    MINOR: Improve confusing admin client shutdown logging (#10107)
    
    If the admin client is shut down with some unfinished calls, we see messages 
such as the following in the log:
    ```
    2021-02-09 11:08:05.964 DEBUG [AdminClient clientId=adminclient-1] 
Call(callName=fetchMetadata, deadlineMs=1612843805378) timed out at 
9223372036854775807 after 1 attempt(s)
    ```
    The problem is that we are passing `Long.MAX_VALUE` as the current 
time in `Call.fail` in order to ensure the call is timed out, and we are 
discarding the original cause. The patch fixes the problem by setting 
`aborted=true` instead and preserving the original exception message.
    
    Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../apache/kafka/clients/admin/KafkaAdminClient.java  | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ab40470..3cad328 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -756,6 +756,11 @@ public class KafkaAdminClient extends AdminClient {
             return curNode;
         }
 
+        void abortAndFail(TimeoutException timeoutException) {
+            this.aborted = true;
+            fail(time.milliseconds(), timeoutException);
+        }
+
         /**
          * Handle a failure.
          *
@@ -818,8 +823,12 @@ public class KafkaAdminClient extends AdminClient {
                 log.debug("{} timed out at {} after {} attempt(s)", this, now, 
tries,
                     new Exception(prettyPrintException(cause)));
             }
-            handleFailure(new TimeoutException(this + " timed out at " + now
-                + " after " + tries + " attempt(s)", cause));
+            if (cause instanceof TimeoutException) {
+                handleFailure(cause);
+            } else {
+                handleFailure(new TimeoutException(this + " timed out at " + 
now
+                    + " after " + tries + " attempt(s)", cause));
+            }
         }
 
         /**
@@ -1130,7 +1139,7 @@ public class KafkaAdminClient extends AdminClient {
                     if (call.aborted) {
                         log.warn("Aborted call {} is still in callsInFlight.", 
call);
                     } else {
-                        log.debug("Closing connection to {} to time out {}", 
nodeId, call);
+                        log.debug("Closing connection to {} due to timeout 
while awaiting {}", nodeId, call);
                         call.aborted = true;
                         client.disconnect(nodeId);
                         numTimedOut++;
@@ -1375,7 +1384,7 @@ public class KafkaAdminClient extends AdminClient {
                 client.wakeup(); // wake the thread if it is in poll()
             } else {
                 log.debug("The AdminClient thread has exited. Timing out {}.", 
call);
-                call.fail(Long.MAX_VALUE, new TimeoutException("The 
AdminClient thread has exited."));
+                call.abortAndFail(new TimeoutException("The AdminClient thread 
has exited."));
             }
         }
 
@@ -1390,7 +1399,7 @@ public class KafkaAdminClient extends AdminClient {
         void call(Call call, long now) {
             if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
                 log.debug("The AdminClient is not accepting new calls. Timing 
out {}.", call);
-                call.fail(Long.MAX_VALUE, new TimeoutException("The 
AdminClient thread is not accepting new calls."));
+                call.abortAndFail(new TimeoutException("The AdminClient thread 
is not accepting new calls."));
             } else {
                 enqueue(call, now);
             }

Reply via email to