This is an automated email from the ASF dual-hosted git repository. lianetm pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b5cceb43e5e KAFKA-19205: inconsistent result of beginningOffsets/endoffset between classic and async consumer with 0 timeout (#19578) b5cceb43e5e is described below commit b5cceb43e5eb9a19cf090e48bc97267b1c6e78a4 Author: xijiu <422766...@qq.com> AuthorDate: Sun May 4 01:12:20 2025 +0800 KAFKA-19205: inconsistent result of beginningOffsets/endoffset between classic and async consumer with 0 timeout (#19578) When a zero timeout is passed to beginningOffsets or endOffsets, both the classic and the async consumer should return an empty map, rather than the async consumer returning a map of the form <TopicPartition, null>. Reviewers: Ken Huang <s7133...@gmail.com>, PoAn Yang <pay...@apache.org>, Chia-Ping Tsai <chia7...@gmail.com>, Lianet Magrans <lmagr...@confluent.io> --- .../apache/kafka/clients/consumer/KafkaConsumer.java | 4 ++-- .../consumer/internals/AsyncKafkaConsumer.java | 5 ++++- .../consumer/internals/AsyncKafkaConsumerTest.java | 5 ++--- .../integration/kafka/api/PlaintextConsumerTest.scala | 19 +++++++++++++++++++ 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index c4c462e7f94..ae23c689de1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1634,7 +1634,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @param partitions the partitions to get the earliest offsets * @param timeout The maximum amount of time to await retrieval of the beginning offsets * - * @return The earliest available offsets for the given partitions + * @return The earliest available offsets for the given partitions, and it will return empty map if zero timeout is provided * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before @@ -1684,7 +1684,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @param partitions the partitions to get the end offsets. * @param timeout The maximum amount of time to await retrieval of the end offsets * - * @return The end offsets for the given partitions. + * @return The end offsets for the given partitions, and it will return empty map if zero timeout is provided * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index db1d239c0ae..e301b6855c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1306,7 +1306,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { // and throw timeout exception if it cannot complete in time. if (timeout.isZero()) { applicationEventHandler.add(listOffsetsEvent); - return listOffsetsEvent.emptyResults(); + // It is used to align with classic consumer. + // When the "timeout == 0", the classic consumer will return an empty map. + // Therefore, the AsyncKafkaConsumer needs to be consistent with it. 
+ return new HashMap<>(); } Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 9a12ce75a6c..3b43d665c81 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -993,9 +993,8 @@ public class AsyncKafkaConsumerTest { TopicPartition tp = new TopicPartition("topic1", 0); Map<TopicPartition, Long> result = assertDoesNotThrow(() -> consumer.beginningOffsets(Collections.singletonList(tp), Duration.ZERO)); - // The result should be {tp=null} - assertTrue(result.containsKey(tp)); - assertNull(result.get(tp)); + assertNotNull(result); + assertEquals(0, result.size()); verify(applicationEventHandler).add(ArgumentMatchers.isA(ListOffsetsEvent.class)); } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index f1c708acfdd..7197212d893 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -14,6 +14,7 @@ package kafka.api import kafka.api.BaseConsumerTest.{DeserializerImpl, SerializerImpl} +import java.lang.{Long => JLong} import java.time.Duration import java.util import java.util.Arrays.asList @@ -873,4 +874,22 @@ class PlaintextConsumerTest extends BaseConsumerTest { waitTimeMs=leaveGroupTimeoutMs ) } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) + def testOffsetRelatedWhenTimeoutZero(groupProtocol: String): Unit = { + val consumer = createConsumer() + val result1 = 
consumer.beginningOffsets(util.List.of(tp), Duration.ZERO) + assertNotNull(result1) + assertEquals(0, result1.size()) + + val result2 = consumer.endOffsets(util.List.of(tp), Duration.ZERO) + assertNotNull(result2) + assertEquals(0, result2.size()) + + val result3 = consumer.offsetsForTimes(Map[TopicPartition, JLong]((tp, 0)).asJava, Duration.ZERO) + assertNotNull(result3) + assertEquals(1, result3.size()) + assertNull(result3.get(tp)) + } }