This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2caef07 MINOR: Improve confusing admin client shutdown logging
(#10107)
2caef07 is described below
commit 2caef070d4199877337f59399563354e41b203f7
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Feb 11 15:12:25 2021 -0800
MINOR: Improve confusing admin client shutdown logging (#10107)
If the admin client is shutdown with some unfinished calls, we see messages
such as the following in the log:
```
2021-02-09 11:08:05.964 DEBUG [AdminClient clientId=adminclient-1]
Call(callName=fetchMetadata, deadlineMs=1612843805378) timed out at
9223372036854775807 after 1 attempt(s)
```
The problem is that we are using passing `Long.MaxValue` as the current
time in `Call.fail` in order to ensure the call is timed out and we are
discarding the original cause. The patch fixes the problem by setting
`aborted=true` instead and preserving the original exception message.
Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../apache/kafka/clients/admin/KafkaAdminClient.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ab40470..3cad328 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -756,6 +756,11 @@ public class KafkaAdminClient extends AdminClient {
return curNode;
}
+ void abortAndFail(TimeoutException timeoutException) {
+ this.aborted = true;
+ fail(time.milliseconds(), timeoutException);
+ }
+
/**
* Handle a failure.
*
@@ -818,8 +823,12 @@ public class KafkaAdminClient extends AdminClient {
log.debug("{} timed out at {} after {} attempt(s)", this, now,
tries,
new Exception(prettyPrintException(cause)));
}
- handleFailure(new TimeoutException(this + " timed out at " + now
- + " after " + tries + " attempt(s)", cause));
+ if (cause instanceof TimeoutException) {
+ handleFailure(cause);
+ } else {
+ handleFailure(new TimeoutException(this + " timed out at " +
now
+ + " after " + tries + " attempt(s)", cause));
+ }
}
/**
@@ -1130,7 +1139,7 @@ public class KafkaAdminClient extends AdminClient {
if (call.aborted) {
log.warn("Aborted call {} is still in callsInFlight.",
call);
} else {
- log.debug("Closing connection to {} to time out {}",
nodeId, call);
+ log.debug("Closing connection to {} due to timeout
while awaiting {}", nodeId, call);
call.aborted = true;
client.disconnect(nodeId);
numTimedOut++;
@@ -1375,7 +1384,7 @@ public class KafkaAdminClient extends AdminClient {
client.wakeup(); // wake the thread if it is in poll()
} else {
log.debug("The AdminClient thread has exited. Timing out {}.",
call);
- call.fail(Long.MAX_VALUE, new TimeoutException("The
AdminClient thread has exited."));
+ call.abortAndFail(new TimeoutException("The AdminClient thread
has exited."));
}
}
@@ -1390,7 +1399,7 @@ public class KafkaAdminClient extends AdminClient {
void call(Call call, long now) {
if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
log.debug("The AdminClient is not accepting new calls. Timing
out {}.", call);
- call.fail(Long.MAX_VALUE, new TimeoutException("The
AdminClient thread is not accepting new calls."));
+ call.abortAndFail(new TimeoutException("The AdminClient thread
is not accepting new calls."));
} else {
enqueue(call, now);
}