This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 6684319185c KAFKA-18645: New consumer should align close timeout handling with classic consumer (#18702)
6684319185c is described below

commit 6684319185c849663d2cba5e64586baec164d995
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Feb 5 22:08:51 2025 +0800

    KAFKA-18645: New consumer should align close timeout handling with classic consumer (#18702)
    
    Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  7 +++++
 .../consumer/internals/AsyncKafkaConsumer.java     | 35 +++++++++++++---------
 .../consumer/internals/AsyncKafkaConsumerTest.java |  2 ++
 .../integration/kafka/api/ConsumerBounceTest.scala |  2 +-
 4 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 8c9ea180f84..70c0f7cadd5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1782,6 +1782,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * timeout. If the consumer is unable to complete offset commits and gracefully leave the group
      * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
      * used to interrupt close.
+     * <p>
+     * The actual maximum wait time is bounded by the {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} setting, which
+     * only applies to operations performed with the broker (coordinator-related requests and
+     * fetch sessions). Even if a larger timeout is specified, the consumer will not wait longer than
+     * {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation.
+     * Note that the execution time of callbacks (such as {@link OffsetCommitCallback} and
+     * {@link ConsumerRebalanceListener}) does not consume time from the close timeout.
      *
      * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
      *                non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 2c65b44a7a0..3f30acc932c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -246,6 +246,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     private final ConsumerMetadata metadata;
     private final Metrics metrics;
     private final long retryBackoffMs;
+    private final int requestTimeoutMs;
     private final Duration defaultApiTimeoutMs;
     private final boolean autoCommitEnabled;
     private volatile boolean closed = false;
@@ -324,6 +325,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             this.metrics = createMetrics(config, time, reporters);
             this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
             this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+            this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 
             List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
             this.interceptors = new ConsumerInterceptors<>(interceptorList);
@@ -447,6 +449,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                        SubscriptionState subscriptions,
                        ConsumerMetadata metadata,
                        long retryBackoffMs,
+                       int requestTimeoutMs,
                        int defaultApiTimeoutMs,
                        String groupId,
                        boolean autoCommitEnabled) {
@@ -466,6 +469,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.groupMetadata.set(initializeGroupMetadata(groupId, Optional.empty()));
         this.metadata = metadata;
         this.retryBackoffMs = retryBackoffMs;
+        this.requestTimeoutMs = requestTimeoutMs;
         this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
         this.deserializers = deserializers;
         this.applicationEventHandler = applicationEventHandler;
@@ -499,6 +503,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.metrics = new Metrics(time);
         this.metadata = metadata;
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.defaultApiTimeoutMs = Duration.ofMillis(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
         this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
         this.clientTelemetryReporter = Optional.empty();
@@ -1326,7 +1331,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         // We are already closing with a timeout, don't allow wake-ups from here on.
         wakeupTrigger.disableWakeups();
 
-        final Timer closeTimer = time.timer(timeout);
+        final Timer closeTimer = createTimerForCloseRequests(timeout);
         clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
         closeTimer.update();
         // Prepare shutting down the network thread
@@ -1337,7 +1342,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         swallow(log, Level.ERROR, "Failed to stop finding coordinator",
             this::stopFindCoordinatorOnClose, firstException);
         swallow(log, Level.ERROR, "Failed to release group assignment",
-            () -> runRebalanceCallbacksOnClose(closeTimer), firstException);
+            this::runRebalanceCallbacksOnClose, firstException);
         swallow(log, Level.ERROR, "Failed to leave group while closing consumer",
             () -> leaveGroupOnClose(closeTimer), firstException);
         swallow(log, Level.ERROR, "Failed invoking asynchronous commit callbacks while closing consumer",
@@ -1368,6 +1373,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         }
     }
 
+    private Timer createTimerForCloseRequests(Duration timeout) {
+        // this.time could be null if an exception occurs in constructor prior to setting the this.time field
+        final Time time = (this.time == null) ? Time.SYSTEM : this.time;
+        return time.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
     private void autoCommitOnClose(final Timer timer) {
         if (groupMetadata.get().isEmpty())
             return;
@@ -1378,7 +1389,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         applicationEventHandler.add(new CommitOnCloseEvent());
     }
 
-    private void runRebalanceCallbacksOnClose(final Timer timer) {
+    private void runRebalanceCallbacksOnClose() {
         if (groupMetadata.get().isEmpty())
             return;
 
@@ -1393,19 +1404,15 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
         droppedPartitions.addAll(assignedPartitions);
 
-        try {
-            final Exception error;
+        final Exception error;
 
-            if (memberEpoch > 0)
-                error = rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
-            else
-                error = rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
+        if (memberEpoch > 0)
+            error = rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
+        else
+            error = rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
 
-            if (error != null)
-                throw ConsumerUtils.maybeWrapAsKafkaException(error);
-        } finally {
-            timer.update();
-        }
+        if (error != null)
+            throw ConsumerUtils.maybeWrapAsKafkaException(error);
     }
 
     private void leaveGroupOnClose(final Timer timer) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 819365e9712..58ec7a334dc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -244,6 +244,7 @@ public class AsyncKafkaConsumerTest {
         String clientId,
         boolean autoCommitEnabled) {
         long retryBackoffMs = 100L;
+        int requestTimeoutMs = 30000;
         int defaultApiTimeoutMs = 1000;
         return new AsyncKafkaConsumer<>(
             new LogContext(),
@@ -261,6 +262,7 @@ public class AsyncKafkaConsumerTest {
             subscriptions,
             metadata,
             retryBackoffMs,
+            requestTimeoutMs,
             defaultApiTimeoutMs,
             groupId,
             autoCommitEnabled);
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index a91ad5dd563..f05280b24a2 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -240,7 +240,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testClose(quorum: String, groupProtocol: String): Unit = {
     val numRecords = 10
     val producer = createProducer()

Reply via email to