This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fe988889600 MINOR: Improving log for outstanding requests on close and cleanup (#16304)
fe988889600 is described below
commit fe988889600dfb8145d2fb4de319630bf1cad314
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Jun 13 08:31:16 2024 +0200
MINOR: Improving log for outstanding requests on close and cleanup (#16304)
Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../clients/consumer/internals/AsyncKafkaConsumer.java | 8 ++++----
.../clients/consumer/internals/ConsumerNetworkThread.java | 13 +++++++++----
.../clients/consumer/internals/NetworkClientDelegate.java | 4 ++++
.../clients/consumer/internals/AsyncKafkaConsumerTest.java | 2 +-
.../clients/consumer/internals/FetchRequestManagerTest.java | 2 +-
5 files changed, 19 insertions(+), 10 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e6de169f7b3..4a4012f4ddd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1234,7 +1234,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
clientTelemetryReporter.ifPresent(reporter ->
reporter.initiateClose(timeout.toMillis()));
closeTimer.update();
// Prepare shutting down the network thread
- prepareShutdown(closeTimer, firstException);
+ releaseAssignmentAndLeaveGroup(closeTimer, firstException);
closeTimer.update();
swallow(log, Level.ERROR, "Failed invoking asynchronous commit
callback.",
() ->
awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false),
firstException);
@@ -1270,12 +1270,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* 2. revoke all partitions
* 3. if partition revocation completes successfully, send leave group
*/
- void prepareShutdown(final Timer timer, final AtomicReference<Throwable>
firstException) {
+ void releaseAssignmentAndLeaveGroup(final Timer timer, final
AtomicReference<Throwable> firstException) {
if (!groupMetadata.get().isPresent())
return;
if (autoCommitEnabled)
- autoCommitSync(timer);
+ commitSyncAllConsumed(timer);
applicationEventHandler.add(new CommitOnCloseEvent());
completeQuietly(
@@ -1287,7 +1287,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
// Visible for testing
- void autoCommitSync(final Timer timer) {
+ void commitSyncAllConsumed(final Timer timer) {
Map<TopicPartition, OffsetAndMetadata> allConsumed =
subscriptions.allConsumed();
log.debug("Sending synchronous auto-commit of offsets {} on closing",
allConsumed);
try {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index 7616ac69122..64bba14837a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -207,8 +207,7 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
*/
// Visible for testing
static void runAtClose(final Collection<Optional<? extends
RequestManager>> requestManagers,
- final NetworkClientDelegate networkClientDelegate,
- final Timer timer) {
+ final NetworkClientDelegate networkClientDelegate) {
// These are the optional outgoing requests at the
requestManagers.stream()
.filter(Optional::isPresent)
@@ -300,15 +299,21 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
networkClientDelegate.poll(timer.remainingMs(),
timer.currentTimeMs());
timer.update();
} while (timer.notExpired() &&
networkClientDelegate.hasAnyPendingRequests());
+
+ if (networkClientDelegate.hasAnyPendingRequests()) {
+ log.warn("Close timeout of {} ms expired before the consumer
network thread was able " +
+ "to complete pending requests. Inflight request count: {},
Unsent request count: {}",
+ timer.timeoutMs(),
networkClientDelegate.inflightRequestCount(),
networkClientDelegate.unsentRequests().size());
+ }
}
void cleanup() {
log.trace("Closing the consumer network thread");
Timer timer = time.timer(closeTimeout);
try {
- runAtClose(requestManagers.entries(), networkClientDelegate,
timer);
+ runAtClose(requestManagers.entries(), networkClientDelegate);
} catch (Exception e) {
- log.error("Unexpected error during shutdown. Proceed with
closing.", e);
+ log.error("Unexpected error during shutdown. Proceed with
closing.", e);
} finally {
sendUnsentRequests(timer);
applicationEventReaper.reap(applicationEventQueue);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index a0dd43112d6..ee17f3b5d57 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -90,6 +90,10 @@ public class NetworkClientDelegate implements AutoCloseable {
return unsentRequests;
}
+ public int inflightRequestCount() {
+ return client.inFlightRequestCount();
+ }
+
/**
* Check if the node is disconnected and unavailable for immediate
reconnection (i.e. if it is in
* reconnect backoff window following the disconnect).
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 5f597947978..2d126b3eac6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -881,7 +881,7 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(singleton("topic"),
mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new
TopicPartition("topic", 0)));
subscriptions.seek(new TopicPartition("topic", 0), 100);
- consumer.autoCommitSync(time.timer(100));
+ consumer.commitSyncAllConsumed(time.timer(100));
verify(applicationEventHandler).add(any(SyncCommitEvent.class));
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index 1d2f16e4476..b16cb6482e4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -373,7 +373,7 @@ public class FetchRequestManagerTest {
// NOTE: by design the FetchRequestManager doesn't perform network I/O
internally. That means that calling
// the close() method with a Timer will NOT send out the close session
requests on close. The network
// I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we
need to run that logic here.
- ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)),
networkClientDelegate, timer);
+ ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)),
networkClientDelegate);
// the network is polled during the last state of clean up.
networkClientDelegate.poll(time.timer(1));
// validate that closing the fetcher has sent a request with final
epoch. 2 requests are sent, one for the