This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 35a747452c [ISSUE #9802] Fix the issue of delete logic in tiered
storage index service (#9803)
35a747452c is described below
commit 35a747452c3a5a5d1104efde05f7457f8b79b5e3
Author: lizhimins <[email protected]>
AuthorDate: Mon Nov 10 10:29:53 2025 +0800
[ISSUE #9802] Fix the issue of delete logic in tiered storage index service
(#9803)
---
.../apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java | 4 ++++
.../org/apache/rocketmq/tieredstore/index/IndexStoreService.java | 8 ++++----
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index 273ad91963..f0e8b3ab50 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -442,6 +442,10 @@ public class MessageStoreFetcherImpl implements
MessageStoreFetcher {
if (flatFile == null) {
continue;
}
+ if (indexItem.getOffset() < flatFile.getCommitLogMinOffset() ||
+ indexItem.getOffset() > flatFile.getCommitLogMaxOffset()) {
+ continue;
+ }
CompletableFuture<SelectMappedBufferResult> getMessageFuture =
flatFile
.getCommitLogAsync(indexItem.getOffset(),
indexItem.getSize())
.thenApply(messageBuffer -> new SelectMappedBufferResult(
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 00566d6872..2385628ed4 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
@@ -238,7 +237,7 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
ConcurrentNavigableMap<Long, IndexFile> pendingMap =
this.timeStoreTable.subMap(beginTime, true, endTime, true);
List<CompletableFuture<Void>> futureList = new
ArrayList<>(pendingMap.size());
- ConcurrentHashMap<String /* queueId-offset */, IndexItem> result =
new ConcurrentHashMap<>();
+ ConcurrentSkipListMap<String /* queueId-offset */, IndexItem>
result = new ConcurrentSkipListMap<>();
for (Map.Entry<Long, IndexFile> entry :
pendingMap.descendingMap().entrySet()) {
CompletableFuture<Void> completableFuture = entry.getValue()
@@ -246,7 +245,7 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
.thenAccept(itemList -> itemList.forEach(indexItem -> {
if (result.size() < maxCount) {
result.put(String.format(
- "%d-%d", indexItem.getQueueId(),
indexItem.getOffset()), indexItem);
+ "%d-%20d", indexItem.getQueueId(),
indexItem.getOffset()), indexItem);
}
}));
futureList.add(completableFuture);
@@ -349,7 +348,8 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
flatAppendFile.destroyExpiredFile(expireTimestamp);
timeStoreTable.entrySet().removeIf(entry ->
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()) &&
- entry.getKey() < flatAppendFile.getMinTimestamp());
+ (flatAppendFile.getFileSegmentList().isEmpty() ||
+ entry.getKey() < flatAppendFile.getMinTimestamp()));
int tableSize = (int) timeStoreTable.entrySet().stream()
.filter(entry ->
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()))
.count();