This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4eead1364d [ISSUE #9912] Reduce excessive requests for consumer offset
timestamps in tiered storage (#9991)
4eead1364d is described below
commit 4eead1364d074f40cbedacdc8051f5b4bce4072e
Author: lizhimins <[email protected]>
AuthorDate: Tue Jan 13 09:56:22 2026 +0800
[ISSUE #9912] Reduce excessive requests for consumer offset timestamps in
tiered storage (#9991)
* [ISSUE #9912] Reduce excessive requests for consumer offset timestamps in
tiered storage
Signed-off-by: terrance.lzm <[email protected]>
* [ISSUE #9912] Reduce excessive requests for consumer offset timestamps in
tiered storage
---------
Signed-off-by: terrance.lzm <[email protected]>
---
.../rocketmq/tieredstore/TieredMessageStore.java | 8 +---
.../tieredstore/core/MessageStoreFetcherImpl.java | 45 ++++++++++++++++------
.../tieredstore/TieredMessageStoreTest.java | 4 +-
3 files changed, 37 insertions(+), 20 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index b30f868d19..38946fd161 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -362,8 +362,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
}
@Override
- public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic,
int queueId,
- long consumeQueueOffset) {
+ public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic,
int queueId, long consumeQueueOffset) {
if (fetchFromCurrentStore(topic, queueId, consumeQueueOffset)) {
Stopwatch stopwatch = Stopwatch.createStarted();
return fetcher.getMessageStoreTimeStampAsync(topic, queueId,
consumeQueueOffset)
@@ -374,11 +373,6 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
.build();
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
latencyAttributes);
- if (time == -1) {
- log.debug("GetEarliestMessageTimeAsync failed, try to
get message time from next store, topic: {}, queue: {}, queue offset: {}",
- topic, queueId, consumeQueueOffset);
- return next.getMessageStoreTimeStamp(topic, queueId,
consumeQueueOffset);
- }
return time;
});
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index f0e8b3ab50..f669f8940a 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
@@ -52,6 +53,7 @@ public class MessageStoreFetcherImpl implements
MessageStoreFetcher {
private static final Logger log =
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
protected static final String CACHE_KEY_FORMAT = "%s@%d@%d";
+ protected static final String FETCHER_GROUP_NAME =
MixAll.CID_RMQ_SYS_PREFIX + "FETCHER_TIMESTAMP";
private final String brokerName;
private final MetadataStore metadataStore;
@@ -389,18 +391,37 @@ public class MessageStoreFetcherImpl implements
MessageStoreFetcher {
return CompletableFuture.completedFuture(-1L);
}
- return flatFile.getConsumeQueueAsync(queueOffset)
- .thenComposeAsync(cqItem -> {
- long commitLogOffset =
MessageFormatUtil.getCommitLogOffsetFromItem(cqItem);
- int size = MessageFormatUtil.getSizeFromItem(cqItem);
- return flatFile.getCommitLogAsync(commitLogOffset, size);
- }, messageStore.getStoreExecutor().bufferFetchExecutor)
- .thenApply(MessageFormatUtil::getStoreTimeStamp)
- .exceptionally(e -> {
-
log.error("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " +
- "get or decode message failed, topic={}, queue={},
offset={}", topic, queueId, queueOffset, e);
- return -1L;
- });
+ // The Metrics thread frequently retrieves the storage timestamp of
the latest message;
+ // as an alternative, return the queue's saved timestamp here.
+ if (queueOffset + 1L == flatFile.getConsumeQueueCommitOffset()) {
+ long timestamp = flatFile.getMaxStoreTimestamp();
+ return CompletableFuture.completedFuture(timestamp ==
Long.MAX_VALUE ? -1L : timestamp);
+ }
+
+ CompletableFuture<Long> future = new CompletableFuture<>();
+ try {
+ this.getMessageAsync(FETCHER_GROUP_NAME, topic, queueId,
queueOffset, 1, null)
+ .whenComplete((result, e) -> {
+ if (e != null) {
+
log.error("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " +
+ "Get or decode message failed, topic={}, queue={},
offset={}", topic, queueId, queueOffset, e);
+ future.completeExceptionally(e);
+ return;
+ }
+ if (result != null && result.getMessageBufferList() != null
+ && !result.getMessageBufferList().isEmpty()) {
+ long timestamp =
MessageFormatUtil.getStoreTimeStamp(result.getMessageBufferList().get(0));
+
log.info("MessageStoreFetcherImpl#getMessageStoreTimeStampAsync: " +
+ "topic={}, queue={}, offset={}, timestamp={}",
topic, queueId, queueOffset, timestamp);
+ future.complete(timestamp);
+ } else {
+ future.complete(-1L);
+ }
+ });
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ return future;
}
@Override
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 1a0240681c..f88779f09b 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -268,8 +268,10 @@ public class TieredMessageStoreTest {
configuration.update(properties);
Assert.assertEquals(1, (long)
currentStore.getMessageStoreTimeStampAsync(mq.getTopic(), mq.getQueueId(),
0).join());
+ // If data cannot be fetched from tiered storage,
+ // there is no need to fall back to local storage.
Mockito.when(fetcher.getMessageStoreTimeStampAsync(anyString(),
anyInt(), anyLong())).thenReturn(CompletableFuture.completedFuture(-1L));
- Assert.assertEquals(3, (long)
currentStore.getMessageStoreTimeStampAsync(mq.getTopic(), mq.getQueueId(),
0).join());
+ Assert.assertEquals(-1L, (long)
currentStore.getMessageStoreTimeStampAsync(mq.getTopic(), mq.getQueueId(),
0).join());
}
@Test