This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fe988889600 MINOR: Improving log for outstanding requests on close and 
cleanup (#16304)
fe988889600 is described below

commit fe988889600dfb8145d2fb4de319630bf1cad314
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Jun 13 08:31:16 2024 +0200

    MINOR: Improving log for outstanding requests on close and cleanup (#16304)
    
    Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../clients/consumer/internals/AsyncKafkaConsumer.java      |  8 ++++----
 .../clients/consumer/internals/ConsumerNetworkThread.java   | 13 +++++++++----
 .../clients/consumer/internals/NetworkClientDelegate.java   |  4 ++++
 .../clients/consumer/internals/AsyncKafkaConsumerTest.java  |  2 +-
 .../clients/consumer/internals/FetchRequestManagerTest.java |  2 +-
 5 files changed, 19 insertions(+), 10 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e6de169f7b3..4a4012f4ddd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1234,7 +1234,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(timeout.toMillis()));
         closeTimer.update();
         // Prepare shutting down the network thread
-        prepareShutdown(closeTimer, firstException);
+        releaseAssignmentAndLeaveGroup(closeTimer, firstException);
         closeTimer.update();
         swallow(log, Level.ERROR, "Failed invoking asynchronous commit 
callback.",
             () -> 
awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), 
firstException);
@@ -1270,12 +1270,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      * 2. revoke all partitions
      * 3. if partition revocation completes successfully, send leave group
      */
-    void prepareShutdown(final Timer timer, final AtomicReference<Throwable> 
firstException) {
+    void releaseAssignmentAndLeaveGroup(final Timer timer, final 
AtomicReference<Throwable> firstException) {
         if (!groupMetadata.get().isPresent())
             return;
 
         if (autoCommitEnabled)
-            autoCommitSync(timer);
+            commitSyncAllConsumed(timer);
 
         applicationEventHandler.add(new CommitOnCloseEvent());
         completeQuietly(
@@ -1287,7 +1287,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     }
 
     // Visible for testing
-    void autoCommitSync(final Timer timer) {
+    void commitSyncAllConsumed(final Timer timer) {
         Map<TopicPartition, OffsetAndMetadata> allConsumed = 
subscriptions.allConsumed();
         log.debug("Sending synchronous auto-commit of offsets {} on closing", 
allConsumed);
         try {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index 7616ac69122..64bba14837a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -207,8 +207,7 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
      */
     // Visible for testing
     static void runAtClose(final Collection<Optional<? extends 
RequestManager>> requestManagers,
-                           final NetworkClientDelegate networkClientDelegate,
-                           final Timer timer) {
+                           final NetworkClientDelegate networkClientDelegate) {
         // These are the optional outgoing requests at the
         requestManagers.stream()
                 .filter(Optional::isPresent)
@@ -300,15 +299,21 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
             networkClientDelegate.poll(timer.remainingMs(), 
timer.currentTimeMs());
             timer.update();
         } while (timer.notExpired() && 
networkClientDelegate.hasAnyPendingRequests());
+
+        if (networkClientDelegate.hasAnyPendingRequests()) {
+            log.warn("Close timeout of {} ms expired before the consumer 
network thread was able " +
+                "to complete pending requests. Inflight request count: {}, 
Unsent request count: {}",
+                timer.timeoutMs(), 
networkClientDelegate.inflightRequestCount(), 
networkClientDelegate.unsentRequests().size());
+        }
     }
 
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
         try {
-            runAtClose(requestManagers.entries(), networkClientDelegate, 
timer);
+            runAtClose(requestManagers.entries(), networkClientDelegate);
         } catch (Exception e) {
-            log.error("Unexpected error during shutdown.  Proceed with 
closing.", e);
+            log.error("Unexpected error during shutdown. Proceed with 
closing.", e);
         } finally {
             sendUnsentRequests(timer);
             applicationEventReaper.reap(applicationEventQueue);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index a0dd43112d6..ee17f3b5d57 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -90,6 +90,10 @@ public class NetworkClientDelegate implements AutoCloseable {
         return unsentRequests;
     }
 
+    public int inflightRequestCount() {
+        return client.inFlightRequestCount();
+    }
+
     /**
      * Check if the node is disconnected and unavailable for immediate 
reconnection (i.e. if it is in
      * reconnect backoff window following the disconnect).
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 5f597947978..2d126b3eac6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -881,7 +881,7 @@ public class AsyncKafkaConsumerTest {
         consumer.subscribe(singleton("topic"), 
mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new 
TopicPartition("topic", 0)));
         subscriptions.seek(new TopicPartition("topic", 0), 100);
-        consumer.autoCommitSync(time.timer(100));
+        consumer.commitSyncAllConsumed(time.timer(100));
         verify(applicationEventHandler).add(any(SyncCommitEvent.class));
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index 1d2f16e4476..b16cb6482e4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -373,7 +373,7 @@ public class FetchRequestManagerTest {
         // NOTE: by design the FetchRequestManager doesn't perform network I/O 
internally. That means that calling
         // the close() method with a Timer will NOT send out the close session 
requests on close. The network
         // I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we 
need to run that logic here.
-        ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), 
networkClientDelegate, timer);
+        ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), 
networkClientDelegate);
         // the network is polled during the last state of clean up.
         networkClientDelegate.poll(time.timer(1));
         // validate that closing the fetcher has sent a request with final 
epoch. 2 requests are sent, one for the

Reply via email to