This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 35a747452c [ISSUE #9802] Fix the issue of delete logic in tiered storage index service (#9803)
35a747452c is described below

commit 35a747452c3a5a5d1104efde05f7457f8b79b5e3
Author: lizhimins <[email protected]>
AuthorDate: Mon Nov 10 10:29:53 2025 +0800

    [ISSUE #9802] Fix the issue of delete logic in tiered storage index service (#9803)
---
 .../apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java | 4 ++++
 .../org/apache/rocketmq/tieredstore/index/IndexStoreService.java  | 8 ++++----
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index 273ad91963..f0e8b3ab50 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -442,6 +442,10 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
                 if (flatFile == null) {
                     continue;
                 }
+                if (indexItem.getOffset() < flatFile.getCommitLogMinOffset() ||
+                    indexItem.getOffset() > flatFile.getCommitLogMaxOffset()) {
+                    continue;
+                }
                 CompletableFuture<SelectMappedBufferResult> getMessageFuture = 
flatFile
                     .getCommitLogAsync(indexItem.getOffset(), 
indexItem.getSize())
                     .thenApply(messageBuffer -> new SelectMappedBufferResult(
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 00566d6872..2385628ed4 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
@@ -238,7 +237,7 @@ public class IndexStoreService extends ServiceThread implements IndexService {
             ConcurrentNavigableMap<Long, IndexFile> pendingMap =
                 this.timeStoreTable.subMap(beginTime, true, endTime, true);
             List<CompletableFuture<Void>> futureList = new 
ArrayList<>(pendingMap.size());
-            ConcurrentHashMap<String /* queueId-offset */, IndexItem> result = 
new ConcurrentHashMap<>();
+            ConcurrentSkipListMap<String /* queueId-offset */, IndexItem> 
result = new ConcurrentSkipListMap<>();
 
             for (Map.Entry<Long, IndexFile> entry : 
pendingMap.descendingMap().entrySet()) {
                 CompletableFuture<Void> completableFuture = entry.getValue()
@@ -246,7 +245,7 @@ public class IndexStoreService extends ServiceThread implements IndexService {
                     .thenAccept(itemList -> itemList.forEach(indexItem -> {
                         if (result.size() < maxCount) {
                             result.put(String.format(
-                                "%d-%d", indexItem.getQueueId(), 
indexItem.getOffset()), indexItem);
+                                "%d-%20d", indexItem.getQueueId(), 
indexItem.getOffset()), indexItem);
                         }
                     }));
                 futureList.add(completableFuture);
@@ -349,7 +348,8 @@ public class IndexStoreService extends ServiceThread implements IndexService {
             flatAppendFile.destroyExpiredFile(expireTimestamp);
             timeStoreTable.entrySet().removeIf(entry ->
                 
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()) &&
-                    entry.getKey() < flatAppendFile.getMinTimestamp());
+                    (flatAppendFile.getFileSegmentList().isEmpty() ||
+                        entry.getKey() < flatAppendFile.getMinTimestamp()));
             int tableSize = (int) timeStoreTable.entrySet().stream()
                 .filter(entry -> 
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()))
                 .count();

Reply via email to