This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4eead1364d [ISSUE #9912] Reduce excessive requests for consumer offset 
timestamps in tiered storage (#9991)
4eead1364d is described below

commit 4eead1364d074f40cbedacdc8051f5b4bce4072e
Author: lizhimins <[email protected]>
AuthorDate: Tue Jan 13 09:56:22 2026 +0800

    [ISSUE #9912] Reduce excessive requests for consumer offset timestamps in 
tiered storage (#9991)
    
    * [ISSUE #9912] Reduce excessive requests for consumer offset timestamps in 
tiered storage
    
    Signed-off-by: terrance.lzm <[email protected]>
    
    * [ISSUE #9912] Reduce excessive requests for consumer offset timestamps in 
tiered storage
    
    ---------
    
    Signed-off-by: terrance.lzm <[email protected]>
---
 .../rocketmq/tieredstore/TieredMessageStore.java   |  8 +---
 .../tieredstore/core/MessageStoreFetcherImpl.java  | 45 ++++++++++++++++------
 .../tieredstore/TieredMessageStoreTest.java        |  4 +-
 3 files changed, 37 insertions(+), 20 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index b30f868d19..38946fd161 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -362,8 +362,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
     }
 
     @Override
-    public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, 
int queueId,
-        long consumeQueueOffset) {
+    public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, 
int queueId, long consumeQueueOffset) {
         if (fetchFromCurrentStore(topic, queueId, consumeQueueOffset)) {
             Stopwatch stopwatch = Stopwatch.createStarted();
             return fetcher.getMessageStoreTimeStampAsync(topic, queueId, 
consumeQueueOffset)
@@ -374,11 +373,6 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                         .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
                         .build();
                     
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
 latencyAttributes);
-                    if (time == -1) {
-                        log.debug("GetEarliestMessageTimeAsync failed, try to 
get message time from next store, topic: {}, queue: {}, queue offset: {}",
-                            topic, queueId, consumeQueueOffset);
-                        return next.getMessageStoreTimeStamp(topic, queueId, 
consumeQueueOffset);
-                    }
                     return time;
                 });
         }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index f0e8b3ab50..f669f8940a 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
@@ -52,6 +53,7 @@ public class MessageStoreFetcherImpl implements 
MessageStoreFetcher {
     private static final Logger log = 
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
 
     protected static final String CACHE_KEY_FORMAT = "%s@%d@%d";
+    protected static final String FETCHER_GROUP_NAME = 
MixAll.CID_RMQ_SYS_PREFIX + "FETCHER_TIMESTAMP";
 
     private final String brokerName;
     private final MetadataStore metadataStore;
@@ -389,18 +391,37 @@ public class MessageStoreFetcherImpl implements 
MessageStoreFetcher {
             return CompletableFuture.completedFuture(-1L);
         }
 
-        return flatFile.getConsumeQueueAsync(queueOffset)
-            .thenComposeAsync(cqItem -> {
-                long commitLogOffset = 
MessageFormatUtil.getCommitLogOffsetFromItem(cqItem);
-                int size = MessageFormatUtil.getSizeFromItem(cqItem);
-                return flatFile.getCommitLogAsync(commitLogOffset, size);
-            }, messageStore.getStoreExecutor().bufferFetchExecutor)
-            .thenApply(MessageFormatUtil::getStoreTimeStamp)
-            .exceptionally(e -> {
-                
log.error("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " +
-                    "get or decode message failed, topic={}, queue={}, 
offset={}", topic, queueId, queueOffset, e);
-                return -1L;
-            });
+        // The Metrics thread frequently retrieves the storage timestamp of 
the latest message;
+        // as an alternative, return the queue's saved timestamp here.
+        if (queueOffset + 1L == flatFile.getConsumeQueueCommitOffset()) {
+            long timestamp = flatFile.getMaxStoreTimestamp();
+            return CompletableFuture.completedFuture(timestamp == 
Long.MAX_VALUE ? -1L : timestamp);
+        }
+
+        CompletableFuture<Long> future = new CompletableFuture<>();
+        try {
+            this.getMessageAsync(FETCHER_GROUP_NAME, topic, queueId, 
queueOffset, 1, null)
+                .whenComplete((result, e) -> {
+                    if (e != null) {
+                        
log.error("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " +
+                            "Get or decode message failed, topic={}, queue={}, 
offset={}", topic, queueId, queueOffset, e);
+                        future.completeExceptionally(e);
+                        return;
+                    }
+                    if (result != null && result.getMessageBufferList() != null
+                        && !result.getMessageBufferList().isEmpty()) {
+                        long timestamp = 
MessageFormatUtil.getStoreTimeStamp(result.getMessageBufferList().get(0));
+                        
log.info("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " +
+                            "topic={}, queue={}, offset={}, timestamp={}", 
topic, queueId, queueOffset, timestamp);
+                        future.complete(timestamp);
+                    } else {
+                        future.complete(-1L);
+                    }
+                });
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+        return future;
     }
 
     @Override
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 1a0240681c..f88779f09b 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -268,8 +268,10 @@ public class TieredMessageStoreTest {
         configuration.update(properties);
         Assert.assertEquals(1, (long) 
currentStore.getMessageStoreTimeStampAsync(mq.getTopic(), mq.getQueueId(), 
0).join());
 
+        // If data cannot be fetched from tiered storage,
+        // there is no need to fallback to local storage.
         Mockito.when(fetcher.getMessageStoreTimeStampAsync(anyString(), 
anyInt(), anyLong())).thenReturn(CompletableFuture.completedFuture(-1L));
-        Assert.assertEquals(3, (long) 
currentStore.getMessageStoreTimeStampAsync(mq.getTopic(), mq.getQueueId(), 
0).join());
+        Assert.assertEquals(-1L, (long) 
currentStore.getMessageStoreTimeStampAsync(mq.getTopic(), mq.getQueueId(), 
0).join());
     }
 
     @Test

Reply via email to