This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 66485b04c69c12c8c6152053554d078840f34809
Author: PoAn Yang <[email protected]>
AuthorDate: Sat Aug 3 14:27:27 2024 +0800

    KAFKA-16480: ListOffsets change should have an associated API/IBP version 
update (#16781)
    
       1. Use oldestAllowedVersion as 9 if using 
ListOffsetsRequest#EARLIEST_LOCAL_TIMESTAMP or 
ListOffsetsRequest#LATEST_TIERED_TIMESTAMP.
       2. Add test cases to 
ListOffsetsRequestTest#testListOffsetsRequestOldestVersion to make sure 
requireTieredStorageTimestamp return 9 as minVersion.
       3. Add EarliestLocalSpec and LatestTierSpec to OffsetSpec.
       4. Add more cases to KafkaAdminClient#getOffsetFromSpec.
       5. Add testListOffsetsEarliestLocalSpecMinVersion and 
testListOffsetsLatestTierSpecSpecMinVersion to KafkaAdminClientTest to make 
sure request builder has oldestAllowedVersion as 9.
    
    Signed-off-by: PoAn Yang <[email protected]>
    
    Reviewers: Luke Chen <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  4 ++
 .../org/apache/kafka/clients/admin/OffsetSpec.java | 19 ++++++++
 .../admin/internals/ListOffsetsHandler.java        |  6 ++-
 .../clients/consumer/internals/OffsetFetcher.java  |  2 +-
 .../consumer/internals/OffsetsRequestManager.java  |  2 +-
 .../kafka/common/requests/ListOffsetsRequest.java  |  9 +++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 56 ++++++++++++++++++++++
 .../common/requests/ListOffsetsRequestTest.java    |  4 +-
 .../kafka/common/requests/RequestResponseTest.java |  6 +--
 .../kafka/api/AuthorizerIntegrationTest.scala      |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  4 +-
 .../unit/kafka/server/ListOffsetsRequestTest.scala | 24 +++++++---
 .../scala/unit/kafka/server/LogOffsetTest.scala    |  2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  2 +-
 .../jmh/common/ListOffsetRequestBenchmark.java     |  2 +-
 15 files changed, 122 insertions(+), 22 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 640a08a3786..8eb7fb4e8c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4860,6 +4860,10 @@ public class KafkaAdminClient extends AdminClient {
             return ListOffsetsRequest.EARLIEST_TIMESTAMP;
         } else if (offsetSpec instanceof OffsetSpec.MaxTimestampSpec) {
             return ListOffsetsRequest.MAX_TIMESTAMP;
+        } else if (offsetSpec instanceof OffsetSpec.EarliestLocalSpec) {
+            return ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP;
+        } else if (offsetSpec instanceof OffsetSpec.LatestTierSpec) {
+            return ListOffsetsRequest.LATEST_TIERED_TIMESTAMP;
         }
         return ListOffsetsRequest.LATEST_TIMESTAMP;
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
index dcf90452c55..5b2fbb3e2e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -26,6 +26,8 @@ public class OffsetSpec {
     public static class EarliestSpec extends OffsetSpec { }
     public static class LatestSpec extends OffsetSpec { }
     public static class MaxTimestampSpec extends OffsetSpec { }
+    public static class EarliestLocalSpec extends OffsetSpec { }
+    public static class LatestTierSpec extends OffsetSpec { }
     public static class TimestampSpec extends OffsetSpec {
         private final long timestamp;
 
@@ -70,4 +72,21 @@ public class OffsetSpec {
         return new MaxTimestampSpec();
     }
 
+    /**
+     * Used to retrieve the offset with the local log start offset,
+     * log start offset is the offset of a log above which reads are 
guaranteed to be served
+     * from the disk of the leader broker, when Tiered Storage is not enabled, 
it behaves the same
+     * as the earliest timestamp
+     */
+    public static OffsetSpec earliestLocalSpec() {
+        return new EarliestLocalSpec();
+    }
+
+    /**
+     * Used to retrieve the offset with the highest offset of data stored in 
remote storage,
+     * and when Tiered Storage is not enabled, we won't return any offset 
(i.e. Unknown offset)
+     */
+    public static OffsetSpec latestTierSpec() {
+        return new LatestTierSpec();
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
index 0bb42ed7696..7dfcb22afba 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
@@ -93,8 +93,12 @@ public final class ListOffsetsHandler extends 
Batched<TopicPartition, ListOffset
             .stream()
             .anyMatch(key -> offsetTimestampsByPartition.get(key) == 
ListOffsetsRequest.MAX_TIMESTAMP);
 
+        boolean requireTieredStorageTimestamp = keys
+            .stream()
+            .anyMatch(key -> offsetTimestampsByPartition.get(key) == 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP || 
offsetTimestampsByPartition.get(key) == 
ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+
         return ListOffsetsRequest.Builder
-            .forConsumer(true, options.isolationLevel(), supportsMaxTimestamp)
+            .forConsumer(true, options.isolationLevel(), supportsMaxTimestamp, 
requireTieredStorageTimestamp)
             .setTargetTimes(new ArrayList<>(topicsByName.values()));
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index f624941c525..ec0bfe2fc13 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -391,7 +391,7 @@ public class OffsetFetcher {
                                                                   final 
Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
                                                                   boolean 
requireTimestamp) {
         ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
-                .forConsumer(requireTimestamp, isolationLevel, false)
+                .forConsumer(requireTimestamp, isolationLevel, false, false)
                 
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch));
 
         log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index b4f63714128..71da2f5bf60 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -337,7 +337,7 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
             boolean requireTimestamps,
             List<NetworkClientDelegate.UnsentRequest> unsentRequests) {
         ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
-                .forConsumer(requireTimestamps, isolationLevel, false)
+                .forConsumer(requireTimestamps, isolationLevel, false, false)
                 
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
 
         log.debug("Creating ListOffset request {} for broker {} to reset 
positions", builder,
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
index 56dea5262f7..8ebf0886bec 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
@@ -62,9 +62,14 @@ public class ListOffsetsRequest extends AbstractRequest {
             return new Builder((short) 0, allowedVersion, replicaId, 
IsolationLevel.READ_UNCOMMITTED);
         }
 
-        public static Builder forConsumer(boolean requireTimestamp, 
IsolationLevel isolationLevel, boolean requireMaxTimestamp) {
+        public static Builder forConsumer(boolean requireTimestamp,
+                                          IsolationLevel isolationLevel,
+                                          boolean requireMaxTimestamp,
+                                          boolean 
requireTieredStorageTimestamp) {
             short minVersion = 0;
-            if (requireMaxTimestamp)
+            if (requireTieredStorageTimestamp)
+                minVersion = 9;
+            else if (requireMaxTimestamp)
                 minVersion = 7;
             else if (isolationLevel == IsolationLevel.READ_COMMITTED)
                 minVersion = 2;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 34c2722d463..8d70e60fc05 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -5844,6 +5844,62 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testListOffsetsEarliestLocalSpecMinVersion() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new 
Node[]{node}));
+        final Cluster cluster = new Cluster(
+                "mockClusterId",
+                nodes,
+                pInfos,
+                Collections.emptySet(),
+                Collections.emptySet(),
+                node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            env.adminClient().listOffsets(Collections.singletonMap(tp0, 
OffsetSpec.earliestLocalSpec()));
+
+            TestUtils.waitForCondition(() -> 
env.kafkaClient().requests().stream().anyMatch(request ->
+                request.requestBuilder().apiKey().messageType == 
ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() 
== 9
+            ), "no listOffsets request has the expected oldestAllowedVersion");
+        }
+    }
+
+    @Test
+    public void testListOffsetsLatestTierSpecSpecMinVersion() throws Exception 
{
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new 
Node[]{node}));
+        final Cluster cluster = new Cluster(
+                "mockClusterId",
+                nodes,
+                pInfos,
+                Collections.emptySet(),
+                Collections.emptySet(),
+                node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            env.adminClient().listOffsets(Collections.singletonMap(tp0, 
OffsetSpec.latestTierSpec()));
+
+            TestUtils.waitForCondition(() -> 
env.kafkaClient().requests().stream().anyMatch(request ->
+                    request.requestBuilder().apiKey().messageType == 
ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() 
== 9
+            ), "no listOffsets request has the expected oldestAllowedVersion");
+        }
+    }
+
     private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
         return Utils.mkMap(
             Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2,  
FeatureUpdate.UpgradeType.UPGRADE)),
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
index d748203cce0..e9a2bec2ffd 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
@@ -68,7 +68,7 @@ public class ListOffsetsRequestTest {
                                 new ListOffsetsPartition()
                                     .setPartitionIndex(0))));
             ListOffsetsRequest request = ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED, false, 
false)
                     .setTargetTimes(topics)
                     .build(version);
             ListOffsetsResponse response = (ListOffsetsResponse) 
request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
@@ -101,7 +101,7 @@ public class ListOffsetsRequestTest {
                             new ListOffsetsPartition()
                                 .setPartitionIndex(0))));
         ListOffsetsRequest request = ListOffsetsRequest.Builder
-                .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
+                .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, 
false)
                 .setTargetTimes(topics)
                 .build((short) 0);
         ListOffsetsResponse response = (ListOffsetsResponse) 
request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index ac044a5a536..54e681b468b 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2347,7 +2347,7 @@ public class RequestResponseTest {
                             .setMaxNumOffsets(10)
                             .setCurrentLeaderEpoch(5)));
             return ListOffsetsRequest.Builder
-                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
+                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, 
false, false)
                     .setTargetTimes(singletonList(topic))
                     .build(version);
         } else if (version == 1) {
@@ -2358,7 +2358,7 @@ public class RequestResponseTest {
                             .setTimestamp(1000000L)
                             .setCurrentLeaderEpoch(5)));
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
+                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, 
false)
                     .setTargetTimes(singletonList(topic))
                     .build(version);
         } else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) {
@@ -2371,7 +2371,7 @@ public class RequestResponseTest {
                     .setName("test")
                     .setPartitions(singletonList(partition));
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED, false, 
false)
                     .setTargetTimes(singletonList(topic))
                     .build(version);
         } else {
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c00abf9427a..c58caaae8b4 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -287,7 +287,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   private def createListOffsetsRequest = {
-    requests.ListOffsetsRequest.Builder.forConsumer(false, 
IsolationLevel.READ_UNCOMMITTED, false).setTargetTimes(
+    requests.ListOffsetsRequest.Builder.forConsumer(false, 
IsolationLevel.READ_UNCOMMITTED, false, false).setTargetTimes(
       List(new ListOffsetsTopic()
         .setName(tp.topic)
         .setPartitions(List(new ListOffsetsPartition()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e66e345a459..91b45dcc033 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -4000,7 +4000,7 @@ class KafkaApisTest extends Logging {
         .setPartitionIndex(tp.partition)
         .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)
         .setCurrentLeaderEpoch(currentLeaderEpoch.get)).asJava)).asJava
-    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, 
isolationLevel, false)
+    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, 
isolationLevel, false, false)
       .setTargetTimes(targetTimes).build()
     val request = buildRequest(listOffsetRequest)
     
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
@@ -6151,7 +6151,7 @@ class KafkaApisTest extends Logging {
       .setPartitions(List(new ListOffsetsPartition()
         .setPartitionIndex(tp.partition)
         .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
-    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, 
isolationLevel, false)
+    val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, 
isolationLevel, false, false)
       .setTargetTimes(targetTimes).build()
     val request = buildRequest(listOffsetRequest)
     kafkaApis = createKafkaApis()
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 60b8789e428..f48e3546d89 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -54,7 +54,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
         .setCurrentLeaderEpoch(0)).asJava)).asJava
 
     val consumerRequest = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
       .setTargetTimes(targetTimes)
       .build()
 
@@ -94,15 +94,27 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
-  def testListOffsetsMaxTimeStampOldestVersion(quorum: String): Unit = {
+  def testListOffsetsRequestOldestVersion(): Unit = {
     val consumerRequestBuilder = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
+
+    val requireTimestampRequestBuilder = ListOffsetsRequest.Builder
+      .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, false)
+
+    val requestCommittedRequestBuilder = ListOffsetsRequest.Builder
+      .forConsumer(false, IsolationLevel.READ_COMMITTED, false, false)
 
     val maxTimestampRequestBuilder = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true, false)
+
+    val requireTieredStorageTimestampRequestBuilder = 
ListOffsetsRequest.Builder
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, true)
 
     assertEquals(0.toShort, consumerRequestBuilder.oldestAllowedVersion())
+    assertEquals(1.toShort, 
requireTimestampRequestBuilder.oldestAllowedVersion())
+    assertEquals(2.toShort, 
requestCommittedRequestBuilder.oldestAllowedVersion())
     assertEquals(7.toShort, maxTimestampRequestBuilder.oldestAllowedVersion())
+    assertEquals(9.toShort, 
requireTieredStorageTimestampRequestBuilder.oldestAllowedVersion())
   }
 
   def assertResponseErrorForEpoch(error: Errors, brokerId: Int, 
currentLeaderEpoch: Optional[Integer]): Unit = {
@@ -115,7 +127,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
       .setName(topic)
       .setPartitions(List(listOffsetPartition).asJava)).asJava
     val request = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
       .setTargetTimes(targetTimes)
       .build()
     assertResponseError(error, brokerId, request)
@@ -159,7 +171,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
         .setTimestamp(timestamp)).asJava)).asJava
 
     val builder = ListOffsetsRequest.Builder
-      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
+      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
       .setTargetTimes(targetTimes)
 
     val request = if (version == -1) builder.build() else 
builder.build(version)
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index dffecb4d9cb..61fcd6fcf21 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -60,7 +60,7 @@ class LogOffsetTest extends BaseRequestTest {
   @ValueSource(strings = Array("zk", "kraft"))
   def testGetOffsetsForUnknownTopic(quorum: String): Unit = {
     val topicPartition = new TopicPartition("foo", 0)
-    val request = ListOffsetsRequest.Builder.forConsumer(false, 
IsolationLevel.READ_UNCOMMITTED, false)
+    val request = ListOffsetsRequest.Builder.forConsumer(false, 
IsolationLevel.READ_UNCOMMITTED, false, false)
       .setTargetTimes(buildTargetTimes(topicPartition, 
ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
     val response = sendListOffsetsRequest(request)
     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, 
findPartition(response.topics.asScala, topicPartition).errorCode)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 33c7df2fc52..fbffa4ce37a 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -288,7 +288,7 @@ class RequestQuotaTest extends BaseRequestTest {
               .setPartitionIndex(tp.partition)
               .setTimestamp(0L)
               .setCurrentLeaderEpoch(15)).asJava)
-          ListOffsetsRequest.Builder.forConsumer(false, 
IsolationLevel.READ_UNCOMMITTED, false)
+          ListOffsetsRequest.Builder.forConsumer(false, 
IsolationLevel.READ_UNCOMMITTED, false, false)
             .setTargetTimes(List(topic).asJava)
 
         case ApiKeys.LEADER_AND_ISR =>
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
index fe791350a5d..786648f904e 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
@@ -71,7 +71,7 @@ public class ListOffsetRequestBenchmark {
             }
         }
 
-        this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, 
IsolationLevel.READ_UNCOMMITTED, false)
+        this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, 
IsolationLevel.READ_UNCOMMITTED, false, false)
                 .build(ApiKeys.LIST_OFFSETS.latestVersion());
     }
 

Reply via email to