This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 6684319185c KAFKA-18645: New consumer should align close timeout handling with classic consumer (#18702)
6684319185c is described below
commit 6684319185c849663d2cba5e64586baec164d995
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Feb 5 22:08:51 2025 +0800
KAFKA-18645: New consumer should align close timeout handling with classic consumer (#18702)
Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/consumer/KafkaConsumer.java | 7 +++++
.../consumer/internals/AsyncKafkaConsumer.java | 35 +++++++++++++---------
.../consumer/internals/AsyncKafkaConsumerTest.java | 2 ++
.../integration/kafka/api/ConsumerBounceTest.scala | 2 +-
4 files changed, 31 insertions(+), 15 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 8c9ea180f84..70c0f7cadd5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1782,6 +1782,13 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
* timeout. If the consumer is unable to complete offset commits and
gracefully leave the group
* before the timeout expires, the consumer is force closed. Note that
{@link #wakeup()} cannot be
* used to interrupt close.
+ * <p>
+ * The actual maximum wait time is bounded by the {@link
ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} setting, which
+ * only applies to operations performed with the broker
(coordinator-related requests and
+ * fetch sessions). Even if a larger timeout is specified, the consumer
will not wait longer than
+ * {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to
complete during the close operation.
+ * Note that the execution time of callbacks (such as {@link
OffsetCommitCallback} and
+ * {@link ConsumerRebalanceListener}) does not consume time from the close
timeout.
*
* @param timeout The maximum time to wait for consumer to close
gracefully. The value must be
* non-negative. Specifying a timeout of zero means do not
wait for pending requests to complete.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 2c65b44a7a0..3f30acc932c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -246,6 +246,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private final ConsumerMetadata metadata;
private final Metrics metrics;
private final long retryBackoffMs;
+ private final int requestTimeoutMs;
private final Duration defaultApiTimeoutMs;
private final boolean autoCommitEnabled;
private volatile boolean closed = false;
@@ -324,6 +325,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.metrics = createMetrics(config, time, reporters);
this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
this.retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+ this.requestTimeoutMs =
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
List<ConsumerInterceptor<K, V>> interceptorList =
configuredConsumerInterceptors(config);
this.interceptors = new ConsumerInterceptors<>(interceptorList);
@@ -447,6 +449,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
SubscriptionState subscriptions,
ConsumerMetadata metadata,
long retryBackoffMs,
+ int requestTimeoutMs,
int defaultApiTimeoutMs,
String groupId,
boolean autoCommitEnabled) {
@@ -466,6 +469,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.groupMetadata.set(initializeGroupMetadata(groupId,
Optional.empty()));
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
+ this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
this.deserializers = deserializers;
this.applicationEventHandler = applicationEventHandler;
@@ -499,6 +503,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.metrics = new Metrics(time);
this.metadata = metadata;
this.retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+ this.requestTimeoutMs =
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs =
Duration.ofMillis(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
this.deserializers = new Deserializers<>(keyDeserializer,
valueDeserializer);
this.clientTelemetryReporter = Optional.empty();
@@ -1326,7 +1331,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// We are already closing with a timeout, don't allow wake-ups from
here on.
wakeupTrigger.disableWakeups();
- final Timer closeTimer = time.timer(timeout);
+ final Timer closeTimer = createTimerForCloseRequests(timeout);
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
closeTimer.update();
// Prepare shutting down the network thread
@@ -1337,7 +1342,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
swallow(log, Level.ERROR, "Failed to stop finding coordinator",
this::stopFindCoordinatorOnClose, firstException);
swallow(log, Level.ERROR, "Failed to release group assignment",
- () -> runRebalanceCallbacksOnClose(closeTimer), firstException);
+ this::runRebalanceCallbacksOnClose, firstException);
swallow(log, Level.ERROR, "Failed to leave group while closing
consumer",
() -> leaveGroupOnClose(closeTimer), firstException);
swallow(log, Level.ERROR, "Failed invoking asynchronous commit
callbacks while closing consumer",
@@ -1368,6 +1373,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
+ private Timer createTimerForCloseRequests(Duration timeout) {
+ // this.time could be null if an exception occurs in constructor prior
to setting the this.time field
+ final Time time = (this.time == null) ? Time.SYSTEM : this.time;
+ return time.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+ }
+
private void autoCommitOnClose(final Timer timer) {
if (groupMetadata.get().isEmpty())
return;
@@ -1378,7 +1389,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
applicationEventHandler.add(new CommitOnCloseEvent());
}
- private void runRebalanceCallbacksOnClose(final Timer timer) {
+ private void runRebalanceCallbacksOnClose() {
if (groupMetadata.get().isEmpty())
return;
@@ -1393,19 +1404,15 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
SortedSet<TopicPartition> droppedPartitions = new
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
droppedPartitions.addAll(assignedPartitions);
- try {
- final Exception error;
+ final Exception error;
- if (memberEpoch > 0)
- error =
rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
- else
- error =
rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
+ if (memberEpoch > 0)
+ error =
rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
+ else
+ error =
rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
- if (error != null)
- throw ConsumerUtils.maybeWrapAsKafkaException(error);
- } finally {
- timer.update();
- }
+ if (error != null)
+ throw ConsumerUtils.maybeWrapAsKafkaException(error);
}
private void leaveGroupOnClose(final Timer timer) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 819365e9712..58ec7a334dc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -244,6 +244,7 @@ public class AsyncKafkaConsumerTest {
String clientId,
boolean autoCommitEnabled) {
long retryBackoffMs = 100L;
+ int requestTimeoutMs = 30000;
int defaultApiTimeoutMs = 1000;
return new AsyncKafkaConsumer<>(
new LogContext(),
@@ -261,6 +262,7 @@ public class AsyncKafkaConsumerTest {
subscriptions,
metadata,
retryBackoffMs,
+ requestTimeoutMs,
defaultApiTimeoutMs,
groupId,
autoCommitEnabled);
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index a91ad5dd563..f05280b24a2 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -240,7 +240,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testClose(quorum: String, groupProtocol: String): Unit = {
val numRecords = 10
val producer = createProducer()