This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c96a0b566 [ISSUE #6933] Support delete expired or damaged file in 
tiered storage and optimize fetch code (#6952)
c96a0b566 is described below

commit c96a0b56658b48b17b762a1d2894e6d0576acad1
Author: lizhimins <[email protected]>
AuthorDate: Tue Jun 27 17:53:43 2023 +0800

    [ISSUE #6933] Support delete expired or damaged file in tiered storage and 
optimize fetch code (#6952)
    
    If cq dispatch smaller than local store min offset, do self-healing logic 
for storage and rebuild automatically
---
 .../rocketmq/tieredstore/MessageStoreFetcher.java  |  80 +++++++++
 .../rocketmq/tieredstore/TieredDispatcher.java     |  15 +-
 .../rocketmq/tieredstore/TieredMessageFetcher.java | 196 +++++++++++++--------
 .../rocketmq/tieredstore/file/TieredFlatFile.java  |  10 +-
 .../rocketmq/tieredstore/file/TieredIndexFile.java |  17 +-
 .../metrics/TieredStoreMetricsManager.java         |   4 +-
 .../inputstream/TieredCommitLogInputStream.java    |   3 +-
 .../tieredstore/TieredMessageFetcherTest.java      |  16 +-
 8 files changed, 239 insertions(+), 102 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
new file mode 100644
index 000000000..f4d576d29
--- /dev/null
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.QueryMessageResult;
+import org.apache.rocketmq.tieredstore.common.BoundaryType;
+
+public interface MessageStoreFetcher {
+
+    /**
+     * Asynchronous get the store time of the earliest message in this store.
+     *
+     * @return timestamp of the earliest message in this store.
+     */
+    CompletableFuture<Long> getEarliestMessageTimeAsync(String topic, int 
queueId);
+
+    /**
+     * Asynchronous get the store time of the message specified.
+     *
+     * @param topic              Message topic.
+     * @param queueId            Queue ID.
+     * @param consumeQueueOffset Consume queue offset.
+     * @return store timestamp of the message.
+     */
+    CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int 
queueId, long consumeQueueOffset);
+
+    /**
+     * Look up the physical offset of the message whose store timestamp is as 
specified.
+     *
+     * @param topic     Topic of the message.
+     * @param queueId   Queue ID.
+     * @param timestamp Timestamp to look up.
+     * @return physical offset which matches.
+     */
+    long getOffsetInQueueByTime(String topic, int queueId, long timestamp, 
BoundaryType type);
+
+    /**
+     * Asynchronous get message
+     *
+     * @param group         Consumer group that launches this query.
+     * @param topic         Topic to query.
+     * @param queueId       Queue ID to query.
+     * @param offset        Logical offset to start from.
+     * @param maxCount      Maximum count of messages to query.
+     * @param messageFilter Message filter used to screen desired messages.
+     * @return Matched messages.
+     */
+    CompletableFuture<GetMessageResult> getMessageAsync(
+        String group, String topic, int queueId, long offset, int maxCount, 
MessageFilter messageFilter);
+
+    /**
+     * Asynchronous query messages by given key.
+     *
+     * @param topic    Topic of the message.
+     * @param key      Message key.
+     * @param maxCount Maximum count of the messages possible.
+     * @param begin    Begin timestamp.
+     * @param end      End timestamp.
+     */
+    CompletableFuture<QueryMessageResult> queryMessageAsync(
+        String topic, String key, int maxCount, long begin, long end);
+}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 0d89d305b..2a8e2ed71 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -260,8 +260,16 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                 logger.warn("TieredDispatcher#dispatchFlatFile: dispatch 
offset is too small, " +
                         "topic: {}, queueId: {}, dispatch offset: {}, local cq 
offset range {}-{}",
                     topic, queueId, dispatchOffset, minOffsetInQueue, 
maxOffsetInQueue);
-                flatFile.initOffset(minOffsetInQueue);
-                dispatchOffset = minOffsetInQueue;
+
+                // when dispatch offset is smaller than min offset in local cq
+                // some earliest messages may be lost at this time
+                
tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
+                CompositeQueueFlatFile newFlatFile =
+                    tieredFlatFileManager.getOrCreateFlatFileIfAbsent(new 
MessageQueue(topic, brokerName, queueId));
+                if (newFlatFile != null) {
+                    newFlatFile.initOffset(maxOffsetInQueue);
+                }
+                return;
             }
             beforeOffset = dispatchOffset;
 
@@ -290,7 +298,8 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                     logger.error("TieredDispatcher#dispatchFlatFile: get 
message from next store failed, " +
                             "topic: {}, queueId: {}, commitLog offset: {}, 
size: {}",
                         topic, queueId, commitLogOffset, size);
-                    break;
+                    // not dispatch immediately
+                    return;
                 }
 
                 // append commitlog will increase dispatch offset here
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index 39a2e2aff..8802a73a3 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -60,52 +60,49 @@ import 
org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 
-public class TieredMessageFetcher {
+public class TieredMessageFetcher implements MessageStoreFetcher {
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
 
-    private final TieredMessageStoreConfig storeConfig;
     private final String brokerName;
-    private TieredMetadataStore metadataStore;
+    private final TieredMessageStoreConfig storeConfig;
+    private final TieredMetadataStore metadataStore;
     private final TieredFlatFileManager flatFileManager;
-    protected final Cache<MessageCacheKey, SelectMappedBufferResultWrapper> 
readAheadCache;
+    private final Cache<MessageCacheKey, SelectMappedBufferResultWrapper> 
readAheadCache;
 
     public TieredMessageFetcher(TieredMessageStoreConfig storeConfig) {
         this.storeConfig = storeConfig;
         this.brokerName = storeConfig.getBrokerName();
+        this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
         this.flatFileManager = TieredFlatFileManager.getInstance(storeConfig);
-        this.readAheadCache = Caffeine.newBuilder()
+        this.readAheadCache = this.initCache(storeConfig);
+    }
+
+    private Cache<MessageCacheKey, SelectMappedBufferResultWrapper> 
initCache(TieredMessageStoreConfig storeConfig) {
+        long memoryMaxSize =
+            (long) (Runtime.getRuntime().maxMemory() * 
storeConfig.getReadAheadCacheSizeThresholdRate());
+
+        return Caffeine.newBuilder()
             .scheduler(Scheduler.systemScheduler())
-            // TODO adjust expire time dynamically
             .expireAfterWrite(storeConfig.getReadAheadCacheExpireDuration(), 
TimeUnit.MILLISECONDS)
-            .maximumWeight((long) (Runtime.getRuntime().maxMemory() * 
storeConfig.getReadAheadCacheSizeThresholdRate()))
+            .maximumWeight(memoryMaxSize)
+            // Using the buffer size of messages to calculate memory usage
             .weigher((MessageCacheKey key, SelectMappedBufferResultWrapper 
msg) -> msg.getDuplicateResult().getSize())
             .recordStats()
             .build();
-        try {
-            this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
-        } catch (Exception ignored) {
-
-        }
     }
 
-    public Cache<MessageCacheKey, SelectMappedBufferResultWrapper> 
getReadAheadCache() {
-        return readAheadCache;
-    }
+    protected SelectMappedBufferResultWrapper 
putMessageToCache(CompositeFlatFile flatFile,
+        long queueOffset, SelectMappedBufferResult result, long minOffset, 
long maxOffset, int size) {
 
-    public CompletableFuture<GetMessageResult> 
getMessageFromCacheAsync(CompositeQueueFlatFile flatFile,
-        String group, long queueOffset, int maxMsgNums) {
-        // wait for inflight request by default
-        return getMessageFromCacheAsync(flatFile, group, queueOffset, 
maxMsgNums, true);
+        return putMessageToCache(flatFile, queueOffset, result, minOffset, 
maxOffset, size, false);
     }
 
-    protected SelectMappedBufferResultWrapper 
putMessageToCache(CompositeFlatFile flatFile, long queueOffset,
-        SelectMappedBufferResult msg, long minOffset, long maxOffset, int 
size) {
-        return putMessageToCache(flatFile, queueOffset, msg, minOffset, 
maxOffset, size, false);
-    }
+    protected SelectMappedBufferResultWrapper 
putMessageToCache(CompositeFlatFile flatFile,
+        long queueOffset, SelectMappedBufferResult result, long minOffset, 
long maxOffset, int size, boolean used) {
 
-    protected SelectMappedBufferResultWrapper 
putMessageToCache(CompositeFlatFile flatFile, long queueOffset,
-        SelectMappedBufferResult msg, long minOffset, long maxOffset, int 
size, boolean used) {
-        SelectMappedBufferResultWrapper wrapper = new 
SelectMappedBufferResultWrapper(msg, queueOffset, minOffset, maxOffset, size);
+        SelectMappedBufferResultWrapper wrapper =
+            new SelectMappedBufferResultWrapper(result, queueOffset, 
minOffset, maxOffset, size);
         if (used) {
             wrapper.addAccessCount();
         }
@@ -113,9 +110,20 @@ public class TieredMessageFetcher {
         return wrapper;
     }
 
+    // Visible for metrics monitor
+    public Cache<MessageCacheKey, SelectMappedBufferResultWrapper> 
getMessageCache() {
+        return readAheadCache;
+    }
+
+    // Waiting for the request in transit to complete
+    protected CompletableFuture<GetMessageResult> getMessageFromCacheAsync(
+        CompositeQueueFlatFile flatFile, String group, long queueOffset, int 
maxCount) {
+
+        return getMessageFromCacheAsync(flatFile, group, queueOffset, 
maxCount, true);
+    }
+
     @Nullable
-    protected SelectMappedBufferResultWrapper 
getMessageFromCache(CompositeFlatFile flatFile,
-        long queueOffset) {
+    protected SelectMappedBufferResultWrapper 
getMessageFromCache(CompositeFlatFile flatFile, long queueOffset) {
         MessageCacheKey cacheKey = new MessageCacheKey(flatFile, queueOffset);
         return readAheadCache.getIfPresent(cacheKey);
     }
@@ -135,21 +143,21 @@ public class TieredMessageFetcher {
         }
     }
 
-    private void preFetchMessage(CompositeQueueFlatFile flatFile, String 
group, int maxMsgNums,
-        long nextBeginOffset) {
-        if (maxMsgNums == 1 || flatFile.getReadAheadFactor() == 1) {
+    private void prefetchMessage(CompositeQueueFlatFile flatFile, String 
group, int maxCount, long nextBeginOffset) {
+        if (maxCount == 1 || flatFile.getReadAheadFactor() == 1) {
             return;
         }
+
         MessageQueue mq = flatFile.getMessageQueue();
-        // make sure there is only one inflight request per group and request 
range
-        int prefetchBatchSize = Math.min(maxMsgNums * 
flatFile.getReadAheadFactor(), storeConfig.getReadAheadMessageCountThreshold());
+        // make sure there is only one request per group and request range
+        int prefetchBatchSize = Math.min(maxCount * 
flatFile.getReadAheadFactor(), storeConfig.getReadAheadMessageCountThreshold());
         InFlightRequestFuture inflightRequest = 
flatFile.getInflightRequest(group, nextBeginOffset, prefetchBatchSize);
         if (!inflightRequest.isAllDone()) {
             return;
         }
 
         synchronized (flatFile) {
-            inflightRequest = flatFile.getInflightRequest(nextBeginOffset, 
maxMsgNums);
+            inflightRequest = flatFile.getInflightRequest(nextBeginOffset, 
maxCount);
             if (!inflightRequest.isAllDone()) {
                 return;
             }
@@ -161,7 +169,10 @@ public class TieredMessageFetcher {
             int cacheRemainCount = (int) (maxOffsetOfLastRequest - 
nextBeginOffset);
             LOGGER.debug("TieredMessageFetcher#preFetchMessage: group={}, 
nextBeginOffset={}, maxOffsetOfLastRequest={}, lastRequestIsExpired={}, 
cacheRemainCount={}",
                 group, nextBeginOffset, maxOffsetOfLastRequest, 
lastRequestIsExpired, cacheRemainCount);
-            if (lastRequestIsExpired || maxOffsetOfLastRequest != -1L && 
nextBeginOffset >= inflightRequest.getStartOffset()) {
+
+            if (lastRequestIsExpired
+                || maxOffsetOfLastRequest != -1L && nextBeginOffset >= 
inflightRequest.getStartOffset()) {
+
                 long queueOffset;
                 if (lastRequestIsExpired) {
                     queueOffset = nextBeginOffset;
@@ -171,35 +182,35 @@ public class TieredMessageFetcher {
                     flatFile.increaseReadAheadFactor();
                 }
 
-                int factor = Math.min(flatFile.getReadAheadFactor(), 
storeConfig.getReadAheadMessageCountThreshold() / maxMsgNums);
+                int factor = Math.min(flatFile.getReadAheadFactor(), 
storeConfig.getReadAheadMessageCountThreshold() / maxCount);
                 int flag = 0;
                 int concurrency = 1;
                 if (factor > 
storeConfig.getReadAheadBatchSizeFactorThreshold()) {
                     flag = factor % 
storeConfig.getReadAheadBatchSizeFactorThreshold() == 0 ? 0 : 1;
                     concurrency = factor / 
storeConfig.getReadAheadBatchSizeFactorThreshold() + flag;
                 }
-                int requestBatchSize = maxMsgNums * Math.min(factor, 
storeConfig.getReadAheadBatchSizeFactorThreshold());
+                int requestBatchSize = maxCount * Math.min(factor, 
storeConfig.getReadAheadBatchSizeFactorThreshold());
 
                 List<Pair<Integer, CompletableFuture<Long>>> futureList = new 
ArrayList<>();
                 long nextQueueOffset = queueOffset;
                 if (flag == 1) {
-                    int firstBatchSize = factor % 
storeConfig.getReadAheadBatchSizeFactorThreshold() * maxMsgNums;
-                    CompletableFuture<Long> future = 
prefetchAndPutMsgToCache(flatFile, mq, nextQueueOffset, firstBatchSize);
+                    int firstBatchSize = factor % 
storeConfig.getReadAheadBatchSizeFactorThreshold() * maxCount;
+                    CompletableFuture<Long> future = 
prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset, firstBatchSize);
                     futureList.add(Pair.of(firstBatchSize, future));
                     nextQueueOffset += firstBatchSize;
                 }
                 for (long i = 0; i < concurrency - flag; i++) {
-                    CompletableFuture<Long> future = 
prefetchAndPutMsgToCache(flatFile, mq, nextQueueOffset + i * requestBatchSize, 
requestBatchSize);
+                    CompletableFuture<Long> future = 
prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset + i * 
requestBatchSize, requestBatchSize);
                     futureList.add(Pair.of(requestBatchSize, future));
                 }
-                flatFile.putInflightRequest(group, queueOffset, maxMsgNums * 
factor, futureList);
+                flatFile.putInflightRequest(group, queueOffset, maxCount * 
factor, futureList);
                 LOGGER.debug("TieredMessageFetcher#preFetchMessage: try to 
prefetch messages for later requests: next begin offset: {}, request offset: 
{}, factor: {}, flag: {}, request batch: {}, concurrency: {}",
                     nextBeginOffset, queueOffset, factor, flag, 
requestBatchSize, concurrency);
             }
         }
     }
 
-    private CompletableFuture<Long> 
prefetchAndPutMsgToCache(CompositeQueueFlatFile flatFile, MessageQueue mq,
+    private CompletableFuture<Long> 
prefetchMessageThenPutToCache(CompositeQueueFlatFile flatFile, MessageQueue mq,
         long queueOffset, int batchSize) {
         return getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize)
             .thenApplyAsync(result -> {
@@ -235,13 +246,14 @@ public class TieredMessageFetcher {
             }, TieredStoreExecutor.fetchDataExecutor);
     }
 
-    private CompletableFuture<GetMessageResult> 
getMessageFromCacheAsync(CompositeQueueFlatFile flatFile,
-        String group, long queueOffset, int maxMsgNums, boolean 
waitInflightRequest) {
+    public CompletableFuture<GetMessageResult> 
getMessageFromCacheAsync(CompositeQueueFlatFile flatFile,
+        String group, long queueOffset, int maxCount, boolean 
waitInflightRequest) {
+
         MessageQueue mq = flatFile.getMessageQueue();
 
         long lastGetOffset = queueOffset - 1;
-        List<SelectMappedBufferResultWrapper> resultWrapperList = new 
ArrayList<>(maxMsgNums);
-        for (int i = 0; i < maxMsgNums; i++) {
+        List<SelectMappedBufferResultWrapper> resultWrapperList = new 
ArrayList<>(maxCount);
+        for (int i = 0; i < maxCount; i++) {
             lastGetOffset++;
             SelectMappedBufferResultWrapper wrapper = 
getMessageFromCache(flatFile, lastGetOffset);
             if (wrapper == null) {
@@ -257,26 +269,26 @@ public class TieredMessageFetcher {
                 .put(TieredStoreMetricsConstant.LABEL_TOPIC, mq.getTopic())
                 .put(TieredStoreMetricsConstant.LABEL_GROUP, group)
                 .build();
-            TieredStoreMetricsManager.cacheAccess.add(maxMsgNums, attributes);
+            TieredStoreMetricsManager.cacheAccess.add(maxCount, attributes);
             TieredStoreMetricsManager.cacheHit.add(resultWrapperList.size(), 
attributes);
         }
 
         // if no cached message found and there is currently an inflight 
request, wait for the request to end before continuing
         if (resultWrapperList.isEmpty() && waitInflightRequest) {
-            CompletableFuture<Long> future = 
flatFile.getInflightRequest(group, queueOffset, maxMsgNums)
+            CompletableFuture<Long> future = 
flatFile.getInflightRequest(group, queueOffset, maxCount)
                 .getFuture(queueOffset);
             if (!future.isDone()) {
                 Stopwatch stopwatch = Stopwatch.createStarted();
                 // to prevent starvation issues, only allow waiting for 
inflight request once
                 return future.thenCompose(v -> {
                     
LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: wait for inflight 
request cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
-                    return getMessageFromCacheAsync(flatFile, group, 
queueOffset, maxMsgNums, false);
+                    return getMessageFromCacheAsync(flatFile, group, 
queueOffset, maxCount, false);
                 });
             }
         }
 
         // try to get message from cache again when prefetch request is done
-        for (int i = 0; i < maxMsgNums - resultWrapperList.size(); i++) {
+        for (int i = 0; i < maxCount - resultWrapperList.size(); i++) {
             lastGetOffset++;
             SelectMappedBufferResultWrapper wrapper = 
getMessageFromCache(flatFile, lastGetOffset);
             if (wrapper == null) {
@@ -288,11 +300,11 @@ public class TieredMessageFetcher {
 
         recordCacheAccess(flatFile, group, queueOffset, resultWrapperList);
 
-        // if cache is hit, result will be returned immediately and 
asynchronously prefetch messages for later requests
+        // if cache hit, result will be returned immediately and 
asynchronously prefetch messages for later requests
         if (!resultWrapperList.isEmpty()) {
             LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: cache 
hit: topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit 
num: {}",
-                mq.getTopic(), mq.getQueueId(), queueOffset, maxMsgNums, 
resultWrapperList.size());
-            preFetchMessage(flatFile, group, maxMsgNums, lastGetOffset + 1);
+                mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, 
resultWrapperList.size());
+            prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
 
             GetMessageResult result = new GetMessageResult();
             result.setStatus(GetMessageStatus.FOUND);
@@ -305,10 +317,10 @@ public class TieredMessageFetcher {
 
         // if cache is miss, immediately pull messages
         LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache 
miss: topic: {}, queue: {}, queue offset: {}, max message num: {}",
-            mq.getTopic(), mq.getQueueId(), queueOffset, maxMsgNums);
+            mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
         CompletableFuture<GetMessageResult> resultFuture;
         synchronized (flatFile) {
-            int batchSize = maxMsgNums * storeConfig.getReadAheadMinFactor();
+            int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
             resultFuture = getMessageFromTieredStoreAsync(flatFile, 
queueOffset, batchSize)
                 .thenApplyAsync(result -> {
                     if (result.getStatus() != GetMessageStatus.FOUND) {
@@ -329,8 +341,8 @@ public class TieredMessageFetcher {
                         SelectMappedBufferResult msg = msgList.get(i);
                         // put message into cache
                         SelectMappedBufferResultWrapper resultWrapper = 
putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true);
-                        // try to meet maxMsgNums
-                        if (newResult.getMessageMapedList().size() < 
maxMsgNums) {
+                        // try to meet maxCount
+                        if (newResult.getMessageMapedList().size() < maxCount) 
{
                             
newResult.addMessage(resultWrapper.getDuplicateResult(), offset);
                         }
                     }
@@ -349,6 +361,7 @@ public class TieredMessageFetcher {
 
     public CompletableFuture<GetMessageResult> 
getMessageFromTieredStoreAsync(CompositeQueueFlatFile flatFile,
         long queueOffset, int batchSize) {
+
         GetMessageResult result = new GetMessageResult();
         result.setMinOffset(flatFile.getConsumeQueueMinOffset());
         result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
@@ -361,12 +374,15 @@ public class TieredMessageFetcher {
                     result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
                     result.setNextBeginOffset(queueOffset);
                     return CompletableFuture.completedFuture(result);
+                case ILLEGAL_PARAM:
+                case ILLEGAL_OFFSET:
                 default:
                     result.setStatus(GetMessageStatus.OFFSET_FOUND_NULL);
                     result.setNextBeginOffset(queueOffset);
                     return CompletableFuture.completedFuture(result);
             }
         }
+
         CompletableFuture<ByteBuffer> readCommitLogFuture = 
readConsumeQueueFuture.thenComposeAsync(cqBuffer -> {
             long firstCommitLogOffset = 
CQItemBufferUtil.getCommitLogOffset(cqBuffer);
             cqBuffer.position(cqBuffer.remaining() - 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
@@ -433,8 +449,10 @@ public class TieredMessageFetcher {
         });
     }
 
-    public CompletableFuture<GetMessageResult> getMessageAsync(String group, 
String topic, int queueId,
-        long queueOffset, int maxMsgNums, final MessageFilter messageFilter) {
+    @Override
+    public CompletableFuture<GetMessageResult> getMessageAsync(
+        String group, String topic, int queueId, long queueOffset, int 
maxCount, final MessageFilter messageFilter) {
+
         CompositeQueueFlatFile flatFile = flatFileManager.getFlatFile(new 
MessageQueue(topic, brokerName, queueId));
         if (flatFile == null) {
             GetMessageResult result = new GetMessageResult();
@@ -442,10 +460,11 @@ public class TieredMessageFetcher {
             result.setStatus(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE);
             return CompletableFuture.completedFuture(result);
         }
+
         GetMessageResult result = new GetMessageResult();
         long minQueueOffset = flatFile.getConsumeQueueMinOffset();
-        result.setMinOffset(minQueueOffset);
         long maxQueueOffset = flatFile.getConsumeQueueCommitOffset();
+        result.setMinOffset(minQueueOffset);
         result.setMaxOffset(maxQueueOffset);
 
         if (flatFile.getConsumeQueueCommitOffset() <= 0) {
@@ -468,24 +487,29 @@ public class TieredMessageFetcher {
             return CompletableFuture.completedFuture(result);
         }
 
-        return getMessageFromCacheAsync(flatFile, group, queueOffset, 
maxMsgNums);
+        return getMessageFromCacheAsync(flatFile, group, queueOffset, 
maxCount);
     }
 
+    @Override
     public CompletableFuture<Long> getEarliestMessageTimeAsync(String topic, 
int queueId) {
         CompositeFlatFile flatFile = flatFileManager.getFlatFile(new 
MessageQueue(topic, brokerName, queueId));
         if (flatFile == null) {
             return CompletableFuture.completedFuture(-1L);
         }
 
-        return flatFile.getCommitLogAsync(flatFile.getCommitLogMinOffset(), 
MessageBufferUtil.STORE_TIMESTAMP_POSITION + 8)
+        // read from timestamp to timestamp + length
+        int length = MessageBufferUtil.STORE_TIMESTAMP_POSITION + 8;
+        return flatFile.getCommitLogAsync(flatFile.getCommitLogMinOffset(), 
length)
             .thenApply(MessageBufferUtil::getStoreTimeStamp);
     }
 
+    @Override
     public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, 
int queueId, long queueOffset) {
         CompositeFlatFile flatFile = flatFileManager.getFlatFile(new 
MessageQueue(topic, brokerName, queueId));
         if (flatFile == null) {
             return CompletableFuture.completedFuture(-1L);
         }
+
         return flatFile.getConsumeQueueAsync(queueOffset)
             .thenComposeAsync(cqItem -> {
                 long commitLogOffset = 
CQItemBufferUtil.getCommitLogOffset(cqItem);
@@ -494,27 +518,33 @@ public class TieredMessageFetcher {
             }, TieredStoreExecutor.fetchDataExecutor)
             .thenApply(MessageBufferUtil::getStoreTimeStamp)
             .exceptionally(e -> {
-                
LOGGER.error("TieredMessageFetcher#getMessageStoreTimeStampAsync: get or decode 
message failed: topic: {}, queue: {}, offset: {}", topic, queueId, queueOffset, 
e);
+                
LOGGER.error("TieredMessageFetcher#getMessageStoreTimeStampAsync: " +
+                    "get or decode message failed: topic: {}, queue: {}, 
offset: {}", topic, queueId, queueOffset, e);
                 return -1L;
             });
     }
 
-    public long getOffsetInQueueByTime(String topic, int queueId, long 
timestamp,
-        BoundaryType type) {
+    @Override
+    public long getOffsetInQueueByTime(String topic, int queueId, long 
timestamp, BoundaryType type) {
         CompositeFlatFile flatFile = flatFileManager.getFlatFile(new 
MessageQueue(topic, brokerName, queueId));
         if (flatFile == null) {
             return -1L;
         }
+
         try {
             return flatFile.getOffsetInConsumeQueueByTime(timestamp, type);
         } catch (Exception e) {
-            LOGGER.error("TieredMessageFetcher#getOffsetInQueueByTime: get 
offset in queue by time failed: topic: {}, queue: {}, timestamp: {}, type: {}", 
topic, queueId, timestamp, type, e);
+            LOGGER.error("TieredMessageFetcher#getOffsetInQueueByTime: " +
+                "get offset in queue by time failed: topic: {}, queue: {}, 
timestamp: {}, type: {}",
+                topic, queueId, timestamp, type, e);
         }
         return -1L;
     }
 
-    public CompletableFuture<QueryMessageResult> queryMessageAsync(String 
topic, String key, int maxNum, long begin,
-        long end) {
+    @Override
+    public CompletableFuture<QueryMessageResult> queryMessageAsync(
+        String topic, String key, int maxCount, long begin, long end) {
+
         TieredIndexFile indexFile = 
TieredFlatFileManager.getIndexFile(storeConfig);
 
         int hashCode = 
TieredIndexFile.indexKeyHashMethod(TieredIndexFile.buildKey(topic, key));
@@ -522,12 +552,12 @@ public class TieredMessageFetcher {
         try {
             TopicMetadata topicMetadata = metadataStore.getTopic(topic);
             if (topicMetadata == null) {
-                LOGGER.info("TieredMessageFetcher#queryMessageAsync: get topic 
id from metadata failed, topic metadata not found: topic: {}", topic);
+                LOGGER.info("TieredMessageFetcher#queryMessageAsync, topic 
metadata not found, topic: {}", topic);
                 return CompletableFuture.completedFuture(new 
QueryMessageResult());
             }
             topicId = topicMetadata.getTopicId();
         } catch (Exception e) {
-            LOGGER.error("TieredMessageFetcher#queryMessageAsync: get topic id 
from metadata failed: topic: {}", topic, e);
+            LOGGER.error("TieredMessageFetcher#queryMessageAsync, get topic id 
failed, topic: {}", topic, e);
             return CompletableFuture.completedFuture(new QueryMessageResult());
         }
 
@@ -535,15 +565,22 @@ public class TieredMessageFetcher {
             .thenCompose(indexBufferList -> {
                 QueryMessageResult result = new QueryMessageResult();
                 int resultCount = 0;
-                List<CompletableFuture<Void>> futureList = new 
ArrayList<>(maxNum);
+                List<CompletableFuture<Void>> futureList = new 
ArrayList<>(maxCount);
                 for (Pair<Long, ByteBuffer> pair : indexBufferList) {
                     Long fileBeginTimestamp = pair.getKey();
                     ByteBuffer indexBuffer = pair.getValue();
+
                     if (indexBuffer.remaining() % 
TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE != 0) {
-                        
LOGGER.error("[Bug]TieredMessageFetcher#queryMessageAsync: index buffer size {} 
is not multiple of index item size {}", indexBuffer.remaining(), 
TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE);
+                        LOGGER.error("[Bug] 
TieredMessageFetcher#queryMessageAsync: " +
+                            "index buffer size {} is not multiple of index 
item size {}",
+                            indexBuffer.remaining(), 
TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE);
                         continue;
                     }
-                    for (int indexOffset = indexBuffer.position(); indexOffset 
< indexBuffer.limit(); indexOffset += 
TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE) {
+
+                    for (int indexOffset = indexBuffer.position();
+                        indexOffset < indexBuffer.limit();
+                        indexOffset += 
TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE) {
+
                         int indexItemHashCode = 
indexBuffer.getInt(indexOffset);
                         if (indexItemHashCode != hashCode) {
                             continue;
@@ -555,11 +592,13 @@ public class TieredMessageFetcher {
                         }
 
                         int queueId = indexBuffer.getInt(indexOffset + 4 + 4);
-                        CompositeFlatFile flatFile = 
TieredFlatFileManager.getInstance(storeConfig).getFlatFile(new 
MessageQueue(topic, brokerName, queueId));
+                        CompositeFlatFile flatFile =
+                            flatFileManager.getFlatFile(new 
MessageQueue(topic, brokerName, queueId));
                         if (flatFile == null) {
                             continue;
                         }
 
+                        // decode index item
                         long offset = indexBuffer.getLong(indexOffset + 4 + 4 
+ 4);
                         int size = indexBuffer.getInt(indexOffset + 4 + 4 + 4 
+ 8);
                         int timeDiff = indexBuffer.getInt(indexOffset + 4 + 4 
+ 4 + 8 + 4);
@@ -567,16 +606,19 @@ public class TieredMessageFetcher {
                         if (indexTimestamp < begin || indexTimestamp > end) {
                             continue;
                         }
+
                         CompletableFuture<Void> getMessageFuture = 
flatFile.getCommitLogAsync(offset, size)
-                            .thenAccept(messageBuffer -> result.addMessage(new 
SelectMappedBufferResult(0, messageBuffer, size, null)));
+                            .thenAccept(messageBuffer -> result.addMessage(
+                                new SelectMappedBufferResult(0, messageBuffer, 
size, null)));
                         futureList.add(getMessageFuture);
 
                         resultCount++;
-                        if (resultCount >= maxNum) {
+                        if (resultCount >= maxCount) {
                             break;
                         }
                     }
-                    if (resultCount >= maxNum) {
+
+                    if (resultCount >= maxCount) {
                         break;
                     }
                 }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index 67b32c3a7..a71323348 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -493,16 +493,16 @@ public class TieredFlatFile {
                         fileSegment.destroyFile();
                         if (!fileSegment.exists()) {
                             tieredMetadataStore.deleteFileSegment(filePath, 
fileType, metadata.getBaseOffset());
-                            logger.info("expired file {} is been destroyed", 
fileSegment.getPath());
+                            logger.info("Destroyed expired file, file path: 
{}", fileSegment.getPath());
                         }
                     } catch (Exception e) {
-                        logger.error("destroy expired failed: file path: {}, 
file type: {}",
+                        logger.error("Destroyed expired file failed, file 
path: {}, file type: {}",
                             filePath, fileType, e);
                     }
                 }
             });
         } catch (Exception e) {
-            logger.error("destroy expired file failed: file path: {}, file 
type: {}", filePath, fileType);
+            logger.error("Destroyed expired file, file path: {}, file type: 
{}", filePath, fileType);
         }
     }
 
@@ -520,7 +520,7 @@ public class TieredFlatFile {
                             this.updateFileSegment(segment);
                         } catch (Exception e) {
                             // TODO handle update segment metadata failed 
exception
-                            logger.error("update file segment metadata failed: 
" +
+                            logger.error("Update file segment metadata failed: 
" +
                                     "file path: {}, file type: {}, base 
offset: {}",
                                 filePath, fileType, segment.getBaseOffset(), 
e);
                         }
@@ -531,7 +531,7 @@ public class TieredFlatFile {
                 );
             }
         } catch (Exception e) {
-            logger.error("commit file segment failed: topic: {}, queue: {}, 
file type: {}", filePath, fileType, e);
+            logger.error("Commit file segment failed: topic: {}, queue: {}, 
file type: {}", filePath, fileType, e);
         }
         if (sync) {
             CompletableFuture.allOf(futureList.toArray(new 
CompletableFuture[0])).join();
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
index 0acf4b197..50beb01ae 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
@@ -44,18 +44,21 @@ public class TieredIndexFile {
 
     private static final Logger logger = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
 
-    public static final int INDEX_FILE_BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 
1880681586 + 4;
-    public static final int INDEX_FILE_END_MAGIC_CODE = 0xCCDDEEFF ^ 
1880681586 + 8;
-    public static final int INDEX_FILE_HEADER_SIZE = 28;
-    public static final int INDEX_FILE_HASH_SLOT_SIZE = 8;
-    public static final int INDEX_FILE_HASH_ORIGIN_INDEX_SIZE = 32;
-    public static final int INDEX_FILE_HASH_COMPACT_INDEX_SIZE = 28;
-
+    // header format:
+    // magic code(4) + begin timestamp(8) + end timestamp(8) + slot num(4) + 
index num(4)
     public static final int INDEX_FILE_HEADER_MAGIC_CODE_POSITION = 0;
     public static final int INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION = 4;
     public static final int INDEX_FILE_HEADER_END_TIME_STAMP_POSITION = 12;
     public static final int INDEX_FILE_HEADER_SLOT_NUM_POSITION = 20;
     public static final int INDEX_FILE_HEADER_INDEX_NUM_POSITION = 24;
+    public static final int INDEX_FILE_HEADER_SIZE = 28;
+
+    // index item
+    public static final int INDEX_FILE_BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 
1880681586 + 4;
+    public static final int INDEX_FILE_END_MAGIC_CODE = 0xCCDDEEFF ^ 
1880681586 + 8;
+    public static final int INDEX_FILE_HASH_SLOT_SIZE = 8;
+    public static final int INDEX_FILE_HASH_ORIGIN_INDEX_SIZE = 32;
+    public static final int INDEX_FILE_HASH_COMPACT_INDEX_SIZE = 28;
 
     private static final String INDEX_FILE_DIR_NAME = "tiered_index_file";
     private static final String CUR_INDEX_FILE_NAME = "0000";
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index 60f3b1468..3ca0fb614 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -259,14 +259,14 @@ public class TieredStoreMetricsManager {
         cacheCount = meter.gaugeBuilder(GAUGE_CACHE_COUNT)
             .setDescription("Tiered store cache message count")
             .ofLongs()
-            .buildWithCallback(measurement -> 
measurement.record(fetcher.getReadAheadCache().estimatedSize(), 
newAttributesBuilder().build()));
+            .buildWithCallback(measurement -> 
measurement.record(fetcher.getMessageCache().estimatedSize(), 
newAttributesBuilder().build()));
 
         cacheBytes = meter.gaugeBuilder(GAUGE_CACHE_BYTES)
             .setDescription("Tiered store cache message bytes")
             .setUnit("bytes")
             .ofLongs()
             .buildWithCallback(measurement -> {
-                Optional<Policy.Eviction<MessageCacheKey, 
SelectMappedBufferResultWrapper>> eviction = 
fetcher.getReadAheadCache().policy().eviction();
+                Optional<Policy.Eviction<MessageCacheKey, 
SelectMappedBufferResultWrapper>> eviction = 
fetcher.getMessageCache().policy().eviction();
                 eviction.ifPresent(resultEviction -> 
measurement.record(resultEviction.weightedSize().orElse(0), 
newAttributesBuilder().build()));
             });
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
index c988d42fa..c70bb7656 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
@@ -78,7 +78,8 @@ public class TieredCommitLogInputStream extends 
TieredFileSegmentInputStream {
             commitLogOffset += readPosInCurBuffer;
             readPosInCurBuffer = 0;
         }
-        if (readPosInCurBuffer >= MessageBufferUtil.PHYSICAL_OFFSET_POSITION 
&& readPosInCurBuffer < MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+        if (readPosInCurBuffer >= MessageBufferUtil.PHYSICAL_OFFSET_POSITION
+            && readPosInCurBuffer < 
MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
             res = (int) ((commitLogOffset >> (8 * 
(MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - readPosInCurBuffer - 1))) & 0xff);
             readPosInCurBuffer++;
         } else {
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index 209afbbfc..df3720bab 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.tieredstore;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.lang3.tuple.Triple;
@@ -141,9 +142,9 @@ public class TieredMessageFetcherTest {
         Assert.assertNotNull(flatFile);
 
         fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, new 
ArrayList<>());
-        Assert.assertEquals(0, fetcher.readAheadCache.estimatedSize());
+        Assert.assertEquals(0, fetcher.getMessageCache().estimatedSize());
         fetcher.putMessageToCache(flatFile, 0, new SelectMappedBufferResult(0, 
msg1, msg1.remaining(), null), 0, 0, 1);
-        Assert.assertEquals(1, fetcher.readAheadCache.estimatedSize());
+        Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize());
 
         GetMessageResult getMessageResult = 
fetcher.getMessageFromCacheAsync(flatFile, "group", 0, 32).join();
         Assert.assertEquals(GetMessageStatus.FOUND, 
getMessageResult.getStatus());
@@ -151,21 +152,22 @@ public class TieredMessageFetcherTest {
         Assert.assertEquals(msg1, 
getMessageResult.getMessageBufferList().get(0));
 
         Awaitility.waitAtMost(3, TimeUnit.SECONDS)
-            .until(() -> fetcher.readAheadCache.estimatedSize() == 2);
+            .until(() -> fetcher.getMessageCache().estimatedSize() == 2);
         ArrayList<SelectMappedBufferResultWrapper> wrapperList = new 
ArrayList<>();
         wrapperList.add(fetcher.getMessageFromCache(flatFile, 0));
         fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, 
wrapperList);
-        Assert.assertEquals(1, fetcher.readAheadCache.estimatedSize());
+        Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize());
         wrapperList.clear();
         wrapperList.add(fetcher.getMessageFromCache(flatFile, 1));
         fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, 
wrapperList);
-        Assert.assertEquals(1, fetcher.readAheadCache.estimatedSize());
+        Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize());
 
-        SelectMappedBufferResult messageFromCache = 
fetcher.getMessageFromCache(flatFile, 1).getDuplicateResult();
+        SelectMappedBufferResult messageFromCache =
+            Objects.requireNonNull(fetcher.getMessageFromCache(flatFile, 
1)).getDuplicateResult();
         fetcher.recordCacheAccess(flatFile, "group", 0, wrapperList);
         Assert.assertNotNull(messageFromCache);
         Assert.assertEquals(msg2, messageFromCache.getByteBuffer());
-        Assert.assertEquals(0, fetcher.readAheadCache.estimatedSize());
+        Assert.assertEquals(0, fetcher.getMessageCache().estimatedSize());
     }
 
     @Test


Reply via email to