This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6e324487fa9 KAFKA-16480: ListOffsets change should have an associated
API/IBP version update (#16781)
6e324487fa9 is described below
commit 6e324487fa93c540ef611eb9c9694c464fdecd3c
Author: PoAn Yang <[email protected]>
AuthorDate: Sat Aug 3 14:27:27 2024 +0800
KAFKA-16480: ListOffsets change should have an associated API/IBP version
update (#16781)
1. Use oldestAllowedVersion as 9 if using
ListOffsetsRequest#EARLIEST_LOCAL_TIMESTAMP or
ListOffsetsRequest#LATEST_TIERED_TIMESTAMP.
2. Add test cases to
ListOffsetsRequestTest#testListOffsetsRequestOldestVersion to make sure
requireTieredStorageTimestamp returns 9 as minVersion.
3. Add EarliestLocalSpec and LatestTierSpec to OffsetSpec.
4. Add more cases to KafkaAdminClient#getOffsetFromSpec.
5. Add testListOffsetsEarliestLocalSpecMinVersion and
testListOffsetsLatestTierSpecSpecMinVersion to KafkaAdminClientTest to make
sure the request builder has oldestAllowedVersion as 9.
Signed-off-by: PoAn Yang <[email protected]>
Reviewers: Luke Chen <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 4 ++
.../org/apache/kafka/clients/admin/OffsetSpec.java | 19 ++++++++
.../admin/internals/ListOffsetsHandler.java | 6 ++-
.../clients/consumer/internals/OffsetFetcher.java | 2 +-
.../consumer/internals/OffsetsRequestManager.java | 2 +-
.../kafka/common/requests/ListOffsetsRequest.java | 9 +++-
.../kafka/clients/admin/KafkaAdminClientTest.java | 56 ++++++++++++++++++++++
.../common/requests/ListOffsetsRequestTest.java | 4 +-
.../kafka/common/requests/RequestResponseTest.java | 6 +--
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 +-
.../unit/kafka/server/ListOffsetsRequestTest.scala | 24 +++++++---
.../scala/unit/kafka/server/LogOffsetTest.scala | 2 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
.../jmh/common/ListOffsetRequestBenchmark.java | 2 +-
15 files changed, 122 insertions(+), 22 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 640a08a3786..8eb7fb4e8c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4860,6 +4860,10 @@ public class KafkaAdminClient extends AdminClient {
return ListOffsetsRequest.EARLIEST_TIMESTAMP;
} else if (offsetSpec instanceof OffsetSpec.MaxTimestampSpec) {
return ListOffsetsRequest.MAX_TIMESTAMP;
+ } else if (offsetSpec instanceof OffsetSpec.EarliestLocalSpec) {
+ return ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP;
+ } else if (offsetSpec instanceof OffsetSpec.LatestTierSpec) {
+ return ListOffsetsRequest.LATEST_TIERED_TIMESTAMP;
}
return ListOffsetsRequest.LATEST_TIMESTAMP;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
index dcf90452c55..5b2fbb3e2e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -26,6 +26,8 @@ public class OffsetSpec {
public static class EarliestSpec extends OffsetSpec { }
public static class LatestSpec extends OffsetSpec { }
public static class MaxTimestampSpec extends OffsetSpec { }
+ public static class EarliestLocalSpec extends OffsetSpec { }
+ public static class LatestTierSpec extends OffsetSpec { }
public static class TimestampSpec extends OffsetSpec {
private final long timestamp;
@@ -70,4 +72,21 @@ public class OffsetSpec {
return new MaxTimestampSpec();
}
+ /**
+ * Used to retrieve the offset with the local log start offset,
+ * log start offset is the offset of a log above which reads are
guaranteed to be served
+ * from the disk of the leader broker, when Tiered Storage is not enabled,
it behaves the same
+ * as the earliest timestamp
+ */
+ public static OffsetSpec earliestLocalSpec() {
+ return new EarliestLocalSpec();
+ }
+
+ /**
+ * Used to retrieve the offset with the highest offset of data stored in
remote storage,
+ * and when Tiered Storage is not enabled, we won't return any offset
(i.e. Unknown offset)
+ */
+ public static OffsetSpec latestTierSpec() {
+ return new LatestTierSpec();
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
index 0bb42ed7696..7dfcb22afba 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
@@ -93,8 +93,12 @@ public final class ListOffsetsHandler extends
Batched<TopicPartition, ListOffset
.stream()
.anyMatch(key -> offsetTimestampsByPartition.get(key) ==
ListOffsetsRequest.MAX_TIMESTAMP);
+ boolean requireTieredStorageTimestamp = keys
+ .stream()
+ .anyMatch(key -> offsetTimestampsByPartition.get(key) ==
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP ||
offsetTimestampsByPartition.get(key) ==
ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
+
return ListOffsetsRequest.Builder
- .forConsumer(true, options.isolationLevel(), supportsMaxTimestamp)
+ .forConsumer(true, options.isolationLevel(), supportsMaxTimestamp,
requireTieredStorageTimestamp)
.setTargetTimes(new ArrayList<>(topicsByName.values()));
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index f624941c525..ec0bfe2fc13 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -391,7 +391,7 @@ public class OffsetFetcher {
final
Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
boolean
requireTimestamp) {
ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
- .forConsumer(requireTimestamp, isolationLevel, false)
+ .forConsumer(requireTimestamp, isolationLevel, false, false)
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch));
log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index b4f63714128..71da2f5bf60 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -337,7 +337,7 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
boolean requireTimestamps,
List<NetworkClientDelegate.UnsentRequest> unsentRequests) {
ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
- .forConsumer(requireTimestamps, isolationLevel, false)
+ .forConsumer(requireTimestamps, isolationLevel, false, false)
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
log.debug("Creating ListOffset request {} for broker {} to reset
positions", builder,
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
index 56dea5262f7..8ebf0886bec 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
@@ -62,9 +62,14 @@ public class ListOffsetsRequest extends AbstractRequest {
return new Builder((short) 0, allowedVersion, replicaId,
IsolationLevel.READ_UNCOMMITTED);
}
- public static Builder forConsumer(boolean requireTimestamp,
IsolationLevel isolationLevel, boolean requireMaxTimestamp) {
+ public static Builder forConsumer(boolean requireTimestamp,
+ IsolationLevel isolationLevel,
+ boolean requireMaxTimestamp,
+ boolean
requireTieredStorageTimestamp) {
short minVersion = 0;
- if (requireMaxTimestamp)
+ if (requireTieredStorageTimestamp)
+ minVersion = 9;
+ else if (requireMaxTimestamp)
minVersion = 7;
else if (isolationLevel == IsolationLevel.READ_COMMITTED)
minVersion = 2;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 34c2722d463..8d70e60fc05 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -5844,6 +5844,62 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testListOffsetsEarliestLocalSpecMinVersion() throws Exception {
+ Node node = new Node(0, "localhost", 8120);
+ List<Node> nodes = Collections.singletonList(node);
+ List<PartitionInfo> pInfos = new ArrayList<>();
+ pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new
Node[]{node}));
+ final Cluster cluster = new Cluster(
+ "mockClusterId",
+ nodes,
+ pInfos,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ node);
+ final TopicPartition tp0 = new TopicPartition("foo", 0);
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+ AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ env.adminClient().listOffsets(Collections.singletonMap(tp0,
OffsetSpec.earliestLocalSpec()));
+
+ TestUtils.waitForCondition(() ->
env.kafkaClient().requests().stream().anyMatch(request ->
+ request.requestBuilder().apiKey().messageType ==
ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion()
== 9
+ ), "no listOffsets request has the expected oldestAllowedVersion");
+ }
+ }
+
+ @Test
+ public void testListOffsetsLatestTierSpecSpecMinVersion() throws Exception
{
+ Node node = new Node(0, "localhost", 8120);
+ List<Node> nodes = Collections.singletonList(node);
+ List<PartitionInfo> pInfos = new ArrayList<>();
+ pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new
Node[]{node}));
+ final Cluster cluster = new Cluster(
+ "mockClusterId",
+ nodes,
+ pInfos,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ node);
+ final TopicPartition tp0 = new TopicPartition("foo", 0);
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+ AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ env.adminClient().listOffsets(Collections.singletonMap(tp0,
OffsetSpec.latestTierSpec()));
+
+ TestUtils.waitForCondition(() ->
env.kafkaClient().requests().stream().anyMatch(request ->
+ request.requestBuilder().apiKey().messageType ==
ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion()
== 9
+ ), "no listOffsets request has the expected oldestAllowedVersion");
+ }
+ }
+
private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
return Utils.mkMap(
Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2,
FeatureUpdate.UpgradeType.UPGRADE)),
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
index d748203cce0..e9a2bec2ffd 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
@@ -68,7 +68,7 @@ public class ListOffsetsRequestTest {
new ListOffsetsPartition()
.setPartitionIndex(0))));
ListOffsetsRequest request = ListOffsetsRequest.Builder
- .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
+ .forConsumer(true, IsolationLevel.READ_COMMITTED, false,
false)
.setTargetTimes(topics)
.build(version);
ListOffsetsResponse response = (ListOffsetsResponse)
request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
@@ -101,7 +101,7 @@ public class ListOffsetsRequestTest {
new ListOffsetsPartition()
.setPartitionIndex(0))));
ListOffsetsRequest request = ListOffsetsRequest.Builder
- .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
+ .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false,
false)
.setTargetTimes(topics)
.build((short) 0);
ListOffsetsResponse response = (ListOffsetsResponse)
request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index ac044a5a536..54e681b468b 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2347,7 +2347,7 @@ public class RequestResponseTest {
.setMaxNumOffsets(10)
.setCurrentLeaderEpoch(5)));
return ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED,
false, false)
.setTargetTimes(singletonList(topic))
.build(version);
} else if (version == 1) {
@@ -2358,7 +2358,7 @@ public class RequestResponseTest {
.setTimestamp(1000000L)
.setCurrentLeaderEpoch(5)));
return ListOffsetsRequest.Builder
- .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
+ .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false,
false)
.setTargetTimes(singletonList(topic))
.build(version);
} else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) {
@@ -2371,7 +2371,7 @@ public class RequestResponseTest {
.setName("test")
.setPartitions(singletonList(partition));
return ListOffsetsRequest.Builder
- .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
+ .forConsumer(true, IsolationLevel.READ_COMMITTED, false,
false)
.setTargetTimes(singletonList(topic))
.build(version);
} else {
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c00abf9427a..c58caaae8b4 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -287,7 +287,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
private def createListOffsetsRequest = {
- requests.ListOffsetsRequest.Builder.forConsumer(false,
IsolationLevel.READ_UNCOMMITTED, false).setTargetTimes(
+ requests.ListOffsetsRequest.Builder.forConsumer(false,
IsolationLevel.READ_UNCOMMITTED, false, false).setTargetTimes(
List(new ListOffsetsTopic()
.setName(tp.topic)
.setPartitions(List(new ListOffsetsPartition()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e66e345a459..91b45dcc033 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -4000,7 +4000,7 @@ class KafkaApisTest extends Logging {
.setPartitionIndex(tp.partition)
.setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)
.setCurrentLeaderEpoch(currentLeaderEpoch.get)).asJava)).asJava
- val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true,
isolationLevel, false)
+ val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true,
isolationLevel, false, false)
.setTargetTimes(targetTimes).build()
val request = buildRequest(listOffsetRequest)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
@@ -6151,7 +6151,7 @@ class KafkaApisTest extends Logging {
.setPartitions(List(new ListOffsetsPartition()
.setPartitionIndex(tp.partition)
.setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
- val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true,
isolationLevel, false)
+ val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true,
isolationLevel, false, false)
.setTargetTimes(targetTimes).build()
val request = buildRequest(listOffsetRequest)
kafkaApis = createKafkaApis()
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 60b8789e428..f48e3546d89 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -54,7 +54,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
.setCurrentLeaderEpoch(0)).asJava)).asJava
val consumerRequest = ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
.setTargetTimes(targetTimes)
.build()
@@ -94,15 +94,27 @@ class ListOffsetsRequestTest extends BaseRequestTest {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
- def testListOffsetsMaxTimeStampOldestVersion(quorum: String): Unit = {
+ def testListOffsetsRequestOldestVersion(): Unit = {
val consumerRequestBuilder = ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
+
+ val requireTimestampRequestBuilder = ListOffsetsRequest.Builder
+ .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, false)
+
+ val requestCommittedRequestBuilder = ListOffsetsRequest.Builder
+ .forConsumer(false, IsolationLevel.READ_COMMITTED, false, false)
val maxTimestampRequestBuilder = ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true)
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true, false)
+
+ val requireTieredStorageTimestampRequestBuilder =
ListOffsetsRequest.Builder
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, true)
assertEquals(0.toShort, consumerRequestBuilder.oldestAllowedVersion())
+ assertEquals(1.toShort,
requireTimestampRequestBuilder.oldestAllowedVersion())
+ assertEquals(2.toShort,
requestCommittedRequestBuilder.oldestAllowedVersion())
assertEquals(7.toShort, maxTimestampRequestBuilder.oldestAllowedVersion())
+ assertEquals(9.toShort,
requireTieredStorageTimestampRequestBuilder.oldestAllowedVersion())
}
def assertResponseErrorForEpoch(error: Errors, brokerId: Int,
currentLeaderEpoch: Optional[Integer]): Unit = {
@@ -115,7 +127,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
.setName(topic)
.setPartitions(List(listOffsetPartition).asJava)).asJava
val request = ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
.setTargetTimes(targetTimes)
.build()
assertResponseError(error, brokerId, request)
@@ -159,7 +171,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
.setTimestamp(timestamp)).asJava)).asJava
val builder = ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false)
.setTargetTimes(targetTimes)
val request = if (version == -1) builder.build() else
builder.build(version)
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index dffecb4d9cb..61fcd6fcf21 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -60,7 +60,7 @@ class LogOffsetTest extends BaseRequestTest {
@ValueSource(strings = Array("zk", "kraft"))
def testGetOffsetsForUnknownTopic(quorum: String): Unit = {
val topicPartition = new TopicPartition("foo", 0)
- val request = ListOffsetsRequest.Builder.forConsumer(false,
IsolationLevel.READ_UNCOMMITTED, false)
+ val request = ListOffsetsRequest.Builder.forConsumer(false,
IsolationLevel.READ_UNCOMMITTED, false, false)
.setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
val response = sendListOffsetsRequest(request)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
findPartition(response.topics.asScala, topicPartition).errorCode)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 33c7df2fc52..fbffa4ce37a 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -288,7 +288,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setPartitionIndex(tp.partition)
.setTimestamp(0L)
.setCurrentLeaderEpoch(15)).asJava)
- ListOffsetsRequest.Builder.forConsumer(false,
IsolationLevel.READ_UNCOMMITTED, false)
+ ListOffsetsRequest.Builder.forConsumer(false,
IsolationLevel.READ_UNCOMMITTED, false, false)
.setTargetTimes(List(topic).asJava)
case ApiKeys.LEADER_AND_ISR =>
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
index fe791350a5d..786648f904e 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
@@ -71,7 +71,7 @@ public class ListOffsetRequestBenchmark {
}
}
- this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false,
IsolationLevel.READ_UNCOMMITTED, false)
+ this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false,
IsolationLevel.READ_UNCOMMITTED, false, false)
.build(ApiKeys.LIST_OFFSETS.latestVersion());
}