This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b2deef179d [ISSUE #7144] Accelerate the recovery speed of the tiered 
storage module (#7145)
b2deef179d is described below

commit b2deef179dbc6a9eb1a2b6dd7b652d95cb768295
Author: lizhimins <[email protected]>
AuthorDate: Thu Aug 10 10:38:47 2023 +0800

    [ISSUE #7144] Accelerate the recovery speed of the tiered storage module 
(#7145)
---
 .../rocketmq/tieredstore/TieredDispatcher.java     |   3 +
 .../rocketmq/tieredstore/TieredMessageStore.java   |   2 +-
 .../tieredstore/common/TieredStoreExecutor.java    |  25 ++--
 .../tieredstore/file/CompositeFlatFile.java        |  15 +--
 .../tieredstore/file/CompositeQueueFlatFile.java   |  20 ++-
 .../rocketmq/tieredstore/file/TieredCommitLog.java |  24 ++--
 .../rocketmq/tieredstore/file/TieredFlatFile.java  |  42 ++++---
 .../tieredstore/file/TieredFlatFileManager.java    | 135 ++++++++++++---------
 .../tieredstore/metadata/FileSegmentMetadata.java  |  26 +++-
 .../rocketmq/tieredstore/TieredDispatcherTest.java |  15 ++-
 .../tieredstore/TieredMessageFetcherTest.java      |   2 +-
 .../file/CompositeQueueFlatFileTest.java           |   2 +-
 .../file/TieredFlatFileManagerTest.java            |   7 +-
 13 files changed, 194 insertions(+), 124 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index bb58ea7dd5..1746190cdb 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -279,6 +279,9 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
             long upperBound = Math.min(dispatchOffset + maxCount, 
maxOffsetInQueue);
             ConsumeQueue consumeQueue = (ConsumeQueue) 
defaultStore.getConsumeQueue(topic, queueId);
 
+            logger.debug("DispatchFlatFile race, topic={}, queueId={}, cq 
range={}-{}, dispatch offset={}-{}",
+                topic, queueId, minOffsetInQueue, maxOffsetInQueue, 
dispatchOffset, upperBound - 1);
+
             for (; dispatchOffset < upperBound; dispatchOffset++) {
                 // get consume queue
                 SelectMappedBufferResult cqItem = 
consumeQueue.getIndexBuffer(dispatchOffset);
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 1f12410f2e..ced1fb8181 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -147,7 +147,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
         int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) 
{
 
         if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) {
-            logger.debug("GetMessageAsync from next store topic: {}, queue: 
{}, offset: {}", topic, queueId, offset);
+            logger.trace("GetMessageAsync from next store topic: {}, queue: 
{}, offset: {}", topic, queueId, offset);
             return next.getMessageAsync(group, topic, queueId, offset, 
maxMsgNums, messageFilter);
         }
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
index 6eb3478b3d..6dd0e8846e 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
@@ -43,18 +43,9 @@ public class TieredStoreExecutor {
     public static ExecutorService compactIndexFileExecutor;
 
     public static void init() {
-        dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
-        dispatchExecutor = new ThreadPoolExecutor(
-            Math.max(2, Runtime.getRuntime().availableProcessors()),
-            Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
-            1000 * 60,
-            TimeUnit.MILLISECONDS,
-            dispatchThreadPoolQueue,
-            new ThreadFactoryImpl("TieredCommonExecutor_"));
-
         commonScheduledExecutor = new ScheduledThreadPoolExecutor(
             Math.max(4, Runtime.getRuntime().availableProcessors()),
-            new ThreadFactoryImpl("TieredCommonScheduledExecutor_"));
+            new ThreadFactoryImpl("TieredCommonExecutor_"));
 
         commitExecutor = new ScheduledThreadPoolExecutor(
             Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
@@ -62,7 +53,17 @@ public class TieredStoreExecutor {
 
         cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor(
             Math.max(4, Runtime.getRuntime().availableProcessors()),
-            new ThreadFactoryImpl("TieredCleanExpiredFileExecutor_"));
+            new ThreadFactoryImpl("TieredCleanFileExecutor_"));
+
+        dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+        dispatchExecutor = new ThreadPoolExecutor(
+            Math.max(2, Runtime.getRuntime().availableProcessors()),
+            Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            dispatchThreadPoolQueue,
+            new ThreadFactoryImpl("TieredDispatchExecutor_"),
+            new ThreadPoolExecutor.DiscardOldestPolicy());
 
         fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
         fetchDataExecutor = new ThreadPoolExecutor(
@@ -71,7 +72,7 @@ public class TieredStoreExecutor {
             1000 * 60,
             TimeUnit.MILLISECONDS,
             fetchDataThreadPoolQueue,
-            new ThreadFactoryImpl("TieredFetchDataExecutor_"));
+            new ThreadFactoryImpl("TieredFetchExecutor_"));
 
         compactIndexFileThreadPoolQueue = new 
LinkedBlockingQueue<>(QUEUE_CAPACITY);
         compactIndexFileExecutor = new ThreadPoolExecutor(
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
index df4baf33f4..5ad3a6ff32 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
@@ -76,20 +76,15 @@ public class CompositeFlatFile implements CompositeAccess {
         this.storeConfig = fileQueueFactory.getStoreConfig();
         this.readAheadFactor = this.storeConfig.getReadAheadMinFactor();
         this.metadataStore = 
TieredStoreUtil.getMetadataStore(this.storeConfig);
-        this.dispatchOffset = new AtomicLong();
         this.compositeFlatFileLock = new ReentrantLock();
         this.inFlightRequestMap = new ConcurrentHashMap<>();
         this.commitLog = new TieredCommitLog(fileQueueFactory, filePath);
         this.consumeQueue = new TieredConsumeQueue(fileQueueFactory, filePath);
+        this.dispatchOffset = new AtomicLong(
+            this.consumeQueue.isInitialized() ? 
this.getConsumeQueueCommitOffset() : -1L);
         this.groupOffsetCache = this.initOffsetCache();
     }
 
-    protected void recoverMetadata() {
-        if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) {
-            consumeQueue.setBaseOffset(this.dispatchOffset.get() * 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
-        }
-    }
-
     private Cache<String, Long> initOffsetCache() {
         return Caffeine.newBuilder()
             .expireAfterWrite(2, TimeUnit.MINUTES)
@@ -310,10 +305,12 @@ public class CompositeFlatFile implements CompositeAccess 
{
 
     @Override
     public void initOffset(long offset) {
-        if (!consumeQueue.isInitialized()) {
+        if (consumeQueue.isInitialized()) {
+            dispatchOffset.set(this.getConsumeQueueCommitOffset());
+        } else {
             consumeQueue.setBaseOffset(offset * 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+            dispatchOffset.set(offset);
         }
-        dispatchOffset.set(offset);
     }
 
     @Override
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
index f6c0afed05..0a797f465f 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
@@ -36,8 +36,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile 
{
     public CompositeQueueFlatFile(TieredFileAllocator fileQueueFactory, 
MessageQueue messageQueue) {
         super(fileQueueFactory, TieredStoreUtil.toPath(messageQueue));
         this.messageQueue = messageQueue;
-        this.recoverTopicMetadata();
-        super.recoverMetadata();
+        this.recoverQueueMetadata();
         this.indexFile = TieredFlatFileManager.getIndexFile(storeConfig);
     }
 
@@ -46,11 +45,12 @@ public class CompositeQueueFlatFile extends 
CompositeFlatFile {
         if (!consumeQueue.isInitialized()) {
             queueMetadata.setMinOffset(offset);
             queueMetadata.setMaxOffset(offset);
+            metadataStore.updateQueue(queueMetadata);
         }
         super.initOffset(offset);
     }
 
-    public void recoverTopicMetadata() {
+    public void recoverQueueMetadata() {
         TopicMetadata topicMetadata = 
this.metadataStore.getTopic(messageQueue.getTopic());
         if (topicMetadata == null) {
             topicMetadata = 
this.metadataStore.addTopic(messageQueue.getTopic(), -1L);
@@ -64,18 +64,16 @@ public class CompositeQueueFlatFile extends 
CompositeFlatFile {
         if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) {
             queueMetadata.setMaxOffset(queueMetadata.getMinOffset());
         }
-        this.dispatchOffset.set(queueMetadata.getMaxOffset());
     }
 
-    public void persistMetadata() {
+    public void flushMetadata() {
         try {
-            if (consumeQueue.getCommitOffset() < queueMetadata.getMinOffset()) 
{
-                return;
-            }
-            queueMetadata.setMaxOffset(consumeQueue.getCommitOffset() / 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+            queueMetadata.setMinOffset(super.getConsumeQueueMinOffset());
+            queueMetadata.setMaxOffset(super.getConsumeQueueMaxOffset());
             metadataStore.updateQueue(queueMetadata);
         } catch (Exception e) {
-            LOGGER.error("CompositeFlatFile#flushMetadata: update queue 
metadata failed: topic: {}, queue: {}", messageQueue.getTopic(), 
messageQueue.getQueueId(), e);
+            LOGGER.error("CompositeFlatFile#flushMetadata error, topic: {}, 
queue: {}",
+                messageQueue.getTopic(), messageQueue.getQueueId(), e);
         }
     }
 
@@ -114,7 +112,7 @@ public class CompositeQueueFlatFile extends 
CompositeFlatFile {
     @Override
     public void shutdown() {
         super.shutdown();
-        metadataStore.updateQueue(queueMetadata);
+        this.flushMetadata();
     }
 
     @Override
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
index 80e1bce506..0e5f79132f 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
@@ -50,7 +50,7 @@ public class TieredCommitLog {
         this.storeConfig = fileQueueFactory.getStoreConfig();
         this.flatFile = fileQueueFactory.createFlatFileForCommitLog(filePath);
         this.minConsumeQueueOffset = new AtomicLong(NOT_EXIST_MIN_OFFSET);
-        this.correctMinOffset();
+        this.correctMinOffsetAsync();
     }
 
     @VisibleForTesting
@@ -91,17 +91,26 @@ public class TieredCommitLog {
         return flatFile.getFileToWrite().getMaxTimestamp();
     }
 
-    public synchronized long correctMinOffset() {
+    public long correctMinOffset() {
+        try {
+            return correctMinOffsetAsync().get();
+        } catch (Exception e) {
+            log.error("Correct min offset failed in clean expired file", e);
+        }
+        return NOT_EXIST_MIN_OFFSET;
+    }
+
+    public synchronized CompletableFuture<Long> correctMinOffsetAsync() {
         if (flatFile.getFileSegmentCount() == 0) {
             this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
-            return NOT_EXIST_MIN_OFFSET;
+            return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET);
         }
 
         // queue offset field length is 8
         int length = MessageBufferUtil.QUEUE_OFFSET_POSITION + 8;
         if (flatFile.getCommitOffset() - flatFile.getMinOffset() < length) {
             this.minConsumeQueueOffset.set(NOT_EXIST_MIN_OFFSET);
-            return NOT_EXIST_MIN_OFFSET;
+            return CompletableFuture.completedFuture(NOT_EXIST_MIN_OFFSET);
         }
 
         try {
@@ -109,7 +118,8 @@ public class TieredCommitLog {
                 .thenApply(buffer -> {
                     long offset = MessageBufferUtil.getQueueOffset(buffer);
                     minConsumeQueueOffset.set(offset);
-                    log.info("Correct commitlog min cq offset success, 
filePath={}, min cq offset={}, range={}-{}",
+                    log.debug("Correct commitlog min cq offset success, " +
+                            "filePath={}, min cq offset={}, commitlog 
range={}-{}",
                         flatFile.getFilePath(), offset, 
flatFile.getMinOffset(), flatFile.getCommitOffset());
                     return offset;
                 })
@@ -117,11 +127,11 @@ public class TieredCommitLog {
                     log.warn("Correct commitlog min cq offset error, 
filePath={}, range={}-{}",
                         flatFile.getFilePath(), flatFile.getMinOffset(), 
flatFile.getCommitOffset(), throwable);
                     return minConsumeQueueOffset.get();
-                }).get();
+                });
         } catch (Exception e) {
             log.error("Correct commitlog min cq offset error, filePath={}", 
flatFile.getFilePath(), e);
         }
-        return minConsumeQueueOffset.get();
+        return CompletableFuture.completedFuture(minConsumeQueueOffset.get());
     }
 
     public AppendResult append(ByteBuffer byteBuf) {
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index 75ce8d89f2..426c4e09d3 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.tieredstore.file;
 
+import com.alibaba.fastjson.JSON;
 import com.google.common.annotations.VisibleForTesting;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -24,6 +25,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -178,32 +180,26 @@ public class TieredFlatFile {
     private FileSegmentMetadata 
getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) {
 
         FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
-            fileSegment.getPath(), fileSegment.getFileType(), 
fileSegment.getBaseOffset());
-
-        if (metadata != null) {
-            return metadata;
-        }
+            this.filePath, fileSegment.getFileType(), 
fileSegment.getBaseOffset());
 
         // Note: file segment path may not the same as file base path, use 
base path here.
-        metadata = new FileSegmentMetadata(
-            this.filePath, fileSegment.getBaseOffset(), 
fileSegment.getFileType().getType());
-
-        if (fileSegment.isClosed()) {
-            metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+        if (metadata == null) {
+            metadata = new FileSegmentMetadata(
+                this.filePath, fileSegment.getBaseOffset(), 
fileSegment.getFileType().getType());
+            metadata.setCreateTimestamp(fileSegment.getMinTimestamp());
+            metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
+            metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
+            if (fileSegment.isClosed()) {
+                metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+            }
+            this.tieredMetadataStore.updateFileSegment(metadata);
         }
-
-        metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
-        metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
-
-        // Submit to persist
-        this.tieredMetadataStore.updateFileSegment(metadata);
         return metadata;
     }
 
     /**
      * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended 
&& Not Full
      */
-    @VisibleForTesting
     public void updateFileSegment(TieredFileSegment fileSegment) {
         FileSegmentMetadata segmentMetadata = 
getOrCreateFileSegmentMetadata(fileSegment);
 
@@ -219,9 +215,16 @@ public class TieredFlatFile {
         }
 
         segmentMetadata.setSize(fileSegment.getCommitPosition());
-        segmentMetadata.setBeginTimestamp(fileSegment.getMinTimestamp());
         segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp());
-        this.tieredMetadataStore.updateFileSegment(segmentMetadata);
+
+        FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
+            this.filePath, fileSegment.getFileType(), 
fileSegment.getBaseOffset());
+
+        if (!Objects.equals(metadata, segmentMetadata)) {
+            this.tieredMetadataStore.updateFileSegment(segmentMetadata);
+            logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, 
content: {}",
+                segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata));
+        }
     }
 
     private void checkAndFixFileSize() {
@@ -257,6 +260,7 @@ public class TieredFlatFile {
                 logger.warn("TieredFlatFile#checkAndFixFileSize: fix last file 
{} size: origin: {}, actual: {}",
                     lastFile.getPath(), lastFile.getCommitOffset() - 
lastFile.getBaseOffset(), lastFileSize);
                 lastFile.initPosition(lastFileSize);
+                this.updateFileSegment(lastFile);
             }
         }
     }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index aeca44b8cb..e9ae4a5a52 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -16,16 +16,19 @@
  */
 package org.apache.rocketmq.tieredstore.file;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -36,6 +39,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 
 public class TieredFlatFileManager {
 
+    private static final Logger BROKER_LOG = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final Logger logger = 
LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
 
     private static volatile TieredFlatFileManager instance;
@@ -44,7 +48,7 @@ public class TieredFlatFileManager {
     private final TieredMetadataStore metadataStore;
     private final TieredMessageStoreConfig storeConfig;
     private final TieredFileAllocator tieredFileAllocator;
-    private final ConcurrentMap<MessageQueue, CompositeQueueFlatFile> 
queueFlatFileMap;
+    private final ConcurrentMap<MessageQueue, CompositeQueueFlatFile> 
flatFileConcurrentMap;
 
     public TieredFlatFileManager(TieredMessageStoreConfig storeConfig)
         throws ClassNotFoundException, NoSuchMethodException {
@@ -52,23 +56,20 @@ public class TieredFlatFileManager {
         this.storeConfig = storeConfig;
         this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
         this.tieredFileAllocator = new TieredFileAllocator(storeConfig);
-        this.queueFlatFileMap = new ConcurrentHashMap<>();
+        this.flatFileConcurrentMap = new ConcurrentHashMap<>();
         this.doScheduleTask();
     }
 
     public static TieredFlatFileManager getInstance(TieredMessageStoreConfig 
storeConfig) {
-        if (storeConfig == null) {
+        if (storeConfig == null || instance != null) {
             return instance;
         }
-
-        if (instance == null) {
-            synchronized (TieredFlatFileManager.class) {
-                if (instance == null) {
-                    try {
-                        instance = new TieredFlatFileManager(storeConfig);
-                    } catch (Exception e) {
-                        logger.error("TieredFlatFileManager#getInstance: 
create flat file manager failed", e);
-                    }
+        synchronized (TieredFlatFileManager.class) {
+            if (instance == null) {
+                try {
+                    instance = new TieredFlatFileManager(storeConfig);
+                } catch (Exception e) {
+                    logger.error("Construct FlatFileManager instance error", 
e);
                 }
             }
         }
@@ -88,7 +89,7 @@ public class TieredFlatFileManager {
                             TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, 
storeConfig.getBrokerName(), 0));
                         indexFile = new TieredIndexFile(new 
TieredFileAllocator(storeConfig), filePath);
                     } catch (Exception e) {
-                        logger.error("TieredFlatFileManager#getIndexFile: 
create index file failed", e);
+                        logger.error("Construct FlatFileManager indexFile 
error", e);
                     }
                 }
             }
@@ -105,7 +106,7 @@ public class TieredFlatFileManager {
                     flatFile.commitCommitLog();
                 } catch (Throwable e) {
                     MessageQueue mq = flatFile.getMessageQueue();
-                    logger.error("commit commitLog periodically failed: topic: 
{}, queue: {}",
+                    logger.error("Commit commitLog periodically failed: topic: 
{}, queue: {}",
                         mq.getTopic(), mq.getQueueId(), e);
                 }
             }, delay, TimeUnit.MILLISECONDS);
@@ -114,7 +115,7 @@ public class TieredFlatFileManager {
                     flatFile.commitConsumeQueue();
                 } catch (Throwable e) {
                     MessageQueue mq = flatFile.getMessageQueue();
-                    logger.error("commit consumeQueue periodically failed: 
topic: {}, queue: {}",
+                    logger.error("Commit consumeQueue periodically failed: 
topic: {}, queue: {}",
                         mq.getTopic(), mq.getQueueId(), e);
                 }
             }, delay, TimeUnit.MILLISECONDS);
@@ -125,7 +126,7 @@ public class TieredFlatFileManager {
                     indexFile.commit(true);
                 }
             } catch (Throwable e) {
-                logger.error("commit indexFile periodically failed", e);
+                logger.error("Commit indexFile periodically failed", e);
             }
         }, 0, TimeUnit.MILLISECONDS);
     }
@@ -160,7 +161,7 @@ public class TieredFlatFileManager {
             try {
                 doCommit();
             } catch (Throwable e) {
-                logger.error("commit flat file periodically failed: ", e);
+                logger.error("Commit flat file periodically failed: ", e);
             }
         }, 60, 60, TimeUnit.SECONDS);
 
@@ -168,49 +169,73 @@ public class TieredFlatFileManager {
             try {
                 doCleanExpiredFile();
             } catch (Throwable e) {
-                logger.error("clean expired flat file failed: ", e);
+                logger.error("Clean expired flat file failed: ", e);
             }
         }, 30, 30, TimeUnit.SECONDS);
     }
 
     public boolean load() {
+        Stopwatch stopwatch = Stopwatch.createStarted();
         try {
-            AtomicLong topicSequenceNumber = new AtomicLong();
-            List<Future<?>> futureList = new ArrayList<>();
-            queueFlatFileMap.clear();
-            metadataStore.iterateTopic(topicMetadata -> {
+            flatFileConcurrentMap.clear();
+            this.recoverSequenceNumber();
+            this.recoverTieredFlatFile();
+            logger.info("Message store recover end, total cost={}ms", 
stopwatch.elapsed(TimeUnit.MILLISECONDS));
+        } catch (Exception e) {
+            long costTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+            logger.info("Message store recover error, total cost={}ms", 
costTime);
+            BROKER_LOG.error("Message store recover error, total cost={}ms", 
costTime, e);
+            return false;
+        }
+        return true;
+    }
+
+    public void recoverSequenceNumber() {
+        AtomicLong topicSequenceNumber = new AtomicLong();
+        metadataStore.iterateTopic(topicMetadata -> {
+            if (topicMetadata != null && topicMetadata.getTopicId() > 0) {
                 topicSequenceNumber.set(Math.max(topicSequenceNumber.get(), 
topicMetadata.getTopicId()));
-                Future<?> future = 
TieredStoreExecutor.dispatchExecutor.submit(() -> {
-                    if (topicMetadata.getStatus() != 0) {
-                        return;
-                    }
+            }
+        });
+        
metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet());
+    }
+
+    public void recoverTieredFlatFile() {
+        Semaphore semaphore = new Semaphore((int) 
(TieredStoreExecutor.QUEUE_CAPACITY * 0.75));
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        metadataStore.iterateTopic(topicMetadata -> {
+            try {
+                semaphore.acquire();
+                CompletableFuture<Void> future = CompletableFuture.runAsync(() 
-> {
                     try {
-                        metadataStore.iterateQueue(topicMetadata.getTopic(),
-                            queueMetadata -> getOrCreateFlatFileIfAbsent(
-                                new MessageQueue(topicMetadata.getTopic(),
-                                    storeConfig.getBrokerName(),
-                                    queueMetadata.getQueue().getQueueId())));
+                        Stopwatch subWatch = Stopwatch.createStarted();
+                        if (topicMetadata.getStatus() != 0) {
+                            return;
+                        }
+                        AtomicLong queueCount = new AtomicLong();
+                        metadataStore.iterateQueue(topicMetadata.getTopic(), 
queueMetadata -> {
+                            this.getOrCreateFlatFileIfAbsent(new 
MessageQueue(topicMetadata.getTopic(),
+                                storeConfig.getBrokerName(), 
queueMetadata.getQueue().getQueueId()));
+                            queueCount.incrementAndGet();
+                        });
+                        logger.info("Recover TopicFlatFile, topic: {}, 
queueCount: {}, cost: {}ms",
+                            topicMetadata.getTopic(), queueCount.get(), 
subWatch.elapsed(TimeUnit.MILLISECONDS));
                     } catch (Exception e) {
-                        logger.error("load mq composite flat file from 
metadata failed", e);
+                        logger.error("Recover TopicFlatFile error, topic: {}", 
topicMetadata.getTopic(), e);
+                    } finally {
+                        semaphore.release();
                     }
-                });
-                futureList.add(future);
-            });
-
-            // Wait for load all metadata done
-            for (Future<?> future : futureList) {
-                future.get();
+                }, TieredStoreExecutor.commitExecutor);
+                futures.add(future);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
-            
metadataStore.setTopicSequenceNumber(topicSequenceNumber.incrementAndGet());
-        } catch (Exception e) {
-            logger.error("load mq composite flat file from metadata failed", 
e);
-            return false;
-        }
-        return true;
+        });
+        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).join();
     }
 
     public void cleanup() {
-        queueFlatFileMap.clear();
+        flatFileConcurrentMap.clear();
         cleanStaticReference();
     }
 
@@ -221,27 +246,25 @@ public class TieredFlatFileManager {
 
     @Nullable
     public CompositeQueueFlatFile getOrCreateFlatFileIfAbsent(MessageQueue 
messageQueue) {
-        return queueFlatFileMap.computeIfAbsent(messageQueue, mq -> {
+        return flatFileConcurrentMap.computeIfAbsent(messageQueue, mq -> {
             try {
-                
logger.debug("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
-                        "try to create new flat file: topic: {}, queueId: {}",
+                logger.debug("Create new TopicFlatFile, topic: {}, queueId: 
{}",
                     messageQueue.getTopic(), messageQueue.getQueueId());
                 return new CompositeQueueFlatFile(tieredFileAllocator, mq);
             } catch (Exception e) {
-                
logger.error("TieredFlatFileManager#getOrCreateFlatFileIfAbsent: " +
-                        "create new flat file: topic: {}, queueId: {}",
+                logger.debug("Create new TopicFlatFile failed, topic: {}, 
queueId: {}",
                     messageQueue.getTopic(), messageQueue.getQueueId(), e);
-                return null;
             }
+            return null;
         });
     }
 
     public CompositeQueueFlatFile getFlatFile(MessageQueue messageQueue) {
-        return queueFlatFileMap.get(messageQueue);
+        return flatFileConcurrentMap.get(messageQueue);
     }
 
     public ImmutableList<CompositeQueueFlatFile> deepCopyFlatFileToList() {
-        return ImmutableList.copyOf(queueFlatFileMap.values());
+        return ImmutableList.copyOf(flatFileConcurrentMap.values());
     }
 
     public void shutdown() {
@@ -270,7 +293,7 @@ public class TieredFlatFileManager {
         }
 
         // delete memory reference
-        CompositeQueueFlatFile flatFile = queueFlatFileMap.remove(mq);
+        CompositeQueueFlatFile flatFile = flatFileConcurrentMap.remove(mq);
         if (flatFile != null) {
             MessageQueue messageQueue = flatFile.getMessageQueue();
             logger.info("TieredFlatFileManager#destroyCompositeFile: " +
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
index 1b232fc750..2f0fd71deb 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/FileSegmentMetadata.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.tieredstore.metadata;
 
+import java.util.Objects;
+
 public class FileSegmentMetadata {
 
     public static final int STATUS_NEW = 0;
@@ -43,7 +45,6 @@ public class FileSegmentMetadata {
         this.baseOffset = baseOffset;
         this.type = type;
         this.status = STATUS_NEW;
-        this.createTimestamp = System.currentTimeMillis();
     }
 
     public void markSealed() {
@@ -122,4 +123,27 @@ public class FileSegmentMetadata {
     public void setSealTimestamp(long sealTimestamp) {
         this.sealTimestamp = sealTimestamp;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        FileSegmentMetadata metadata = (FileSegmentMetadata) o;
+        return size == metadata.size
+            && baseOffset == metadata.baseOffset
+            && status == metadata.status
+            && path.equals(metadata.path)
+            && type == metadata.type
+            && createTimestamp == metadata.createTimestamp
+            && beginTimestamp == metadata.beginTimestamp
+            && endTimestamp == metadata.endTimestamp
+            && sealTimestamp == metadata.sealTimestamp;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, path, baseOffset, status, size, 
createTimestamp, beginTimestamp, endTimestamp, sealTimestamp);
+    }
 }
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
index e6adef1d1d..5791dc9a4e 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
@@ -116,19 +116,20 @@ public class TieredDispatcherTest {
         buffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 9);
         flatFile.appendCommitLog(buffer3);
         flatFile.commitCommitLog();
-        Assert.assertEquals(10, flatFile.getDispatchOffset());
+        Assert.assertEquals(9 + 1, flatFile.getDispatchOffset());
+        Assert.assertEquals(9, flatFile.getCommitLogDispatchCommitOffset());
 
         dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, 
flatFile, 8, 8, 0, 0, buffer1);
         dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, 
flatFile, 9, 9, 0, 0, buffer2);
         dispatcher.buildConsumeQueueAndIndexFile();
         Assert.assertEquals(7, flatFile.getConsumeQueueMaxOffset());
-        Assert.assertEquals(7, flatFile.getDispatchOffset());
 
         dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, 
flatFile, 7, 7, 0, 0, buffer1);
         dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, 
flatFile, 8, 8, 0, 0, buffer2);
         dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, 
flatFile, 9, 9, 0, 0, buffer3);
         dispatcher.buildConsumeQueueAndIndexFile();
-        Assert.assertEquals(10, flatFile.getConsumeQueueMaxOffset());
+        Assert.assertEquals(6, flatFile.getConsumeQueueMinOffset());
+        Assert.assertEquals(9 + 1, flatFile.getConsumeQueueMaxOffset());
     }
 
     @Test
@@ -142,6 +143,7 @@ public class TieredDispatcherTest {
         Mockito.when(defaultStore.getMinOffsetInQueue(mq.getTopic(), 
mq.getQueueId())).thenReturn(0L);
         Mockito.when(defaultStore.getMaxOffsetInQueue(mq.getTopic(), 
mq.getQueueId())).thenReturn(9L);
 
+        // mock cq item, position = 7
         ByteBuffer cqItem = 
ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
         cqItem.putLong(7);
         cqItem.putInt(MessageBufferUtilTest.MSG_LEN);
@@ -150,13 +152,13 @@ public class TieredDispatcherTest {
         SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0, 
cqItem, ConsumeQueue.CQ_STORE_UNIT_SIZE, null);
         Mockito.when(((ConsumeQueue) 
defaultStore.getConsumeQueue(mq.getTopic(), 
mq.getQueueId())).getIndexBuffer(6)).thenReturn(mockResult);
 
+        // mock cq item, position = 8
         cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
         cqItem.putLong(8);
         cqItem.putInt(MessageBufferUtilTest.MSG_LEN);
         cqItem.putLong(1);
         cqItem.flip();
         mockResult = new SelectMappedBufferResult(0, cqItem, 
ConsumeQueue.CQ_STORE_UNIT_SIZE, null);
-
         Mockito.when(((ConsumeQueue) 
defaultStore.getConsumeQueue(mq.getTopic(), 
mq.getQueueId())).getIndexBuffer(7)).thenReturn(mockResult);
 
         mockResult = new SelectMappedBufferResult(0, 
MessageBufferUtilTest.buildMockedMessageBuffer(), 
MessageBufferUtilTest.MSG_LEN, null);
@@ -167,7 +169,10 @@ public class TieredDispatcherTest {
         mockResult = new SelectMappedBufferResult(0, msg, 
MessageBufferUtilTest.MSG_LEN, null);
         Mockito.when(defaultStore.selectOneMessageByOffset(8, 
MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
 
-        
dispatcher.dispatchFlatFile(flatFileManager.getOrCreateFlatFileIfAbsent(mq));
+        CompositeQueueFlatFile flatFile = 
flatFileManager.getOrCreateFlatFileIfAbsent(mq);
+        Assert.assertNotNull(flatFile);
+        flatFile.initOffset(7);
+        dispatcher.dispatchFlatFile(flatFile);
         Assert.assertEquals(8, 
flatFileManager.getFlatFile(mq).getDispatchOffset());
     }
 }
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index d75b2f9164..774c6cf646 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -23,6 +23,7 @@ import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.lang3.tuple.Triple;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.GetMessageResult;
@@ -40,7 +41,6 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-import org.apache.rocketmq.common.BoundaryType;
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Assert;
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 27efe111e6..2e028ada32 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -119,7 +119,7 @@ public class CompositeQueueFlatFileTest {
         Assert.assertEquals(AppendResult.SUCCESS, result);
 
         file.commit(true);
-        file.persistMetadata();
+        file.flushMetadata();
 
         QueueMetadata queueMetadata = metadataStore.getQueue(mq);
         Assert.assertEquals(53, queueMetadata.getMaxOffset());
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
index b7528c5e4f..20fe4dd702 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManagerTest.java
@@ -72,10 +72,15 @@ public class TieredFlatFileManagerTest {
 
         CompositeFlatFile flatFile = flatFileManager.getFlatFile(mq);
         Assert.assertNotNull(flatFile);
-        Assert.assertEquals(100, flatFile.getDispatchOffset());
+        Assert.assertEquals(-1L, flatFile.getDispatchOffset());
+        flatFile.initOffset(100L);
+        Assert.assertEquals(100L, flatFile.getDispatchOffset());
+        flatFile.initOffset(200L);
+        Assert.assertEquals(100L, flatFile.getDispatchOffset());
 
         CompositeFlatFile flatFile1 = flatFileManager.getFlatFile(mq1);
         Assert.assertNotNull(flatFile1);
+        flatFile1.initOffset(200L);
         Assert.assertEquals(200, flatFile1.getDispatchOffset());
 
         flatFileManager.destroyCompositeFile(mq);


Reply via email to