This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 66485b04c69c12c8c6152053554d078840f34809 Author: PoAn Yang <[email protected]> AuthorDate: Sat Aug 3 14:27:27 2024 +0800 KAFKA-16480: ListOffsets change should have an associated API/IBP version update (#16781) 1. Use oldestAllowedVersion as 9 if using ListOffsetsRequest#EARLIEST_LOCAL_TIMESTAMP or ListOffsetsRequest#LATEST_TIERED_TIMESTAMP. 2. Add test cases to ListOffsetsRequestTest#testListOffsetsRequestOldestVersion to make sure requireTieredStorageTimestamp return 9 as minVersion. 3. Add EarliestLocalSpec and LatestTierSpec to OffsetSpec. 4. Add more cases to KafkaAdminClient#getOffsetFromSpec. 5. Add testListOffsetsEarliestLocalSpecMinVersion and testListOffsetsLatestTierSpecSpecMinVersion to KafkaAdminClientTest to make sure request builder has oldestAllowedVersion as 9. Signed-off-by: PoAn Yang <[email protected]> Reviewers: Luke Chen <[email protected]> --- .../kafka/clients/admin/KafkaAdminClient.java | 4 ++ .../org/apache/kafka/clients/admin/OffsetSpec.java | 19 ++++++++ .../admin/internals/ListOffsetsHandler.java | 6 ++- .../clients/consumer/internals/OffsetFetcher.java | 2 +- .../consumer/internals/OffsetsRequestManager.java | 2 +- .../kafka/common/requests/ListOffsetsRequest.java | 9 +++- .../kafka/clients/admin/KafkaAdminClientTest.java | 56 ++++++++++++++++++++++ .../common/requests/ListOffsetsRequestTest.java | 4 +- .../kafka/common/requests/RequestResponseTest.java | 6 +-- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 4 +- .../unit/kafka/server/ListOffsetsRequestTest.scala | 24 +++++++--- .../scala/unit/kafka/server/LogOffsetTest.scala | 2 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +- .../jmh/common/ListOffsetRequestBenchmark.java | 2 +- 15 files changed, 122 insertions(+), 22 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 640a08a3786..8eb7fb4e8c0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4860,6 +4860,10 @@ public class KafkaAdminClient extends AdminClient { return ListOffsetsRequest.EARLIEST_TIMESTAMP; } else if (offsetSpec instanceof OffsetSpec.MaxTimestampSpec) { return ListOffsetsRequest.MAX_TIMESTAMP; + } else if (offsetSpec instanceof OffsetSpec.EarliestLocalSpec) { + return ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP; + } else if (offsetSpec instanceof OffsetSpec.LatestTierSpec) { + return ListOffsetsRequest.LATEST_TIERED_TIMESTAMP; } return ListOffsetsRequest.LATEST_TIMESTAMP; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java index dcf90452c55..5b2fbb3e2e9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java @@ -26,6 +26,8 @@ public class OffsetSpec { public static class EarliestSpec extends OffsetSpec { } public static class LatestSpec extends OffsetSpec { } public static class MaxTimestampSpec extends OffsetSpec { } + public static class EarliestLocalSpec extends OffsetSpec { } + public static class LatestTierSpec extends OffsetSpec { } public static class TimestampSpec extends OffsetSpec { private final long timestamp; @@ -70,4 +72,21 @@ public class OffsetSpec { return new MaxTimestampSpec(); } + /** + * Used to retrieve the offset with the local log start offset, + * log start offset is the offset of a log above which reads are guaranteed to be served + * from the disk of the leader broker, when Tiered Storage is not enabled, it behaves the same + * as the earliest timestamp + */ + public static OffsetSpec earliestLocalSpec() { + return new EarliestLocalSpec(); + } + + /** + * Used to retrieve the offset with the highest offset of data stored in remote storage, + * and when Tiered Storage is not enabled, we won't return any offset (i.e. Unknown offset) + */ + public static OffsetSpec latestTierSpec() { + return new LatestTierSpec(); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java index 0bb42ed7696..7dfcb22afba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java @@ -93,8 +93,12 @@ public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffset .stream() .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP); + boolean requireTieredStorageTimestamp = keys + .stream() + .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP || offsetTimestampsByPartition.get(key) == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP); + return ListOffsetsRequest.Builder - .forConsumer(true, options.isolationLevel(), supportsMaxTimestamp) + .forConsumer(true, options.isolationLevel(), supportsMaxTimestamp, requireTieredStorageTimestamp) .setTargetTimes(new ArrayList<>(topicsByName.values())); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java index f624941c525..ec0bfe2fc13 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java @@ -391,7 +391,7 @@ public class OffsetFetcher { final Map<TopicPartition, ListOffsetsPartition> timestampsToSearch, boolean requireTimestamp) { ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(requireTimestamp, isolationLevel, false) + .forConsumer(requireTimestamp, isolationLevel, false, false) .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch)); log.debug("Sending ListOffsetRequest {} to broker {}", builder, node); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java index b4f63714128..71da2f5bf60 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java @@ -337,7 +337,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis boolean requireTimestamps, List<NetworkClientDelegate.UnsentRequest> unsentRequests) { ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder - .forConsumer(requireTimestamps, isolationLevel, false) + .forConsumer(requireTimestamps, isolationLevel, false, false) .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes)); log.debug("Creating ListOffset request {} for broker {} to reset positions", builder, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java index 56dea5262f7..8ebf0886bec 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java @@ -62,9 +62,14 @@ public class ListOffsetsRequest extends AbstractRequest { return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED); } - public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel, boolean requireMaxTimestamp) { + public static Builder forConsumer(boolean requireTimestamp, + IsolationLevel isolationLevel, + boolean requireMaxTimestamp, + boolean requireTieredStorageTimestamp) { short minVersion = 0; - if (requireMaxTimestamp) + if (requireTieredStorageTimestamp) + minVersion = 9; + else if (requireMaxTimestamp) minVersion = 7; else if (isolationLevel == IsolationLevel.READ_COMMITTED) minVersion = 2; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 34c2722d463..8d70e60fc05 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -5844,6 +5844,62 @@ public class KafkaAdminClientTest { } } + @Test + public void testListOffsetsEarliestLocalSpecMinVersion() throws Exception { + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + List<PartitionInfo> pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.earliestLocalSpec())); + + TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request -> + request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 9 + ), "no listOffsets request has the expected oldestAllowedVersion"); + } + } + + @Test + public void testListOffsetsLatestTierSpecSpecMinVersion() throws Exception { + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + List<PartitionInfo> pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latestTierSpec())); + + TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request -> + request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 9 + ), "no listOffsets request has the expected oldestAllowedVersion"); + } + } + private Map<String, FeatureUpdate> makeTestFeatureUpdates() { return Utils.mkMap( Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE)), diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java index d748203cce0..e9a2bec2ffd 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java @@ -68,7 +68,7 @@ public class ListOffsetsRequestTest { new ListOffsetsPartition() .setPartitionIndex(0)))); ListOffsetsRequest request = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_COMMITTED, false) + .forConsumer(true, IsolationLevel.READ_COMMITTED, false, false) .setTargetTimes(topics) .build(version); ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception()); @@ -101,7 +101,7 @@ public class ListOffsetsRequestTest { new ListOffsetsPartition() .setPartitionIndex(0)))); ListOffsetsRequest request = ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, false) .setTargetTimes(topics) .build((short) 0); ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index ac044a5a536..54e681b468b 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -2347,7 +2347,7 @@ public class RequestResponseTest { .setMaxNumOffsets(10) .setCurrentLeaderEpoch(5))); return ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false) .setTargetTimes(singletonList(topic)) .build(version); } else if (version == 1) { @@ -2358,7 +2358,7 @@ public class RequestResponseTest { .setTimestamp(1000000L) .setCurrentLeaderEpoch(5))); return ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false) + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, false) .setTargetTimes(singletonList(topic)) .build(version); } else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) { @@ -2371,7 +2371,7 @@ public class RequestResponseTest { .setName("test") .setPartitions(singletonList(partition)); return ListOffsetsRequest.Builder - .forConsumer(true, IsolationLevel.READ_COMMITTED, false) + .forConsumer(true, IsolationLevel.READ_COMMITTED, false, false) .setTargetTimes(singletonList(topic)) .build(version); } else { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index c00abf9427a..c58caaae8b4 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -287,7 +287,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } private def createListOffsetsRequest = { - requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false).setTargetTimes( + requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false).setTargetTimes( List(new ListOffsetsTopic() .setName(tp.topic) .setPartitions(List(new ListOffsetsPartition() diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index e66e345a459..91b45dcc033 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -4000,7 +4000,7 @@ class KafkaApisTest extends Logging { .setPartitionIndex(tp.partition) .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP) .setCurrentLeaderEpoch(currentLeaderEpoch.get)).asJava)).asJava - val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false) + val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false, false) .setTargetTimes(targetTimes).build() val request = buildRequest(listOffsetRequest) when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), @@ -6151,7 +6151,7 @@ class KafkaApisTest extends Logging { .setPartitions(List(new ListOffsetsPartition() .setPartitionIndex(tp.partition) .setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava - val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false) + val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false, false) .setTargetTimes(targetTimes).build() val request = buildRequest(listOffsetRequest) kafkaApis = createKafkaApis() diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala index 60b8789e428..f48e3546d89 100644 --- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala @@ -54,7 +54,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { .setCurrentLeaderEpoch(0)).asJava)).asJava val consumerRequest = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false) .setTargetTimes(targetTimes) .build() @@ -94,15 +94,27 @@ class ListOffsetsRequestTest extends BaseRequestTest { @ParameterizedTest @ValueSource(strings = Array("zk", "kraft")) - def testListOffsetsMaxTimeStampOldestVersion(quorum: String): Unit = { + def testListOffsetsRequestOldestVersion(): Unit = { val consumerRequestBuilder = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false) + + val requireTimestampRequestBuilder = ListOffsetsRequest.Builder + .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false, false) + + val requestCommittedRequestBuilder = ListOffsetsRequest.Builder + .forConsumer(false, IsolationLevel.READ_COMMITTED, false, false) val maxTimestampRequestBuilder = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true, false) + + val requireTieredStorageTimestampRequestBuilder = ListOffsetsRequest.Builder + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, true) assertEquals(0.toShort, consumerRequestBuilder.oldestAllowedVersion()) + assertEquals(1.toShort, requireTimestampRequestBuilder.oldestAllowedVersion()) + assertEquals(2.toShort, requestCommittedRequestBuilder.oldestAllowedVersion()) assertEquals(7.toShort, maxTimestampRequestBuilder.oldestAllowedVersion()) + assertEquals(9.toShort, requireTieredStorageTimestampRequestBuilder.oldestAllowedVersion()) } def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = { @@ -115,7 +127,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { .setName(topic) .setPartitions(List(listOffsetPartition).asJava)).asJava val request = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false) .setTargetTimes(targetTimes) .build() assertResponseError(error, brokerId, request) @@ -159,7 +171,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { .setTimestamp(timestamp)).asJava)).asJava val builder = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false) .setTargetTimes(targetTimes) val request = if (version == -1) builder.build() else builder.build(version) diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index dffecb4d9cb..61fcd6fcf21 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -60,7 +60,7 @@ class LogOffsetTest extends BaseRequestTest { @ValueSource(strings = Array("zk", "kraft")) def testGetOffsetsForUnknownTopic(quorum: String): Unit = { val topicPartition = new TopicPartition("foo", 0) - val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) + val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false) .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0) val response = sendListOffsetsRequest(request) assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, findPartition(response.topics.asScala, topicPartition).errorCode) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 33c7df2fc52..fbffa4ce37a 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -288,7 +288,7 @@ class RequestQuotaTest extends BaseRequestTest { .setPartitionIndex(tp.partition) .setTimestamp(0L) .setCurrentLeaderEpoch(15)).asJava) - ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) + ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false) .setTargetTimes(List(topic).asJava) case ApiKeys.LEADER_AND_ISR => diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java index fe791350a5d..786648f904e 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java @@ -71,7 +71,7 @@ public class ListOffsetRequestBenchmark { } } - this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false) + this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false) .build(ApiKeys.LIST_OFFSETS.latestVersion()); }
