This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e5d33722d9 [ISSUE #10223] Not query the index of system topics in tiered storage (#10224)
e5d33722d9 is described below

commit e5d33722d9ccd5992951f761264af02a805ef67c
Author: lizhimins <[email protected]>
AuthorDate: Mon Mar 30 13:34:53 2026 +0800

    [ISSUE #10223] Not query the index of system topics in tiered storage (#10224)
---
 .../main/java/org/apache/rocketmq/store/DefaultMessageStore.java | 3 ++-
 .../rocketmq/tieredstore/core/MessageStoreFetcherImpl.java       | 6 ++++++
 .../rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java   | 9 +++++++++
 3 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 4409bb599b..aee767dae2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1501,7 +1501,8 @@ public class DefaultMessageStore implements MessageStore {
         return queryMessageResult;
     }
 
-    @Override public CompletableFuture<QueryMessageResult> queryMessageAsync(String topic, String key,
+    @Override
+    public CompletableFuture<QueryMessageResult> queryMessageAsync(String topic, String key,
         int maxNum, long begin, long end) {
         return CompletableFuture.completedFuture(queryMessage(topic, key, maxNum, begin, end));
     }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index f669f8940a..2a5dc2dd8a 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -61,6 +61,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
     private final TieredMessageStore messageStore;
     private final IndexService indexService;
     private final FlatFileStore flatFileStore;
+    private final MessageStoreFilter topicFilter;
     private final long memoryMaxSize;
     private final Cache<String /* topic@queueId@offset */, SelectBufferResult> fetcherCache;
 
@@ -78,6 +79,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
         this.messageStore = messageStore;
         this.indexService = indexService;
         this.metadataStore = flatFileStore.getMetadataStore();
+        this.topicFilter = messageStore.getTopicFilter();
         this.memoryMaxSize =
             (long) (Runtime.getRuntime().maxMemory() * storeConfig.getReadAheadCacheSizeThresholdRate());
         this.fetcherCache = this.initCache(storeConfig);
@@ -437,6 +439,10 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
     public CompletableFuture<QueryMessageResult> queryMessageAsync(
         String topic, String key, int maxCount, long begin, long end) {
 
+        if (topicFilter.filterTopic(topic)) {
+            return CompletableFuture.completedFuture(new QueryMessageResult());
+        }
+
         long topicId;
         try {
             TopicMetadata topicMetadata = metadataStore.getTopic(topic);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
index fdcdec066f..fd681f27b7 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java
@@ -21,8 +21,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.store.DefaultMessageFilter;
 import org.apache.rocketmq.store.GetMessageResult;
@@ -81,6 +83,9 @@ public class MessageStoreFetcherImplTest {
             groupName, mq.getTopic(), 0, 0, 32, null).join();
         Assert.assertEquals(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, getMessageResult.getStatus());
 
+        FieldUtils.writeField(fetcher,
+            "topicFilter", new MessageStoreTopicFilter(storeConfig), true);
+
         getMessageResult = fetcher.getMessageAsync(
             groupName, mq.getTopic(), mq.getQueueId(), 0, 32, null).join();
         Assert.assertEquals(GetMessageStatus.OFFSET_TOO_SMALL, getMessageResult.getStatus());
@@ -325,5 +330,9 @@ public class MessageStoreFetcherImplTest {
         queryMessageResult = fetcher.queryMessageAsync(
             mq.getTopic(), "uk", 120, 0L, System.currentTimeMillis()).join();
         Assert.assertEquals(100, queryMessageResult.getMessageBufferList().size());
+
+        queryMessageResult = fetcher.queryMessageAsync(TopicValidator.SYSTEM_TOPIC_PREFIX + mq.getTopic(),
+            "uk", 120, 0L, System.currentTimeMillis()).join();
+        Assert.assertEquals(0, queryMessageResult.getMessageBufferList().size());
     }
 }

Reply via email to