This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 36d052b6c5c MINOR: Move LogOffsetTest to server module and rewrite in
Java (#22121)
36d052b6c5c is described below
commit 36d052b6c5c3297d448d2c9f981315d519fec5b8
Author: majialong <[email protected]>
AuthorDate: Fri May 15 10:37:07 2026 +0800
MINOR: Move LogOffsetTest to server module and rewrite in Java (#22121)
Move `LogOffsetTest` to server module and rewrite in Java.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../scala/unit/kafka/server/LogOffsetTest.scala | 250 -----------------
.../org/apache/kafka/server/LogOffsetTest.java | 300 +++++++++++++++++++++
2 files changed, 300 insertions(+), 250 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
deleted file mode 100755
index d0e521e68c9..00000000000
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils.TestUtils
-import
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition,
ListOffsetsTopic}
-import
org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse,
ListOffsetsTopicResponse}
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.internal.FileRecords
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse,
ListOffsetsRequest, ListOffsetsResponse}
-import org.apache.kafka.common.{IsolationLevel, TopicPartition}
-import org.apache.kafka.storage.internals.log.{LogStartOffsetIncrementReason,
OffsetResultHolder, UnifiedLog}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{Test, Timeout}
-
-import java.io.File
-import java.util.{Optional, Properties, Random}
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-@Timeout(300)
-class LogOffsetTest extends BaseRequestTest {
-
- override def brokerCount = 1
-
- protected override def brokerPropertyOverrides(props: Properties): Unit = {
- props.put("log.flush.interval.messages", "1")
- props.put("num.partitions", "20")
- props.put("log.retention.hours", "10")
- props.put("log.retention.check.interval.ms", (5 * 1000 * 60).toString)
- }
-
- @Test
- def testGetOffsetsForUnknownTopic(): Unit = {
- val topicPartition = new TopicPartition("foo", 0)
- val request = ListOffsetsRequest.Builder.forConsumer(false,
IsolationLevel.READ_UNCOMMITTED)
- .setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.LATEST_TIMESTAMP).asJava).build(1)
- val response = sendListOffsetsRequest(request)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
findPartition(response.topics.asScala, topicPartition).errorCode)
- }
-
- @deprecated("ListOffsetsRequest V0", since = "")
- @Test
- def testGetOffsetsAfterDeleteRecords(): Unit = {
- val topic = "kafka-"
- val topicPartition = new TopicPartition(topic, 0)
- val log = createTopicAndGetLog(topic, topicPartition)
-
- for (_ <- 0 until 20)
- log.appendAsLeader(TestUtils.singletonRecords(value =
Integer.toString(42).getBytes()), 0)
- log.flush(false)
-
- log.updateHighWatermark(log.logEndOffset)
- log.maybeIncrementLogStartOffset(3,
LogStartOffsetIncrementReason.ClientRecordDeletion)
- log.deleteOldSegments()
-
- val offset =
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
Optional.empty).timestampAndOffsetOpt.map(_.offset)
- assertEquals(Optional.of(20L), offset)
-
- TestUtils.waitUntilTrue(() => isLeaderLocalOnBroker(topic,
topicPartition.partition, broker),
- "Leader should be elected")
- val request = ListOffsetsRequest.Builder.forReplica(1, 1)
- .setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.LATEST_TIMESTAMP).asJava).build()
- val consumerOffset =
findPartition(sendListOffsetsRequest(request).topics.asScala,
topicPartition).offset
- assertEquals(20L, consumerOffset)
- }
-
- @Test
- def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
- val topic = "kafka-"
- val topicPartition = new TopicPartition(topic, 0)
- val log = createTopicAndGetLog(topic, topicPartition)
-
- for (timestamp <- 0 until 20)
- log.appendAsLeader(TestUtils.singletonRecords(value =
Integer.toString(42).getBytes(), timestamp = timestamp.toLong), 0)
- log.flush(false)
-
- log.updateHighWatermark(log.logEndOffset)
-
- val firstOffset =
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP,
Optional.empty).timestampAndOffsetOpt
- assertEquals(19L, firstOffset.get.offset)
- assertEquals(19L, firstOffset.get.timestamp)
-
- log.truncateTo(0)
-
- assertEquals(Optional.empty,
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP,
Optional.empty).timestampAndOffsetOpt)
- }
-
- @Test
- def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit
= {
- val topic = "kafka-"
- val topicPartition = new TopicPartition(topic, 0)
- val log = createTopicAndGetLog(topic, topicPartition)
-
- for (timestamp <- List(0L, 1L, 2L, 3L, 4L, 6L, 5L))
- log.appendAsLeader(TestUtils.singletonRecords(value =
Integer.toString(42).getBytes(), timestamp = timestamp), 0)
- log.flush(false)
-
- log.updateHighWatermark(log.logEndOffset)
-
- val maxTimestampOffset =
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP,
Optional.empty).timestampAndOffsetOpt
- assertEquals(7L, log.logEndOffset)
- assertEquals(5L, maxTimestampOffset.get.offset)
- assertEquals(6L, maxTimestampOffset.get.timestamp)
- }
-
- @Test
- def testGetOffsetsBeforeLatestTime(): Unit = {
- val topic = "kafka-"
- val topicPartition = new TopicPartition(topic, 0)
- val log = createTopicAndGetLog(topic, topicPartition)
-
- val topicIds = getTopicIds(Seq("kafka-")).asJava
- val topicNames = topicIds.asScala.map(_.swap).asJava
- val topicId = topicIds.get(topic)
-
- for (_ <- 0 until 20)
- log.appendAsLeader(TestUtils.singletonRecords(value =
Integer.toString(42).getBytes()), 0)
- log.flush(false)
-
- val offset =
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
Optional.empty).timestampAndOffsetOpt.map(_.offset)
- assertEquals(Optional.of(20L), offset)
-
- TestUtils.waitUntilTrue(() => isLeaderLocalOnBroker(topic, 0, broker),
- "Leader should be elected")
- val request = ListOffsetsRequest.Builder.forReplica(1, 1)
- .setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.LATEST_TIMESTAMP).asJava).build()
- val consumerOffset =
findPartition(sendListOffsetsRequest(request).topics.asScala,
topicPartition).offset
- assertEquals(20L, consumerOffset)
-
- // try to fetch using latest offset
- val fetchRequest =
FetchRequest.Builder.forConsumer(ApiKeys.FETCH.latestVersion, 0, 1,
- Map(topicPartition -> new FetchRequest.PartitionData(topicId,
consumerOffset, FetchRequest.INVALID_LOG_START_OFFSET,
- 300 * 1024, Optional.empty())).asJava).build()
- val fetchResponse = sendFetchRequest(fetchRequest)
-
assertFalse(FetchResponse.recordsOrFail(fetchResponse.responseData(topicNames,
ApiKeys.FETCH.latestVersion).get(topicPartition)).batches.iterator.hasNext)
- }
-
- @Test
- def testEmptyLogsGetOffsets(): Unit = {
- val random = new Random
- val topic = "kafka-"
- val topicPartition = new TopicPartition(topic, random.nextInt(10))
- val topicPartitionPath =
s"${TestUtils.tempDir().getAbsolutePath}/$topic-${topicPartition.partition}"
- val topicLogDir = new File(topicPartitionPath)
- topicLogDir.mkdir()
-
- createTopic(topic)
-
- var offsetChanged = false
- for (_ <- 1 to 14) {
- val topicPartition = new TopicPartition(topic, 0)
- val request = ListOffsetsRequest.Builder.forReplica(1, 1)
- .setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.EARLIEST_TIMESTAMP).asJava).build()
- val consumerOffset =
findPartition(sendListOffsetsRequest(request).topics.asScala,
topicPartition).offset
- if (consumerOffset == 1)
- offsetChanged = true
- }
- assertFalse(offsetChanged)
- }
-
- @Test
- def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = {
- val topic = "kafka-"
- val topicPartition = new TopicPartition(topic, 0)
- val log = createTopicAndGetLog(topic, topicPartition)
-
- log.updateHighWatermark(log.logEndOffset)
-
- assertEquals(0L, log.logEndOffset)
- assertEquals(new
OffsetResultHolder(Optional.empty[FileRecords.TimestampAndOffset]()),
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP, Optional.empty))
- }
-
- @Test
- def testGetOffsetsBeforeEarliestTime(): Unit = {
- val random = new Random
- val topic = "kafka-"
- val topicPartition = new TopicPartition(topic, random.nextInt(3))
-
- createTopic(topic, 3)
-
- val logManager = broker.logManager
- val log = logManager.getOrCreateLog(topicPartition, Optional.empty)
- for (_ <- 0 until 20)
- log.appendAsLeader(TestUtils.singletonRecords(value =
Integer.toString(42).getBytes()), 0)
- log.flush(false)
-
- val offset =
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP,
Optional.empty).timestampAndOffsetOpt.map(_.offset)
- assertEquals(Optional.of(0L), offset)
-
- TestUtils.waitUntilTrue(() => isLeaderLocalOnBroker(topic,
topicPartition.partition, broker),
- "Leader should be elected")
- val request = ListOffsetsRequest.Builder.forReplica(1, 1)
- .setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.EARLIEST_TIMESTAMP).asJava).build()
- val offsetFromResponse =
findPartition(sendListOffsetsRequest(request).topics.asScala,
topicPartition).offset
- assertEquals(0L, offsetFromResponse)
- }
-
- private def broker: KafkaBroker = brokers.head
-
- private def sendListOffsetsRequest(request: ListOffsetsRequest):
ListOffsetsResponse = {
- connectAndReceive[ListOffsetsResponse](request)
- }
-
- private def sendFetchRequest(request: FetchRequest): FetchResponse = {
- connectAndReceive[FetchResponse](request)
- }
-
- private def buildTargetTimes(tp: TopicPartition, timestamp: Long):
List[ListOffsetsTopic] = {
- List(new ListOffsetsTopic()
- .setName(tp.topic)
- .setPartitions(List(new ListOffsetsPartition()
- .setPartitionIndex(tp.partition)
- .setTimestamp(timestamp)).asJava)
- )
- }
-
- private def findPartition(topics: mutable.Buffer[ListOffsetsTopicResponse],
tp: TopicPartition): ListOffsetsPartitionResponse = {
- topics.find(_.name == tp.topic).get
- .partitions.asScala.find(_.partitionIndex == tp.partition).get
- }
-
- private def createTopicAndGetLog(topic: String, topicPartition:
TopicPartition): UnifiedLog = {
- createTopic(topic)
-
- val logManager = broker.logManager
- TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isPresent,
- "Log for partition [topic,0] should be created")
- logManager.getLog(topicPartition).get
- }
-
- private def isLeaderLocalOnBroker(topic: String, partitionId: Int, broker:
KafkaBroker): Boolean = {
- broker.replicaManager.onlinePartition(new TopicPartition(topic,
partitionId)).exists(_.leaderLogIfLocal.isDefined)
- }
-}
diff --git a/server/src/test/java/org/apache/kafka/server/LogOffsetTest.java
b/server/src/test/java/org/apache/kafka/server/LogOffsetTest.java
new file mode 100644
index 00000000000..0ec1bd42dac
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/LogOffsetTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
+import
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.internal.FileRecords;
+import org.apache.kafka.common.record.internal.MemoryRecords;
+import org.apache.kafka.common.record.internal.SimpleRecord;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
+import org.apache.kafka.storage.internals.log.OffsetResultHolder;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+import org.apache.kafka.test.TestUtils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import static org.apache.kafka.server.TestUtils.awaitLeaderChange;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+@ClusterTestDefaults(
+ types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key = "log.flush.interval.messages", value =
"1"),
+ @ClusterConfigProperty(key = "num.partitions", value = "20"),
+ @ClusterConfigProperty(key = "log.retention.hours", value = "10"),
+ @ClusterConfigProperty(key = "log.retention.check.interval.ms", value
= "300000")
+ }
+)
+public class LogOffsetTest {
+
+ private final ClusterInstance clusterInstance;
+
+ LogOffsetTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
+ }
+
+ @ClusterTest
+ public void testGetOffsetsForUnknownTopic() throws IOException {
+ TopicPartition topicPartition = new TopicPartition("foo", 0);
+ ListOffsetsRequest request = ListOffsetsRequest.Builder
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ .setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.LATEST_TIMESTAMP))
+ .build((short) 1);
+ ListOffsetsResponse response = sendListOffsetsRequest(request);
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(),
findPartition(response.topics(), topicPartition).errorCode());
+ }
+
+ @ClusterTest
+ public void testGetOffsetsAfterDeleteRecords() throws Exception {
+ String topic = "kafka-";
+ TopicPartition topicPartition = new TopicPartition(topic, 0);
+ UnifiedLog log = createTopicAndGetLog(topic, topicPartition);
+
+ for (int i = 0; i < 20; i++) {
+ log.appendAsLeader(singletonRecords("42".getBytes()), 0);
+ }
+ log.flush(false);
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.maybeIncrementLogStartOffset(3L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ log.deleteOldSegments();
+
+ Optional<Long> offset =
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
Optional.empty())
+ .timestampAndOffsetOpt().map(t -> t.offset);
+ assertEquals(Optional.of(20L), offset);
+
+ awaitLeaderChange(clusterInstance, topicPartition,
Optional.of(broker().config().brokerId()));
+ ListOffsetsRequest request = ListOffsetsRequest.Builder
+ .forReplica((short) 1, 1)
+ .setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.LATEST_TIMESTAMP))
+ .build();
+ long consumerOffset =
findPartition(sendListOffsetsRequest(request).topics(),
topicPartition).offset();
+ assertEquals(20L, consumerOffset);
+ }
+
+ @ClusterTest
+ public void testFetchOffsetByTimestampForMaxTimestampAfterTruncate()
throws Exception {
+ String topic = "kafka-";
+ TopicPartition topicPartition = new TopicPartition(topic, 0);
+ UnifiedLog log = createTopicAndGetLog(topic, topicPartition);
+
+ for (int timestamp = 0; timestamp < 20; timestamp++) {
+ log.appendAsLeader(singletonRecords("42".getBytes(), (long)
timestamp), 0);
+ }
+ log.flush(false);
+
+ log.updateHighWatermark(log.logEndOffset());
+
+ Optional<FileRecords.TimestampAndOffset> firstOffset = log
+ .fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP,
Optional.empty())
+ .timestampAndOffsetOpt();
+ assertEquals(19L, firstOffset.get().offset);
+ assertEquals(19L, firstOffset.get().timestamp);
+
+ log.truncateTo(0);
+
+ assertEquals(Optional.empty(),
+ log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP,
Optional.empty()).timestampAndOffsetOpt());
+ }
+
+ @ClusterTest
+ public void
testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps() throws
Exception {
+ String topic = "kafka-";
+ TopicPartition topicPartition = new TopicPartition(topic, 0);
+ UnifiedLog log = createTopicAndGetLog(topic, topicPartition);
+
+ for (long timestamp : List.of(0L, 1L, 2L, 3L, 4L, 6L, 5L)) {
+ log.appendAsLeader(singletonRecords("42".getBytes(), timestamp),
0);
+ }
+ log.flush(false);
+
+ log.updateHighWatermark(log.logEndOffset());
+
+ Optional<FileRecords.TimestampAndOffset> maxTimestampOffset = log
+ .fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP,
Optional.empty())
+ .timestampAndOffsetOpt();
+ assertEquals(7L, log.logEndOffset());
+ assertEquals(5L, maxTimestampOffset.get().offset);
+ assertEquals(6L, maxTimestampOffset.get().timestamp);
+ }
+
+ @ClusterTest
+ public void testGetOffsetsBeforeLatestTime() throws Exception {
+ String topic = "kafka-";
+ TopicPartition topicPartition = new TopicPartition(topic, 0);
+ UnifiedLog log = createTopicAndGetLog(topic, topicPartition);
+
+ Uuid topicId = getTopicId(topic);
+ Map<Uuid, String> topicNames = Map.of(topicId, topic);
+
+ for (int i = 0; i < 20; i++) {
+ log.appendAsLeader(singletonRecords("42".getBytes()), 0);
+ }
+ log.flush(false);
+
+ Optional<Long> offset =
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
Optional.empty())
+ .timestampAndOffsetOpt().map(t -> t.offset);
+ assertEquals(Optional.of(20L), offset);
+
+ awaitLeaderChange(clusterInstance, topicPartition,
Optional.of(broker().config().brokerId()));
+ ListOffsetsRequest request = ListOffsetsRequest.Builder
+ .forReplica((short) 1, 1)
+ .setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.LATEST_TIMESTAMP))
+ .build();
+ long consumerOffset =
findPartition(sendListOffsetsRequest(request).topics(),
topicPartition).offset();
+ assertEquals(20L, consumerOffset);
+
+ // try to fetch using latest offset
+ FetchRequest fetchRequest = FetchRequest.Builder.forConsumer(
+ ApiKeys.FETCH.latestVersion(), 0, 1,
+ Map.of(topicPartition, new FetchRequest.PartitionData(topicId,
consumerOffset,
+ FetchRequest.INVALID_LOG_START_OFFSET, 300 * 1024,
Optional.empty()))
+ ).build();
+ FetchResponse fetchResponse = sendFetchRequest(fetchRequest);
+ assertFalse(FetchResponse.recordsOrFail(
+ fetchResponse.responseData(topicNames,
ApiKeys.FETCH.latestVersion()).get(topicPartition))
+ .batches().iterator().hasNext());
+ }
+
+ @ClusterTest
+ public void testEmptyLogsGetOffsets() throws Exception {
+ String topic = "kafka-";
+ TopicPartition topicPartition = new TopicPartition(topic, 0);
+ clusterInstance.createTopic(topic, 1, (short) 1);
+
+ ListOffsetsRequest request = ListOffsetsRequest.Builder
+ .forReplica((short) 1, 1)
+ .setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.EARLIEST_TIMESTAMP))
+ .build();
+ long consumerOffset =
findPartition(sendListOffsetsRequest(request).topics(),
topicPartition).offset();
+ assertEquals(0L, consumerOffset);
+ }
+
+ @ClusterTest
+ public void testFetchOffsetByTimestampForMaxTimestampWithEmptyLog() throws
Exception {
+ String topic = "kafka-";
+ TopicPartition topicPartition = new TopicPartition(topic, 0);
+ UnifiedLog log = createTopicAndGetLog(topic, topicPartition);
+
+ log.updateHighWatermark(log.logEndOffset());
+
+ assertEquals(0L, log.logEndOffset());
+ assertEquals(new OffsetResultHolder(),
log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP, Optional.empty()));
+ }
+
+ @ClusterTest
+ public void testGetOffsetsBeforeEarliestTime() throws Exception {
+ Random random = new Random();
+ String topic = "kafka-";
+ TopicPartition topicPartition = new TopicPartition(topic,
random.nextInt(3));
+
+ clusterInstance.createTopic(topic, 3, (short) 1);
+
+ UnifiedLog log = broker().logManager().getOrCreateLog(topicPartition,
Optional.empty());
+ for (int i = 0; i < 20; i++) {
+ log.appendAsLeader(singletonRecords("42".getBytes()), 0);
+ }
+ log.flush(false);
+
+ Optional<Long> offset =
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP,
Optional.empty())
+ .timestampAndOffsetOpt().map(t -> t.offset);
+ assertEquals(Optional.of(0L), offset);
+
+ awaitLeaderChange(clusterInstance, topicPartition,
Optional.of(broker().config().brokerId()));
+ ListOffsetsRequest request = ListOffsetsRequest.Builder
+ .forReplica((short) 1, 1)
+ .setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.EARLIEST_TIMESTAMP))
+ .build();
+ long offsetFromResponse =
findPartition(sendListOffsetsRequest(request).topics(),
topicPartition).offset();
+ assertEquals(0L, offsetFromResponse);
+ }
+
+ private KafkaBroker broker() {
+ return clusterInstance.aliveBrokers().values().iterator().next();
+ }
+
+ private ListOffsetsResponse sendListOffsetsRequest(ListOffsetsRequest
request) throws IOException {
+ return IntegrationTestUtils.connectAndReceive(request,
clusterInstance.brokerBoundPorts().get(0));
+ }
+
+ private FetchResponse sendFetchRequest(FetchRequest request) throws
IOException {
+ return IntegrationTestUtils.connectAndReceive(request,
clusterInstance.brokerBoundPorts().get(0));
+ }
+
+ private List<ListOffsetsTopic> buildTargetTimes(TopicPartition tp, long
timestamp) {
+ return List.of(new ListOffsetsTopic()
+ .setName(tp.topic())
+ .setPartitions(List.of(new ListOffsetsPartition()
+ .setPartitionIndex(tp.partition())
+ .setTimestamp(timestamp))));
+ }
+
+ private ListOffsetsPartitionResponse
findPartition(List<ListOffsetsTopicResponse> topics, TopicPartition tp) {
+ return topics.stream()
+ .filter(t -> t.name().equals(tp.topic())).findFirst().get()
+ .partitions().stream()
+ .filter(p -> p.partitionIndex() ==
tp.partition()).findFirst().get();
+ }
+
+ private UnifiedLog createTopicAndGetLog(String topic, TopicPartition
topicPartition) throws Exception {
+ clusterInstance.createTopic(topic, 1, (short) 1);
+
+ TestUtils.waitForCondition(() ->
broker().logManager().getLog(topicPartition).isPresent(),
+ "Log for partition [topic,0] should be created");
+ return broker().logManager().getLog(topicPartition).get();
+ }
+
+ private Uuid getTopicId(String topic) throws Exception {
+ try (Admin admin = clusterInstance.admin()) {
+ return
admin.describeTopics(List.of(topic)).allTopicNames().get().get(topic).topicId();
+ }
+ }
+
+ private static MemoryRecords singletonRecords(byte[] value) {
+ return MemoryRecords.withRecords(Compression.NONE, new
SimpleRecord(value));
+ }
+
+ private static MemoryRecords singletonRecords(byte[] value, long
timestamp) {
+ return MemoryRecords.withRecords(Compression.NONE, new
SimpleRecord(timestamp, null, value));
+ }
+}