This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e64a3240688 KAFKA-19884: Add tests to ensure Consumer.close() timeouts
work as documented (#21037)
e64a3240688 is described below
commit e64a32406889823a1a9cbf7e60d8965916086a7c
Author: Kirk True <[email protected]>
AuthorDate: Thu Mar 12 07:50:02 2026 -0700
KAFKA-19884: Add tests to ensure Consumer.close() timeouts work as
documented (#21037)
Ensure that `close()` takes at least `DEFAULT_FETCH_MAX_WAIT_MS`
milliseconds to close, and ensure that `close(CloseOptions)` with a
short timeout closes before `DEFAULT_FETCH_MAX_WAIT_MS`.
Reviewers: Lianet Magrans <[email protected]>, TengYao Chi
<[email protected]>
---
.../consumer/PlaintextConsumerCloseTest.java | 135 +++++++++++++++++++++
1 file changed, 135 insertions(+)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCloseTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCloseTest.java
new file mode 100644
index 00000000000..32efc5947c3
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCloseTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import java.time.Duration;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.BROKER_COUNT;
+import static org.apache.kafka.clients.ClientsTestUtils.consumeRecords;
+import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_FETCH_MAX_WAIT_MS;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+ types = {Type.KRAFT},
+ brokers = BROKER_COUNT,
+ serverProperties = {
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value =
"1"),
+ @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
value = "100"),
+ @ClusterConfigProperty(key = GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
value = "60000"),
+ @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
value = "10"),
+ }
+)
+public class PlaintextConsumerCloseTest {
+
+ private final ClusterInstance cluster;
+
+ public PlaintextConsumerCloseTest(ClusterInstance cluster) {
+ this.cluster = cluster;
+ }
+
+ @ClusterTest
+ public void
testClassicConsumerCloseWithDefaultTakesAtLeastFetchMaxWaitMs() throws
Exception {
+ testCloseWithDefaultTakesAtLeastFetchMaxWaitMs(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerCloseWithDefaultTakesAtLeastFetchMaxWaitMs()
throws Exception {
+ testCloseWithDefaultTakesAtLeastFetchMaxWaitMs(GroupProtocol.CONSUMER);
+ }
+
+ private void testCloseWithDefaultTakesAtLeastFetchMaxWaitMs(GroupProtocol
groupProtocol) throws Exception {
+ long closeMs = calculateConsumerCloseDelay(groupProtocol,
Consumer::close);
+ assertTrue(
+ closeMs >= DEFAULT_FETCH_MAX_WAIT_MS,
+ "Closing a consumer with the default close() should take longer
than " + DEFAULT_FETCH_MAX_WAIT_MS + " ms, but actually took " + closeMs + " ms"
+ );
+ }
+
+ @ClusterTest
+ public void testClassicConsumerCloseWithTimeoutIgnoresFetchMaxWaitMs()
throws Exception {
+ testCloseWithTimeoutIgnoresFetchMaxWaitMs(GroupProtocol.CLASSIC);
+ }
+
+ @ClusterTest
+ public void testAsyncConsumerCloseWithTimeoutIgnoresFetchMaxWaitMs()
throws Exception {
+ testCloseWithTimeoutIgnoresFetchMaxWaitMs(GroupProtocol.CONSUMER);
+ }
+
+ private void testCloseWithTimeoutIgnoresFetchMaxWaitMs(GroupProtocol
groupProtocol) throws Exception {
+ long timeoutMs = 100;
+
+ // Close the Consumer with a specified timeout that's much shorter
than the default fetch timeout
+ // to ensure the fetch.max.wait.ms is effectively ignored.
+ long closeMs = calculateConsumerCloseDelay(
+ groupProtocol,
+ c -> c.close(CloseOptions.timeout(Duration.ofMillis(timeoutMs)))
+ );
+ assertTrue(
+ closeMs <= DEFAULT_FETCH_MAX_WAIT_MS,
+ "Closing a consumer with a timeout of " + timeoutMs + " ms should
take less than " + DEFAULT_FETCH_MAX_WAIT_MS + " ms, but actually took " +
closeMs + " ms"
+ );
+ }
+
+ private long calculateConsumerCloseDelay(GroupProtocol groupProtocol,
+
java.util.function.Consumer<Consumer<byte[], byte[]>> closeOperation) throws
Exception {
+ Map<String, Object> consumerConfig = Map.of(
+ GROUP_PROTOCOL_CONFIG,
groupProtocol.name().toLowerCase(Locale.ROOT),
+ KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName(),
+ VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName(),
+ AUTO_OFFSET_RESET_CONFIG, "earliest",
+ GROUP_ID_CONFIG, "group_test",
+ BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()
+ );
+ var topicName = "calculate-consumer-close-delay";
+ var numRecords = 100;
+ cluster.createTopic(topicName, 2, (short) 2, Map.of());
+
+ sendRecords(cluster, new TopicPartition(topicName, 0), numRecords);
+
+ try (Consumer<byte[], byte[]> consumer =
cluster.consumer(consumerConfig)) {
+ consumer.subscribe(Set.of(topicName));
+ consumeRecords(consumer, numRecords);
+
+ long start = System.currentTimeMillis();
+ closeOperation.accept(consumer);
+ long end = System.currentTimeMillis();
+ return end - start;
+ }
+ }
+}