This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b5cceb43e5e KAFKA-19205: inconsistent result of 
beginningOffsets/endoffset between classic and async consumer with 0 timeout 
(#19578)
b5cceb43e5e is described below

commit b5cceb43e5eb9a19cf090e48bc97267b1c6e78a4
Author: xijiu <422766...@qq.com>
AuthorDate: Sun May 4 01:12:20 2025 +0800

    KAFKA-19205: inconsistent result of beginningOffsets/endoffset between 
classic and async consumer with 0 timeout (#19578)
    
    In the return results of the methods beginningOffsets and endOffset, if
    timeout == 0, then an empty Map should be returned uniformly instead of
    in the form of <TopicPartition, null>
    
    Reviewers: Ken Huang <s7133...@gmail.com>, PoAn Yang
     <pay...@apache.org>, Chia-Ping Tsai <chia7...@gmail.com>, Lianet
     Magrans <lmagr...@confluent.io>
---
 .../apache/kafka/clients/consumer/KafkaConsumer.java  |  4 ++--
 .../consumer/internals/AsyncKafkaConsumer.java        |  5 ++++-
 .../consumer/internals/AsyncKafkaConsumerTest.java    |  5 ++---
 .../integration/kafka/api/PlaintextConsumerTest.scala | 19 +++++++++++++++++++
 4 files changed, 27 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c4c462e7f94..ae23c689de1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1634,7 +1634,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @param partitions the partitions to get the earliest offsets
      * @param timeout The maximum amount of time to await retrieval of the 
beginning offsets
      *
-     * @return The earliest available offsets for the given partitions
+     * @return The earliest available offsets for the given partitions, and it 
will return empty map if zero timeout is provided
      * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
@@ -1684,7 +1684,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @param partitions the partitions to get the end offsets.
      * @param timeout The maximum amount of time to await retrieval of the end 
offsets
      *
-     * @return The end offsets for the given partitions.
+     * @return The end offsets for the given partitions, and it will return 
empty map if zero timeout is provided
      * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the offsets 
could not be fetched before
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index db1d239c0ae..e301b6855c6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1306,7 +1306,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             // and throw timeout exception if it cannot complete in time.
             if (timeout.isZero()) {
                 applicationEventHandler.add(listOffsetsEvent);
-                return listOffsetsEvent.emptyResults();
+                // It is used to align with classic consumer.
+                // When the "timeout == 0", the classic consumer will return 
an empty map.
+                // Therefore, the AsyncKafkaConsumer needs to be consistent 
with it.
+                return new HashMap<>();
             }
 
             Map<TopicPartition, OffsetAndTimestampInternal> 
offsetAndTimestampMap;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 9a12ce75a6c..3b43d665c81 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -993,9 +993,8 @@ public class AsyncKafkaConsumerTest {
         TopicPartition tp = new TopicPartition("topic1", 0);
         Map<TopicPartition, Long> result =
                 assertDoesNotThrow(() -> 
consumer.beginningOffsets(Collections.singletonList(tp), Duration.ZERO));
-        // The result should be {tp=null}
-        assertTrue(result.containsKey(tp));
-        assertNull(result.get(tp));
+        assertNotNull(result);
+        assertEquals(0, result.size());
         
verify(applicationEventHandler).add(ArgumentMatchers.isA(ListOffsetsEvent.class));
     }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index f1c708acfdd..7197212d893 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -14,6 +14,7 @@ package kafka.api
 
 import kafka.api.BaseConsumerTest.{DeserializerImpl, SerializerImpl}
 
+import java.lang.{Long => JLong}
 import java.time.Duration
 import java.util
 import java.util.Arrays.asList
@@ -873,4 +874,22 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       waitTimeMs=leaveGroupTimeoutMs
     )
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testOffsetRelatedWhenTimeoutZero(groupProtocol: String): Unit = {
+    val consumer = createConsumer()
+    val result1 = consumer.beginningOffsets(util.List.of(tp), Duration.ZERO)
+    assertNotNull(result1)
+    assertEquals(0, result1.size())
+
+    val result2 = consumer.endOffsets(util.List.of(tp), Duration.ZERO)
+    assertNotNull(result2)
+    assertEquals(0, result2.size())
+
+    val result3 = consumer.offsetsForTimes(Map[TopicPartition, JLong]((tp, 
0)).asJava, Duration.ZERO)
+    assertNotNull(result3)
+    assertEquals(1, result3.size())
+    assertNull(result3.get(tp))
+  }
 }

Reply via email to