This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e0578fb814 KAFKA-16156: beginningOrEndOffsets does not need to build 
an OffsetAndTimestamps object upon completion (#15525)
4e0578fb814 is described below

commit 4e0578fb814e99da2a8f4d6adc123fe87c8f24f3
Author: Philip Nee <p...@confluent.io>
AuthorDate: Mon Apr 8 04:04:58 2024 -0700

    KAFKA-16156: beginningOrEndOffsets does not need to build an 
OffsetAndTimestamps object upon completion (#15525)
    
    A subtle difference in the behavior of the two APIs causes failures with 
"Invalid negative timestamp".
    
    In this PR, the list offsets response will be processed differently based 
on the API. For beginningOffsets/endOffsets - the offset response should be 
directly returned.
    
    For offsetsForTimes - an OffsetAndTimestamp object is constructed for each 
requested TopicPartition before being returned.
    
    The reason for beginningOffsets and endOffsets: we expect a -1 
timestamp in the response, which subsequently causes the invalid timestamp 
exception because the original code tries to construct an OffsetAndTimestamp 
object upon returning.
    
    In this PR, the following missing tasks are added:
    
    short-circuit both beginningOrEndOffsets
    Test both APIs (beginningOrEndOffsets, offsetsForTimes)
    Seems like we don't have tests for this API: note that it is present in other 
IntegrationTests, but those were added to test the Async consumer
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>, Lianet Magrans 
<liane...@gmail.com>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  52 ++++---
 .../internals/OffsetAndTimestampInternal.java      |  81 +++++++++++
 .../consumer/internals/OffsetFetcherUtils.java     |  45 ++++--
 .../consumer/internals/OffsetsRequestManager.java  |  47 +++---
 .../events/ApplicationEventProcessor.java          |  10 +-
 .../internals/events/ListOffsetsEvent.java         |  18 +--
 .../consumer/internals/AsyncKafkaConsumerTest.java |  70 +++++----
 .../internals/ConsumerNetworkThreadTest.java       |   9 +-
 .../internals/OffsetsRequestManagerTest.java       | 160 ++++++++++-----------
 .../kafka/api/PlaintextConsumerTest.scala          |  56 +++++++-
 10 files changed, 367 insertions(+), 181 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index fcd57469c2a..74e8866242e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1093,17 +1093,22 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 return Collections.emptyMap();
             }
             final Timer timer = time.timer(timeout);
-            final ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
-                timestampsToSearch,
-                true,
-                timer);
+            ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
+                    timestampsToSearch,
+                    timer,
+                    true);
 
             // If timeout is set to zero return empty immediately; otherwise 
try to get the results
             // and throw timeout exception if it cannot complete in time.
             if (timeout.toMillis() == 0L)
-                return listOffsetsEvent.emptyResult();
-
-            return applicationEventHandler.addAndGet(listOffsetsEvent, timer);
+                return listOffsetsEvent.emptyResults();
+
+            return applicationEventHandler.addAndGet(listOffsetsEvent, timer)
+                    .entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(
+                            Map.Entry::getKey,
+                            entry -> 
entry.getValue().buildOffsetAndTimestamp()));
         } finally {
             release();
         }
@@ -1141,21 +1146,30 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             if (partitions.isEmpty()) {
                 return Collections.emptyMap();
             }
+
             Map<TopicPartition, Long> timestampToSearch = partitions
-                .stream()
-                .collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
+                    .stream()
+                    .collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
             Timer timer = time.timer(timeout);
             ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
-                timestampToSearch,
-                false,
-                timer);
-            Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = 
applicationEventHandler.addAndGet(
-                listOffsetsEvent,
-                timer);
-            return offsetAndTimestampMap
-                .entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().offset()));
+                    timestampToSearch,
+                    timer,
+                    false);
+
+            Map<TopicPartition, OffsetAndTimestampInternal> 
offsetAndTimestampMap;
+            if (timeout.isZero()) {
+                // Return an empty results but also send a request to update 
the highwatermark.
+                applicationEventHandler.add(listOffsetsEvent);
+                return listOffsetsEvent.emptyResults();
+            }
+            offsetAndTimestampMap = applicationEventHandler.addAndGet(
+                    listOffsetsEvent,
+                    timer);
+            return offsetAndTimestampMap.entrySet()
+                    .stream()
+                    .collect(Collectors.toMap(
+                            Map.Entry::getKey,
+                            entry -> entry.getValue().offset()));
         } finally {
             release();
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java
new file mode 100644
index 00000000000..08d451da477
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp} to allow negative 
timestamps and offset.
+ */
+public class OffsetAndTimestampInternal {
+    private final long timestamp;
+    private final long offset;
+    private final Optional<Integer> leaderEpoch;
+
+    public OffsetAndTimestampInternal(long offset, long timestamp, 
Optional<Integer> leaderEpoch) {
+        this.offset = offset;
+        this.timestamp = timestamp;
+        this.leaderEpoch = leaderEpoch;
+    }
+
+    long offset() {
+        return offset;
+    }
+
+    long timestamp() {
+        return timestamp;
+    }
+
+    Optional<Integer> leaderEpoch() {
+        return leaderEpoch;
+    }
+
+    public OffsetAndTimestamp buildOffsetAndTimestamp() {
+        return new OffsetAndTimestamp(offset, timestamp, leaderEpoch);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = (int) (timestamp ^ (timestamp >>> 32));
+        result = 31 * result + (int) (offset ^ (offset >>> 32));
+        result = 31 * result + leaderEpoch.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof OffsetAndTimestampInternal)) return false;
+
+        OffsetAndTimestampInternal that = (OffsetAndTimestampInternal) o;
+
+        if (timestamp != that.timestamp) return false;
+        if (offset != that.offset) return false;
+        return leaderEpoch.equals(that.leaderEpoch);
+    }
+
+    @Override
+    public String toString() {
+        return "OffsetAndTimestampInternal{" +
+                "timestamp=" + timestamp +
+                ", offset=" + offset +
+                ", leaderEpoch=" + leaderEpoch +
+                '}';
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index 9239811f7d6..ac5d8a4acff 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -50,6 +50,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -240,20 +241,48 @@ class OffsetFetcherUtils {
         return offsetResetTimestamps;
     }
 
-    static Map<TopicPartition, OffsetAndTimestamp> 
buildOffsetsForTimesResult(final Map<TopicPartition, Long> timestampsToSearch,
-                                                                       final 
Map<TopicPartition, ListOffsetData> fetchedOffsets) {
-        HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new 
HashMap<>(timestampsToSearch.size());
+    static Map<TopicPartition, OffsetAndTimestamp> buildListOffsetsResult(
+        final Map<TopicPartition, Long> timestampsToSearch,
+        final Map<TopicPartition, ListOffsetData> fetchedOffsets,
+        BiFunction<TopicPartition, ListOffsetData, OffsetAndTimestamp> 
resultMapper) {
+
+        HashMap<TopicPartition, OffsetAndTimestamp> offsetsResults = new 
HashMap<>(timestampsToSearch.size());
         for (Map.Entry<TopicPartition, Long> entry : 
timestampsToSearch.entrySet())
-            offsetsByTimes.put(entry.getKey(), null);
+            offsetsResults.put(entry.getKey(), null);
 
         for (Map.Entry<TopicPartition, ListOffsetData> entry : 
fetchedOffsets.entrySet()) {
-            // 'entry.getValue().timestamp' will not be null since we are 
guaranteed
-            // to work with a v1 (or later) ListOffset request
             ListOffsetData offsetData = entry.getValue();
-            offsetsByTimes.put(entry.getKey(), new 
OffsetAndTimestamp(offsetData.offset, offsetData.timestamp, 
offsetData.leaderEpoch));
+            offsetsResults.put(entry.getKey(), 
resultMapper.apply(entry.getKey(), offsetData));
         }
 
-        return offsetsByTimes;
+        return offsetsResults;
+    }
+
+    static Map<TopicPartition, OffsetAndTimestamp> buildOffsetsForTimesResult(
+        final Map<TopicPartition, Long> timestampsToSearch,
+        final Map<TopicPartition, ListOffsetData> fetchedOffsets) {
+        return buildListOffsetsResult(timestampsToSearch, fetchedOffsets,
+            (topicPartition, offsetData) -> new OffsetAndTimestamp(
+                offsetData.offset,
+                offsetData.timestamp,
+                offsetData.leaderEpoch));
+    }
+
+    static Map<TopicPartition, OffsetAndTimestampInternal> 
buildOffsetsForTimeInternalResult(
+            final Map<TopicPartition, Long> timestampsToSearch,
+            final Map<TopicPartition, ListOffsetData> fetchedOffsets) {
+        HashMap<TopicPartition, OffsetAndTimestampInternal> offsetsResults = 
new HashMap<>(timestampsToSearch.size());
+        for (Map.Entry<TopicPartition, Long> entry : 
timestampsToSearch.entrySet()) {
+            offsetsResults.put(entry.getKey(), null);
+        }
+        for (Map.Entry<TopicPartition, ListOffsetData> entry : 
fetchedOffsets.entrySet()) {
+            ListOffsetData offsetData = entry.getValue();
+            offsetsResults.put(entry.getKey(), new OffsetAndTimestampInternal(
+                    offsetData.offset,
+                    offsetData.timestamp,
+                    offsetData.leaderEpoch));
+        }
+        return offsetsResults;
     }
 
     private Long offsetResetStrategyTimestamp(final TopicPartition partition) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index c5156e9e0b9..22e56111b47 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -151,14 +151,13 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
      * found .The future will complete when the requests responses are 
received and
      * processed, following a call to {@link #poll(long)}
      */
-    public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsets(
-            final Map<TopicPartition, Long> timestampsToSearch,
-            final boolean requireTimestamps) {
+    public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
fetchOffsets(
+            Map<TopicPartition, Long> timestampsToSearch,
+            boolean requireTimestamps) {
         if (timestampsToSearch.isEmpty()) {
             return CompletableFuture.completedFuture(Collections.emptyMap());
         }
         
metadata.addTransientTopics(OffsetFetcherUtils.topicsForPartitions(timestampsToSearch.keySet()));
-
         ListOffsetsRequestState listOffsetsRequestState = new 
ListOffsetsRequestState(
                 timestampsToSearch,
                 requireTimestamps,
@@ -175,10 +174,11 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
             }
         });
 
-        fetchOffsetsByTimes(timestampsToSearch, requireTimestamps, 
listOffsetsRequestState);
-
-        return listOffsetsRequestState.globalResult.thenApply(result ->
-                
OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, 
result.fetchedOffsets));
+        prepareFetchOffsetsRequests(timestampsToSearch, requireTimestamps, 
listOffsetsRequestState);
+        return listOffsetsRequestState.globalResult.thenApply(
+                result -> OffsetFetcherUtils.buildOffsetsForTimeInternalResult(
+                        timestampsToSearch,
+                        result.fetchedOffsets));
     }
 
     /**
@@ -235,14 +235,9 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
      * Generate requests for partitions with known leaders. Update the 
listOffsetsRequestState by adding
      * partitions with unknown leader to the 
listOffsetsRequestState.remainingToSearch
      */
-    private void fetchOffsetsByTimes(final Map<TopicPartition, Long> 
timestampsToSearch,
-                                     final boolean requireTimestamps,
-                                     final ListOffsetsRequestState 
listOffsetsRequestState) {
-        if (timestampsToSearch.isEmpty()) {
-            // Early return if empty map to avoid wrongfully raising 
StaleMetadataException on
-            // empty grouping
-            return;
-        }
+    private void prepareFetchOffsetsRequests(final Map<TopicPartition, Long> 
timestampsToSearch,
+                                             final boolean requireTimestamps,
+                                             final ListOffsetsRequestState 
listOffsetsRequestState) {
         try {
             List<NetworkClientDelegate.UnsentRequest> unsentRequests = 
buildListOffsetsRequests(
                     timestampsToSearch, requireTimestamps, 
listOffsetsRequestState);
@@ -263,7 +258,7 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
             Map<TopicPartition, Long> timestampsToSearch =
                     new HashMap<>(requestState.remainingToSearch);
             requestState.remainingToSearch.clear();
-            fetchOffsetsByTimes(timestampsToSearch, 
requestState.requireTimestamps, requestState);
+            prepareFetchOffsetsRequests(timestampsToSearch, 
requestState.requireTimestamps, requestState);
         });
     }
 
@@ -298,7 +293,7 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
                 
offsetFetcherUtils.updateSubscriptionState(multiNodeResult.fetchedOffsets,
                         isolationLevel);
 
-                if (listOffsetsRequestState.remainingToSearch.size() == 0) {
+                if (listOffsetsRequestState.remainingToSearch.isEmpty()) {
                     ListOffsetResult listOffsetResult =
                             new 
ListOffsetResult(listOffsetsRequestState.fetchedOffsets,
                                     
listOffsetsRequestState.remainingToSearch.keySet());
@@ -314,7 +309,6 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
 
         for (Map.Entry<Node, Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition>> entry : 
timestampsToSearchByNode.entrySet()) {
             Node node = entry.getKey();
-
             CompletableFuture<ListOffsetResult> partialResult = 
buildListOffsetRequestToNode(
                     node,
                     entry.getValue(),
@@ -364,8 +358,7 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
                 ListOffsetsResponse lor = (ListOffsetsResponse) 
response.responseBody();
                 log.trace("Received ListOffsetResponse {} from broker {}", 
lor, node);
                 try {
-                    ListOffsetResult listOffsetResult =
-                            offsetFetcherUtils.handleListOffsetResponse(lor);
+                    ListOffsetResult listOffsetResult = 
offsetFetcherUtils.handleListOffsetResponse(lor);
                     result.complete(listOffsetResult);
                 } catch (RuntimeException e) {
                     result.completeExceptionally(e);
@@ -423,11 +416,11 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
             });
         });
 
-        if (unsentRequests.size() > 0) {
+        if (unsentRequests.isEmpty()) {
+            globalResult.complete(null);
+        } else {
             expectedResponses.set(unsentRequests.size());
             requestsToSend.addAll(unsentRequests);
-        } else {
-            globalResult.complete(null);
         }
 
         return globalResult;
@@ -503,11 +496,11 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
             });
         });
 
-        if (unsentRequests.size() > 0) {
+        if (unsentRequests.isEmpty()) {
+            globalResult.complete(null);
+        } else {
             expectedResponses.set(unsentRequests.size());
             requestsToSend.addAll(unsentRequests);
-        } else {
-            globalResult.complete(null);
         }
 
         return globalResult;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 33825307465..d5cb1c04b38 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -17,12 +17,12 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.internals.CachedSupplier;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
 import org.apache.kafka.clients.consumer.internals.MembershipManager;
+import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
@@ -197,10 +197,12 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         manager.maybeAutoCommitAsync();
     }
 
+    /**
+     * Handles ListOffsetsEvent by fetching the offsets for the given 
partitions and timestamps.
+     */
     private void process(final ListOffsetsEvent event) {
-        final CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
future =
-                
requestManagers.offsetsRequestManager.fetchOffsets(event.timestampsToSearch(),
-                        event.requireTimestamps());
+        final CompletableFuture<Map<TopicPartition, 
OffsetAndTimestampInternal>> future =
+            
requestManagers.offsetsRequestManager.fetchOffsets(event.timestampsToSearch(), 
event.requireTimestamps());
         future.whenComplete(complete(event.future()));
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
index e218705846e..3df4719a7b0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Timer;
 
@@ -32,12 +33,13 @@ import java.util.Map;
  * {@link OffsetAndTimestamp} found (offset of the first message whose 
timestamp is greater than
  * or equals to the target timestamp)
  */
-public class ListOffsetsEvent extends 
CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestamp>> {
-
+public class ListOffsetsEvent extends 
CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> {
     private final Map<TopicPartition, Long> timestampsToSearch;
     private final boolean requireTimestamps;
 
-    public ListOffsetsEvent(final Map<TopicPartition, Long> timestampToSearch, 
final boolean requireTimestamps, final Timer timer) {
+    public ListOffsetsEvent(Map<TopicPartition, Long> timestampToSearch,
+                            Timer timer,
+                            boolean requireTimestamps) {
         super(Type.LIST_OFFSETS, timer);
         this.timestampsToSearch = 
Collections.unmodifiableMap(timestampToSearch);
         this.requireTimestamps = requireTimestamps;
@@ -49,11 +51,10 @@ public class ListOffsetsEvent extends 
CompletableApplicationEvent<Map<TopicParti
      * @return Map containing all the partitions the event was trying to get 
offsets for, and
      * null {@link OffsetAndTimestamp} as value
      */
-    public Map<TopicPartition, OffsetAndTimestamp> emptyResult() {
-        HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new 
HashMap<>(timestampsToSearch.size());
-        for (Map.Entry<TopicPartition, Long> entry : 
timestampsToSearch.entrySet())
-            offsetsByTimes.put(entry.getKey(), null);
-        return offsetsByTimes;
+    public <T> Map<TopicPartition, T> emptyResults() {
+        Map<TopicPartition, T> result = new HashMap<>();
+        timestampsToSearch.keySet().forEach(tp -> result.put(tp, null));
+        return result;
     }
 
     public Map<TopicPartition, Long> timestampsToSearch() {
@@ -70,5 +71,4 @@ public class ListOffsetsEvent extends 
CompletableApplicationEvent<Map<TopicParti
                 ", timestampsToSearch=" + timestampsToSearch +
                 ", requireTimestamps=" + requireTimestamps;
     }
-
 }
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index fa413eed36a..51fcd2e44eb 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -102,7 +102,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
@@ -309,7 +308,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testCommitAsyncWithFencedException() {
         consumer = newConsumer();
-        final HashMap<TopicPartition, OffsetAndMetadata> offsets = 
mockTopicPartitionOffset();
+        final Map<TopicPartition, OffsetAndMetadata> offsets = 
mockTopicPartitionOffset();
         MockCommitCallback callback = new MockCommitCallback();
 
         assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
@@ -841,18 +840,17 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testBeginningOffsets() {
         consumer = newConsumer();
-        Map<TopicPartition, OffsetAndTimestamp> expectedOffsetsAndTimestamp =
-            mockOffsetAndTimestamp();
-        Set<TopicPartition> partitions = expectedOffsetsAndTimestamp.keySet();
-        
doReturn(expectedOffsetsAndTimestamp).when(applicationEventHandler).addAndGet(any(),
 any());
-        Map<TopicPartition, Long> result =
-            assertDoesNotThrow(() -> consumer.beginningOffsets(partitions,
-                Duration.ofMillis(1)));
-        Map<TopicPartition, Long> expectedOffsets = 
expectedOffsetsAndTimestamp.entrySet().stream()
-            .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().offset()));
-        assertEquals(expectedOffsets, result);
-        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
-            ArgumentMatchers.isA(Timer.class));
+        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = 
mockOffsetAndTimestamp();
+
+        
doReturn(expectedOffsets).when(applicationEventHandler).addAndGet(any(), any());
+
+        Map<TopicPartition, Long> result = assertDoesNotThrow(() -> 
consumer.beginningOffsets(expectedOffsets.keySet(), Duration.ofMillis(1)));
+
+        expectedOffsets.forEach((key, value) -> {
+            assertTrue(result.containsKey(key));
+            assertEquals(value.offset(), result.get(key));
+        });
+        verify(applicationEventHandler).addAndGet(any(ListOffsetsEvent.class), 
any(Timer.class));
     }
 
     @Test
@@ -913,13 +911,16 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testOffsetsForTimes() {
         consumer = newConsumer();
-        Map<TopicPartition, OffsetAndTimestamp> expectedResult = 
mockOffsetAndTimestamp();
+        Map<TopicPartition, OffsetAndTimestampInternal> expectedResult = 
mockOffsetAndTimestamp();
         Map<TopicPartition, Long> timestampToSearch = mockTimestampToSearch();
 
         
doReturn(expectedResult).when(applicationEventHandler).addAndGet(any(), any());
         Map<TopicPartition, OffsetAndTimestamp> result =
                 assertDoesNotThrow(() -> 
consumer.offsetsForTimes(timestampToSearch, Duration.ofMillis(1)));
-        assertEquals(expectedResult, result);
+        expectedResult.forEach((key, value) -> {
+            OffsetAndTimestamp expected = value.buildOffsetAndTimestamp();
+            assertEquals(expected, result.get(key));
+        });
         
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
                 ArgumentMatchers.isA(Timer.class));
     }
@@ -927,17 +928,26 @@ public class AsyncKafkaConsumerTest {
     // This test ensures same behaviour as the current consumer when 
offsetsForTimes is called
     // with 0 timeout. It should return map with all requested partitions as 
keys, with null
     // OffsetAndTimestamp as value.
+    @Test
+    public void testBeginningOffsetsWithZeroTimeout() {
+        consumer = newConsumer();
+        TopicPartition tp = new TopicPartition("topic1", 0);
+        Map<TopicPartition, Long> result =
+                assertDoesNotThrow(() -> 
consumer.beginningOffsets(Collections.singletonList(tp), Duration.ZERO));
+        // The result should be {tp=null}
+        assertTrue(result.containsKey(tp));
+        assertNull(result.get(tp));
+        
verify(applicationEventHandler).add(ArgumentMatchers.isA(ListOffsetsEvent.class));
+    }
+
     @Test
     public void testOffsetsForTimesWithZeroTimeout() {
         consumer = newConsumer();
         TopicPartition tp = new TopicPartition("topic1", 0);
-        Map<TopicPartition, OffsetAndTimestamp> expectedResult =
-                Collections.singletonMap(tp, null);
+        Map<TopicPartition, OffsetAndTimestamp> expectedResult = 
Collections.singletonMap(tp, null);
         Map<TopicPartition, Long> timestampToSearch = 
Collections.singletonMap(tp, 5L);
-
         Map<TopicPartition, OffsetAndTimestamp> result =
-                assertDoesNotThrow(() -> 
consumer.offsetsForTimes(timestampToSearch,
-                        Duration.ZERO));
+            assertDoesNotThrow(() -> 
consumer.offsetsForTimes(timestampToSearch, Duration.ZERO));
         assertEquals(expectedResult, result);
         verify(applicationEventHandler, 
never()).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
             ArgumentMatchers.isA(Timer.class));
@@ -946,7 +956,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testWakeupCommitted() {
         consumer = newConsumer();
-        final HashMap<TopicPartition, OffsetAndMetadata> offsets = 
mockTopicPartitionOffset();
+        final Map<TopicPartition, OffsetAndMetadata> offsets = 
mockTopicPartitionOffset();
         doAnswer(invocation -> {
             CompletableApplicationEvent<?> event = invocation.getArgument(0);
             Timer timer = invocation.getArgument(1);
@@ -1647,28 +1657,28 @@ public class AsyncKafkaConsumerTest {
         }
     }
 
-    private HashMap<TopicPartition, OffsetAndMetadata> 
mockTopicPartitionOffset() {
+    private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
-        HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new 
HashMap<>();
+        Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new 
HashMap<>();
         topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L));
         topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L));
         return topicPartitionOffsets;
     }
 
-    private HashMap<TopicPartition, OffsetAndTimestamp> 
mockOffsetAndTimestamp() {
+    private Map<TopicPartition, OffsetAndTimestampInternal> 
mockOffsetAndTimestamp() {
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
-        HashMap<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = new 
HashMap<>();
-        offsetAndTimestamp.put(t0, new OffsetAndTimestamp(5L, 1L));
-        offsetAndTimestamp.put(t1, new OffsetAndTimestamp(6L, 3L));
+        Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestamp = 
new HashMap<>();
+        offsetAndTimestamp.put(t0, new OffsetAndTimestampInternal(5L, 1L, 
Optional.empty()));
+        offsetAndTimestamp.put(t1, new OffsetAndTimestampInternal(6L, 3L, 
Optional.empty()));
         return offsetAndTimestamp;
     }
 
-    private HashMap<TopicPartition, Long> mockTimestampToSearch() {
+    private Map<TopicPartition, Long> mockTimestampToSearch() {
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
-        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
+        Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
         timestampToSearch.put(t0, 1L);
         timestampToSearch.put(t1, 2L);
         return timestampToSearch;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index e4d492fb581..091009064de 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -47,6 +47,8 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -169,11 +171,12 @@ public class ConsumerNetworkThreadTest {
         verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
     }
 
-    @Test
-    public void testListOffsetsEventIsProcessed() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testListOffsetsEventIsProcessed(boolean requireTimestamp) {
         Map<TopicPartition, Long> timestamps = Collections.singletonMap(new 
TopicPartition("topic1", 1), 5L);
         Timer timer = time.timer(100);
-        ApplicationEvent e = new ListOffsetsEvent(timestamps, true, timer);
+        ApplicationEvent e = new ListOffsetsEvent(timestamps, timer, 
requireTimestamp);
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(ListOffsetsEvent.class));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index 582efdfe76b..109a7d92b55 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
@@ -131,13 +130,15 @@ public class OffsetsRequestManagerTest {
                 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
         mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
-        CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = 
requestManager.fetchOffsets(
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
result = requestManager.fetchOffsets(
                 timestampsToSearch,
                 false);
         assertEquals(1, requestManager.requestsToSend());
         assertEquals(0, requestManager.requestsToRetry());
 
-        Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = 
Collections.singletonMap(TEST_PARTITION_1, new OffsetAndTimestamp(5L, 1L));
+        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = 
Collections.singletonMap(
+                TEST_PARTITION_1,
+                new OffsetAndTimestampInternal(5L, -1, Optional.empty()));
         verifySuccessfulPollAndResponseReceived(result, expectedOffsets);
     }
 
@@ -148,8 +149,9 @@ public class OffsetsRequestManagerTest {
 
         // Building list offsets request fails with unknown leader
         mockFailedRequest_MissingLeader();
-        CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture =
-                requestManager.fetchOffsets(timestampsToSearch, false);
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
fetchOffsetsFuture =
+            requestManager.fetchOffsets(timestampsToSearch, false);
+
         assertEquals(0, requestManager.requestsToSend());
         assertEquals(1, requestManager.requestsToRetry());
         verify(metadata).requestUpdate(true);
@@ -172,22 +174,23 @@ public class OffsetsRequestManagerTest {
         partitionLeaders.put(TEST_PARTITION_1, LEADER_1);
         partitionLeaders.put(TEST_PARTITION_2, LEADER_1);
         mockSuccessfulRequest(partitionLeaders);
-        CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = 
requestManager.fetchOffsets(
-                timestampsToSearch,
-                false);
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
result = requestManager.fetchOffsets(
+                        timestampsToSearch,
+                        false);
         assertEquals(1, requestManager.requestsToSend());
         assertEquals(0, requestManager.requestsToRetry());
 
-        Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = 
timestampsToSearch.entrySet().stream()
-                .collect(Collectors.toMap(e -> e.getKey(), e -> new 
OffsetAndTimestamp(5L, 1L)));
+        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = 
timestampsToSearch.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey,
+                        e -> new OffsetAndTimestampInternal(5L, -1, 
Optional.empty())));
         verifySuccessfulPollAndResponseReceived(result, expectedOffsets);
     }
 
     @Test
     public void testListOffsetsRequestEmpty() throws ExecutionException, 
InterruptedException {
-        CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = 
requestManager.fetchOffsets(
-                Collections.emptyMap(),
-                false);
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
result = requestManager.fetchOffsets(
+                        Collections.emptyMap(),
+                        false);
         assertEquals(0, requestManager.requestsToSend());
         assertEquals(0, requestManager.requestsToRetry());
 
@@ -209,7 +212,7 @@ public class OffsetsRequestManagerTest {
                 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
         mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
-        CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = 
requestManager.fetchOffsets(
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
result = requestManager.fetchOffsets(
                 timestampsToSearch,
                 false);
         assertEquals(1, requestManager.requestsToSend());
@@ -223,9 +226,7 @@ public class OffsetsRequestManagerTest {
         NetworkClientDelegate.UnsentRequest unsentRequest = 
retriedPoll.unsentRequests.get(0);
         ClientResponse clientResponse = buildClientResponse(unsentRequest, 
topicResponses);
         clientResponse.onComplete();
-
-        Map<TopicPartition, OffsetAndTimestamp> expectedOffsets =
-                Collections.singletonMap(TEST_PARTITION_1, null);
+        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = 
Collections.singletonMap(TEST_PARTITION_1, null);
         verifyRequestSuccessfullyCompleted(result, expectedOffsets);
     }
 
@@ -237,9 +238,8 @@ public class OffsetsRequestManagerTest {
 
         // Building list offsets request fails with unknown leader
         mockFailedRequest_MissingLeader();
-        CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture =
-                requestManager.fetchOffsets(timestampsToSearch,
-                        false);
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
fetchOffsetsFuture =
+            requestManager.fetchOffsets(timestampsToSearch, false);
         assertEquals(0, requestManager.requestsToSend());
         assertEquals(1, requestManager.requestsToRetry());
         verify(metadata).requestUpdate(true);
@@ -254,8 +254,8 @@ public class OffsetsRequestManagerTest {
         requestManager.onUpdate(new ClusterResource(""));
         assertEquals(1, requestManager.requestsToSend());
 
-        Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = 
Collections.singletonMap(
-                TEST_PARTITION_1, new OffsetAndTimestamp(5L, 1L));
+        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = 
Collections.singletonMap(
+                TEST_PARTITION_1, new OffsetAndTimestampInternal(5L, -1, 
Optional.empty()));
         verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, 
expectedOffsets);
     }
 
@@ -267,7 +267,7 @@ public class OffsetsRequestManagerTest {
 
         // List offsets request successfully built
         mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
-        CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture = requestManager.fetchOffsets(
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
fetchOffsetsFuture = requestManager.fetchOffsets(
                 timestampsToSearch,
                 false);
         assertEquals(1, requestManager.requestsToSend());
@@ -293,7 +293,8 @@ public class OffsetsRequestManagerTest {
         requestManager.onUpdate(new ClusterResource(""));
         assertEquals(1, requestManager.requestsToSend());
 
-        Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = 
Collections.singletonMap(TEST_PARTITION_1, new OffsetAndTimestamp(5L, 1L));
+        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets =
+                Collections.singletonMap(TEST_PARTITION_1, new 
OffsetAndTimestampInternal(5L, -1, Optional.empty()));
         verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, 
expectedOffsets);
     }
 
@@ -315,7 +316,7 @@ public class OffsetsRequestManagerTest {
 
         // List offsets request successfully built
         mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
-        CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture = requestManager.fetchOffsets(
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
fetchOffsetsFuture = requestManager.fetchOffsets(
                 timestampsToSearch,
                 false);
         assertEquals(1, requestManager.requestsToSend());
@@ -334,7 +335,8 @@ public class OffsetsRequestManagerTest {
         clientResponse.onComplete();
 
         // Null offsets should be returned for each partition
-        Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = 
Collections.singletonMap(TEST_PARTITION_1, null);
+        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets =
+                Collections.singletonMap(TEST_PARTITION_1, null);
         verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, 
expectedOffsets);
     }
 
@@ -344,15 +346,17 @@ public class OffsetsRequestManagerTest {
         timestampsToSearch.put(TEST_PARTITION_1, 
ListOffsetsRequest.EARLIEST_TIMESTAMP);
         timestampsToSearch.put(TEST_PARTITION_2, 
ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
-        Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = 
timestampsToSearch.entrySet().stream()
-                .collect(Collectors.toMap(e -> e.getKey(), e -> new 
OffsetAndTimestamp(5L, 1L)));
+        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = 
timestampsToSearch.entrySet().stream()
+            .collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> new OffsetAndTimestampInternal(5L, -1, 
Optional.empty())));
 
         // List offsets request to 2 brokers successfully built
         Map<TopicPartition, Node> partitionLeaders = new HashMap<>();
         partitionLeaders.put(TEST_PARTITION_1, LEADER_1);
         partitionLeaders.put(TEST_PARTITION_2, LEADER_2);
         mockSuccessfulRequest(partitionLeaders);
-        CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture = requestManager.fetchOffsets(
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
fetchOffsetsFuture = requestManager.fetchOffsets(
                 timestampsToSearch,
                 false);
         assertEquals(2, requestManager.requestsToSend());
@@ -366,14 +370,16 @@ public class OffsetsRequestManagerTest {
         // Mixed response with failures and successes. Offsets successfully fetched from one
         // broker but retriable UNKNOWN_LEADER_EPOCH received from second broker.
         NetworkClientDelegate.UnsentRequest unsentRequest1 = 
res.unsentRequests.get(0);
+        long offsets = expectedOffsets.get(TEST_PARTITION_1).offset();
         ClientResponse clientResponse1 = buildClientResponse(
                 unsentRequest1,
-                Collections.singletonMap(TEST_PARTITION_1, 
expectedOffsets.get(TEST_PARTITION_1)));
+                Collections.singletonMap(TEST_PARTITION_1,
+                        new OffsetAndTimestampInternal(offsets, -1L, 
Optional.empty())));
         clientResponse1.onComplete();
         NetworkClientDelegate.UnsentRequest unsentRequest2 = 
res.unsentRequests.get(1);
         ClientResponse clientResponse2 = buildClientResponseWithErrors(
-                unsentRequest2,
-                Collections.singletonMap(TEST_PARTITION_2, 
Errors.UNKNOWN_LEADER_EPOCH));
+            unsentRequest2,
+            Collections.singletonMap(TEST_PARTITION_2, 
Errors.UNKNOWN_LEADER_EPOCH));
         clientResponse2.onComplete();
 
         assertFalse(fetchOffsetsFuture.isDone());
@@ -389,8 +395,10 @@ public class OffsetsRequestManagerTest {
         NetworkClientDelegate.PollResult retriedPoll = 
requestManager.poll(time.milliseconds());
         verifySuccessfulPollAwaitingResponse(retriedPoll);
         NetworkClientDelegate.UnsentRequest unsentRequest = 
retriedPoll.unsentRequests.get(0);
+        long offsets2 = expectedOffsets.get(TEST_PARTITION_2).offset();
         ClientResponse clientResponse = buildClientResponse(unsentRequest,
-                Collections.singletonMap(TEST_PARTITION_2, 
expectedOffsets.get(TEST_PARTITION_2)));
+            Collections.singletonMap(TEST_PARTITION_2,
+                    new OffsetAndTimestampInternal(offsets2, -1L, 
Optional.empty())));
         clientResponse.onComplete();
 
         // Verify global result with the offset initially retrieved, and the offset that
@@ -405,7 +413,7 @@ public class OffsetsRequestManagerTest {
 
         // List offsets request successfully built
         mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
-        CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture =
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
fetchOffsetsFuture =
                 requestManager.fetchOffsets(
                         timestampsToSearch,
                         false);
@@ -434,7 +442,7 @@ public class OffsetsRequestManagerTest {
 
         // List offsets request successfully built
         mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
-        CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture =
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
fetchOffsetsFuture =
                 requestManager.fetchOffsets(
                         timestampsToSearch,
                         false);
@@ -466,10 +474,11 @@ public class OffsetsRequestManagerTest {
 
         // List offsets request successfully built
         mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, 
LEADER_1));
-        CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
fetchOffsetsFuture =
-                requestManager.fetchOffsets(
-                        timestampsToSearch,
-                        false);
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
fetchOffsetsFuture =
+            requestManager.fetchOffsets(
+                    timestampsToSearch,
+                    false);
+
         assertEquals(1, requestManager.requestsToSend());
         assertEquals(0, requestManager.requestsToRetry());
 
@@ -480,11 +489,16 @@ public class OffsetsRequestManagerTest {
         // Response received with auth error
         NetworkClientDelegate.UnsentRequest unsentRequest = 
res.unsentRequests.get(0);
         ClientResponse clientResponse =
-                buildClientResponseWithAuthenticationException(unsentRequest);
+            buildClientResponse(unsentRequest,
+                Collections.emptyList(),
+                false,
+                new AuthenticationException("Authentication failed"));
         clientResponse.onComplete();
 
-        // Request completed with error. Nothing pending to be sent or retried
-        verifyRequestCompletedWithErrorResponse(fetchOffsetsFuture, 
AuthenticationException.class);
+        assertTrue(fetchOffsetsFuture.isCompletedExceptionally());
+        Throwable failure = assertThrows(ExecutionException.class, 
fetchOffsetsFuture::get);
+        assertEquals(AuthenticationException.class, 
failure.getCause().getClass());
+
         assertEquals(0, requestManager.requestsToRetry());
         assertEquals(0, requestManager.requestsToSend());
     }
@@ -669,8 +683,6 @@ public class OffsetsRequestManagerTest {
         Node leader = LEADER_1;
         OffsetResetStrategy strategy = OffsetResetStrategy.EARLIEST;
         long offset = 5L;
-        Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = 
Collections.singletonMap(tp,
-                new OffsetAndTimestamp(offset, 1L, leaderAndEpoch.epoch));
         
when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(tp));
         when(subscriptionState.resetStrategy(any())).thenReturn(strategy);
         mockSuccessfulRequest(Collections.singletonMap(tp, leader));
@@ -682,7 +694,8 @@ public class OffsetsRequestManagerTest {
         when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(leader, 
leaderAndEpoch.epoch));
         NetworkClientDelegate.PollResult pollResult = 
requestManager.poll(time.milliseconds());
         NetworkClientDelegate.UnsentRequest unsentRequest = 
pollResult.unsentRequests.get(0);
-        ClientResponse clientResponse = buildClientResponse(unsentRequest, 
expectedOffsets);
+        ClientResponse clientResponse = buildClientResponse(unsentRequest, 
Collections.singletonMap(tp,
+                new OffsetAndTimestampInternal(offset, 1L, 
leaderAndEpoch.epoch)));
         clientResponse.onComplete();
         assertTrue(unsentRequest.future().isDone());
         assertFalse(unsentRequest.future().isCompletedExceptionally());
@@ -714,8 +727,8 @@ public class OffsetsRequestManagerTest {
     }
 
     private void verifySuccessfulPollAndResponseReceived(
-            CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
actualResult,
-            Map<TopicPartition, OffsetAndTimestamp> expectedResult) throws 
ExecutionException,
+            CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
actualResult,
+            Map<TopicPartition, OffsetAndTimestampInternal> expectedResult) 
throws ExecutionException,
             InterruptedException {
         // Following poll should send the request and get a response
         NetworkClientDelegate.PollResult retriedPoll = 
requestManager.poll(time.milliseconds());
@@ -726,6 +739,15 @@ public class OffsetsRequestManagerTest {
         verifyRequestSuccessfullyCompleted(actualResult, expectedResult);
     }
 
+
+    private void 
verifyRequestCompletedWithErrorResponse(CompletableFuture<Map<TopicPartition, 
OffsetAndTimestampInternal>> actualResult,
+                                                         Class<? extends 
Throwable> expectedFailure) {
+        assertTrue(actualResult.isDone());
+        assertTrue(actualResult.isCompletedExceptionally());
+        Throwable failure = assertThrows(ExecutionException.class, 
actualResult::get);
+        assertEquals(expectedFailure, failure.getCause().getClass());
+    }
+
     private void mockSuccessfulRequest(Map<TopicPartition, Node> 
partitionLeaders) {
         partitionLeaders.forEach((tp, broker) -> {
             when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(broker,
@@ -753,24 +775,24 @@ public class OffsetsRequestManagerTest {
     }
 
     private void verifyRequestSuccessfullyCompleted(
-            CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
actualResult,
-            Map<TopicPartition, OffsetAndTimestamp> expectedResult) throws 
ExecutionException, InterruptedException {
+            CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
actualResult,
+            Map<TopicPartition, OffsetAndTimestampInternal> expectedResult) 
throws ExecutionException, InterruptedException {
         assertEquals(0, requestManager.requestsToRetry());
         assertEquals(0, requestManager.requestsToSend());
 
         assertTrue(actualResult.isDone());
         assertFalse(actualResult.isCompletedExceptionally());
-        Map<TopicPartition, OffsetAndTimestamp> partitionOffsets = 
actualResult.get();
+        Map<TopicPartition, OffsetAndTimestampInternal> partitionOffsets = 
actualResult.get();
         assertEquals(expectedResult, partitionOffsets);
 
         // Validate that the subscription state has been updated for all non-null offsets retrieved
-        Map<TopicPartition, OffsetAndTimestamp> validExpectedOffsets = 
expectedResult.entrySet().stream()
+        Map<TopicPartition, Long> validExpectedOffsets = 
expectedResult.entrySet().stream()
                 .filter(entry -> entry.getValue() != null)
-                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+                .collect(Collectors.toMap(Map.Entry::getKey, v -> 
v.getValue().offset()));
         verifySubscriptionStateUpdated(validExpectedOffsets);
     }
 
-    private void verifySubscriptionStateUpdated(Map<TopicPartition, 
OffsetAndTimestamp> expectedResult) {
+    private void verifySubscriptionStateUpdated(Map<TopicPartition, Long> 
expectedResult) {
         ArgumentCaptor<TopicPartition> tpCaptor = 
ArgumentCaptor.forClass(TopicPartition.class);
         ArgumentCaptor<Long> offsetCaptor = 
ArgumentCaptor.forClass(Long.class);
 
@@ -784,18 +806,10 @@ public class OffsetsRequestManagerTest {
 
         assertEquals(expectedResult.values().size(), updatedOffsets.size());
         expectedResult.values().stream()
-                .map(offsetAndTimestamp -> 
updatedOffsets.contains(offsetAndTimestamp.offset()))
+                .map(updatedOffsets::contains)
                 .forEach(Assertions::assertTrue);
     }
 
-    private void 
verifyRequestCompletedWithErrorResponse(CompletableFuture<Map<TopicPartition, 
OffsetAndTimestamp>> actualResult,
-                                                         Class<? extends 
Throwable> expectedFailure) {
-        assertTrue(actualResult.isDone());
-        assertTrue(actualResult.isCompletedExceptionally());
-        Throwable failure = assertThrows(ExecutionException.class, 
actualResult::get);
-        assertEquals(expectedFailure, failure.getCause().getClass());
-    }
-
     private Metadata.LeaderAndEpoch testLeaderEpoch(Node leader, 
Optional<Integer> epoch) {
         return new Metadata.LeaderAndEpoch(Optional.of(leader), epoch);
     }
@@ -814,7 +828,7 @@ public class OffsetsRequestManagerTest {
 
     private ClientResponse buildClientResponse(
             final NetworkClientDelegate.UnsentRequest request,
-            final Map<TopicPartition, OffsetAndTimestamp> partitionsOffsets) {
+            final Map<TopicPartition, OffsetAndTimestampInternal> 
partitionsOffsets) {
         List<ListOffsetsResponseData.ListOffsetsTopicResponse> topicResponses 
= new
                 ArrayList<>();
         partitionsOffsets.forEach((tp, offsetAndTimestamp) -> {
@@ -901,7 +915,6 @@ public class OffsetsRequestManagerTest {
     private ClientResponse buildClientResponse(
             final NetworkClientDelegate.UnsentRequest request,
             final List<ListOffsetsResponseData.ListOffsetsTopicResponse> 
topicResponses) {
-
         return buildClientResponse(request, topicResponses, false, null);
     }
 
@@ -919,12 +932,6 @@ public class OffsetsRequestManagerTest {
         return buildClientResponse(request, topicResponses, false, null);
     }
 
-    private ClientResponse buildClientResponseWithAuthenticationException(
-            final NetworkClientDelegate.UnsentRequest request) {
-        return buildClientResponse(request, Collections.emptyList(), true,
-                new AuthenticationException("Authentication failed"));
-    }
-
     private ClientResponse buildClientResponse(
             final NetworkClientDelegate.UnsentRequest request,
             final List<ListOffsetsResponseData.ListOffsetsTopicResponse> 
topicResponses,
@@ -933,7 +940,9 @@ public class OffsetsRequestManagerTest {
         AbstractRequest abstractRequest = request.requestBuilder().build();
         assertInstanceOf(ListOffsetsRequest.class, abstractRequest);
         ListOffsetsRequest offsetFetchRequest = (ListOffsetsRequest) 
abstractRequest;
-        ListOffsetsResponse response = 
buildListOffsetsResponse(topicResponses);
+        ListOffsetsResponse response = new ListOffsetsResponse(new 
ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(topicResponses));
         return new ClientResponse(
                 new RequestHeader(ApiKeys.OFFSET_FETCH, 
offsetFetchRequest.version(), "", 1),
                 request.handler(),
@@ -946,13 +955,4 @@ public class OffsetsRequestManagerTest {
                 response
         );
     }
-
-    private ListOffsetsResponse buildListOffsetsResponse(
-            List<ListOffsetsResponseData.ListOffsetsTopicResponse> 
offsetsTopicResponses) {
-        ListOffsetsResponseData responseData = new ListOffsetsResponseData()
-                .setThrottleTimeMs(0)
-                .setTopics(offsetsTopicResponses);
-
-        return new ListOffsetsResponse(responseData);
-    }
 }
\ No newline at end of file
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 77ba47fa08f..933bea9ac0e 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -15,7 +15,7 @@ package kafka.api
 import java.time.Duration
 import java.util
 import java.util.Arrays.asList
-import java.util.{Locale, Properties}
+import java.util.{Collections, Locale, Optional, Properties}
 import kafka.server.{KafkaBroker, QuotaType}
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{NewPartitions, NewTopic}
@@ -805,4 +805,58 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     consumer2.close()
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    val numRecords = 10000
+    (0 until numRecords).map { i =>
+      val timestamp = startingTimestamp + i.toLong
+      val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, 
s"key $i".getBytes, s"value $i".getBytes)
+      producer.send(record)
+      record
+    }
+    producer.flush()
+
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, Set(tp, tp2))
+
+    val endOffsets = consumer.endOffsets(Set(tp).asJava)
+    assertEquals(numRecords, endOffsets.get(tp))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testFetchOffsetsForTime(quorum: String, groupProtocol: String): Unit = {
+    val numPartitions = 2
+    val producer = createProducer()
+    val timestampsToSearch = new util.HashMap[TopicPartition, java.lang.Long]()
+    var i = 0
+    for (part <- 0 until numPartitions) {
+      val tp = new TopicPartition(topic, part)
+      // key, val, and timestamp equal to the sequence number.
+      sendRecords(producer, numRecords = 100, tp, startingTimestamp = 0)
+      timestampsToSearch.put(tp, (i * 20).toLong)
+      i += 1
+    }
+
+    val consumer = createConsumer()
+    // Test negative target time
+    assertThrows(classOf[IllegalArgumentException],
+      () => consumer.offsetsForTimes(Collections.singletonMap(new 
TopicPartition(topic, 0), -1)))
+    val timestampOffsets = consumer.offsetsForTimes(timestampsToSearch)
+
+    val timestampTp0 = timestampOffsets.get(new TopicPartition(topic, 0))
+    assertEquals(0, timestampTp0.offset)
+    assertEquals(0, timestampTp0.timestamp)
+    assertEquals(Optional.of(0), timestampTp0.leaderEpoch)
+
+    val timestampTp1 = timestampOffsets.get(new TopicPartition(topic, 1))
+    assertEquals(20, timestampTp1.offset)
+    assertEquals(20, timestampTp1.timestamp)
+    assertEquals(Optional.of(0), timestampTp1.leaderEpoch)
+  }
 }

Reply via email to