This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 729275c0c7 [ISSUE #9271] Enhance tiered storage getQueueOffsetByTimeAsync (#9272)
729275c0c7 is described below
commit 729275c0c775a722d2f04fea4b81e251c18bb323
Author: bxfjb <[email protected]>
AuthorDate: Thu Mar 27 13:57:47 2025 +0800
[ISSUE #9271] Enhance tiered storage getQueueOffsetByTimeAsync (#9272)
---
.../rocketmq/tieredstore/file/FlatMessageFile.java | 22 ++++++++++++--
.../tieredstore/file/FlatMessageFileTest.java | 34 ++++++++++++++++++----
2 files changed, 48 insertions(+), 8 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index ade37149d6..2519f91ebe 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
import org.apache.rocketmq.tieredstore.metadata.entity.QueueMetadata;
import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata;
+import org.apache.rocketmq.tieredstore.provider.FileSegment;
import org.apache.rocketmq.tieredstore.util.MessageFormatUtil;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
import org.slf4j.Logger;
@@ -302,9 +303,26 @@ public class FlatMessageFile implements FlatFileInterface {
return CompletableFuture.completedFuture(cqMin);
}
+ // get correct consume queue file by binary search
+ List<FileSegment> consumeQueueFileList =
this.consumeQueue.getFileSegmentList();
+ int low = 0, high = consumeQueueFileList.size() - 1;
+ int mid = low + (high - low) / 2;
+ while (low <= high) {
+ FileSegment fileSegment = consumeQueueFileList.get(mid);
+ if (fileSegment.getMinTimestamp() <= timestamp && timestamp <=
fileSegment.getMaxTimestamp()) {
+ break;
+ } else if (timestamp < fileSegment.getMinTimestamp()) {
+ high = mid - 1;
+ } else {
+ low = mid + 1;
+ }
+ mid = low + (high - low) / 2;
+ }
+ FileSegment target = consumeQueueFileList.get(mid);
+
// binary search lower bound index in a sorted array
- long minOffset = cqMin;
- long maxOffset = cqMax;
+ long minOffset = target.getBaseOffset() /
MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE;
+ long maxOffset = target.getCommitOffset() /
MessageFormatUtil.CONSUME_QUEUE_UNIT_SIZE - 1;
List<String> queryLog = new ArrayList<>();
while (minOffset < maxOffset) {
long middle = minOffset + (maxOffset - minOffset) / 2;
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
index 8208d27741..97768d0658 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
@@ -177,13 +177,35 @@ public class FlatMessageFileTest {
// append message to consume queue
flatFile.consumeQueue.initOffset(50 * ConsumeQueue.CQ_STORE_UNIT_SIZE);
- for (int i = 0; i < 5; i++) {
- AppendResult appendResult = flatFile.appendConsumeQueue(new
DispatchRequest(
- mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN
* i,
- MessageFormatUtilTest.MSG_LEN, 0, timestamp1, 50 + i,
+ AppendResult appendResult = flatFile.appendConsumeQueue(new
DispatchRequest(
+ mq.getTopic(), mq.getQueueId(), 0,
+ MessageFormatUtilTest.MSG_LEN, 0, timestamp1, 50,
"", "", 0, 0, null));
- Assert.assertEquals(AppendResult.SUCCESS, appendResult);
- }
+ Assert.assertEquals(AppendResult.SUCCESS, appendResult);
+
+ appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
+ mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN,
+ MessageFormatUtilTest.MSG_LEN, 0, timestamp2, 51,
+ "", "", 0, 0, null));
+ Assert.assertEquals(AppendResult.SUCCESS, appendResult);
+
+ appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
+ mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN
* 2,
+ MessageFormatUtilTest.MSG_LEN, 0, timestamp2, 52,
+ "", "", 0, 0, null));
+ Assert.assertEquals(AppendResult.SUCCESS, appendResult);
+
+ appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
+ mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN
* 3,
+ MessageFormatUtilTest.MSG_LEN, 0, timestamp2, 53,
+ "", "", 0, 0, null));
+ Assert.assertEquals(AppendResult.SUCCESS, appendResult);
+
+ appendResult = flatFile.appendConsumeQueue(new DispatchRequest(
+ mq.getTopic(), mq.getQueueId(), MessageFormatUtilTest.MSG_LEN
* 4,
+ MessageFormatUtilTest.MSG_LEN, 0, timestamp3, 54,
+ "", "", 0, 0, null));
+ Assert.assertEquals(AppendResult.SUCCESS, appendResult);
// commit message will increase max consume queue offset
Assert.assertTrue(flatFile.commitAsync().join());