This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9899be15e [ISSUE #6933] Support recreate file if local cq and tiered 
storage offset not match
9899be15e is described below

commit 9899be15e7fcac45f60988ea84c41c1a6f615554
Author: lizhimins <[email protected]>
AuthorDate: Fri Jun 23 15:37:57 2023 +0800

    [ISSUE #6933] Support recreate file if local cq and tiered storage offset 
not match
---
 .../rocketmq/tieredstore/TieredDispatcher.java     | 229 +++++++++++++--------
 .../rocketmq/tieredstore/file/TieredCommitLog.java |   2 +-
 .../rocketmq/tieredstore/TieredDispatcherTest.java |  10 +-
 3 files changed, 147 insertions(+), 94 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index d3ed01e86..0d89d305b 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -60,8 +60,8 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
     private final MessageStore defaultStore;
     private final TieredMessageStoreConfig storeConfig;
     private final TieredFlatFileManager tieredFlatFileManager;
-    private final ReentrantLock dispatchLock;
-    private final ReentrantLock dispatchRequestListLock;
+    private final ReentrantLock dispatchTaskLock;
+    private final ReentrantLock dispatchWriteLock;
 
     private ConcurrentMap<CompositeQueueFlatFile, List<DispatchRequest>> 
dispatchRequestReadMap;
     private ConcurrentMap<CompositeQueueFlatFile, List<DispatchRequest>> 
dispatchRequestWriteMap;
@@ -73,19 +73,18 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
         this.tieredFlatFileManager = 
TieredFlatFileManager.getInstance(storeConfig);
         this.dispatchRequestReadMap = new ConcurrentHashMap<>();
         this.dispatchRequestWriteMap = new ConcurrentHashMap<>();
-        this.dispatchLock = new ReentrantLock();
-        this.dispatchRequestListLock = new ReentrantLock();
+        this.dispatchTaskLock = new ReentrantLock();
+        this.dispatchWriteLock = new ReentrantLock();
         this.initScheduleTask();
     }
 
     private void initScheduleTask() {
-        TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() 
-> {
+        TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() 
->
             tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> 
{
                 if (!flatFile.getCompositeFlatFileLock().isLocked()) {
                     dispatchFlatFile(flatFile);
                 }
-            });
-        }, 30, 10, TimeUnit.SECONDS);
+            }), 30, 10, TimeUnit.SECONDS);
     }
 
     @Override
@@ -99,43 +98,44 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
             return;
         }
 
-        CompositeQueueFlatFile flatFile =
-            tieredFlatFileManager.getOrCreateFlatFileIfAbsent(new 
MessageQueue(topic, brokerName, request.getQueueId()));
+        CompositeQueueFlatFile flatFile = 
tieredFlatFileManager.getOrCreateFlatFileIfAbsent(
+            new MessageQueue(topic, brokerName, request.getQueueId()));
 
         if (flatFile == null) {
-            logger.error("[Bug]TieredDispatcher#dispatch: dispatch failed, " +
-                "can not create flatFile: topic: {}, queueId: {}", 
request.getTopic(), request.getQueueId());
+            logger.error("[Bug] TieredDispatcher#dispatch: get or create flat 
file failed, skip this request. ",
+                "topic: {}, queueId: {}", request.getTopic(), 
request.getQueueId());
             return;
         }
 
-        // prevent consume queue and index file falling too far
-        int groupCommitCount = storeConfig.getTieredStoreMaxGroupCommitCount();
-        if (dispatchRequestWriteMap.getOrDefault(flatFile, 
Collections.emptyList()).size() > groupCommitCount
-            || dispatchRequestReadMap.getOrDefault(flatFile, 
Collections.emptyList()).size() > groupCommitCount) {
+        if (detectFallBehind(flatFile)) {
             return;
         }
 
-        // init dispatch offset
+        // Set cq offset as commitlog first dispatch offset if flat file first 
init
         if (flatFile.getDispatchOffset() == -1) {
             flatFile.initOffset(request.getConsumeQueueOffset());
         }
 
         if (request.getConsumeQueueOffset() == flatFile.getDispatchOffset()) {
+
+            // In order to ensure the efficiency of dispatch operation and 
avoid high dispatch delay,
+            // it is not allowed to block for a long time here.
             try {
+                // Acquired flat file write lock to append commitlog
                 if (flatFile.getCompositeFlatFileLock().isLocked()
                     || !flatFile.getCompositeFlatFileLock().tryLock(3, 
TimeUnit.MILLISECONDS)) {
                     return;
                 }
             } catch (Exception e) {
-                logger.warn("TieredDispatcher#dispatch: dispatch failed, " +
-                    "can not get flatFile lock: topic: {}, queueId: {}", 
request.getTopic(), request.getQueueId(), e);
+                logger.warn("Temporarily skip dispatch request because we can 
not acquired write lock. " +
+                    "topic: {}, queueId: {}", request.getTopic(), 
request.getQueueId(), e);
                 if (flatFile.getCompositeFlatFileLock().isLocked()) {
                     flatFile.getCompositeFlatFileLock().unlock();
                 }
                 return;
             }
 
-            // double check
+            // double check whether the offset matches
             if (request.getConsumeQueueOffset() != 
flatFile.getDispatchOffset()) {
                 flatFile.getCompositeFlatFileLock().unlock();
                 return;
@@ -160,19 +160,20 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                 }
                 AppendResult result = 
flatFile.appendCommitLog(message.getByteBuffer());
                 long newCommitLogOffset = flatFile.getCommitLogMaxOffset() - 
message.getByteBuffer().remaining();
-                handleAppendCommitLogResult(result, flatFile, 
request.getConsumeQueueOffset(), flatFile.getDispatchOffset(),
+                doRedispatchRequestToWriteMap(result, flatFile, 
request.getConsumeQueueOffset(),
                     newCommitLogOffset, request.getMsgSize(), 
request.getTagsCode(), message.getByteBuffer());
 
                 if (result == AppendResult.SUCCESS) {
                     Attributes attributes = 
TieredStoreMetricsManager.newAttributesBuilder()
                         .put(TieredStoreMetricsConstant.LABEL_TOPIC, 
request.getTopic())
                         .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, 
request.getQueueId())
-                        .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, 
FileSegmentType.COMMIT_LOG.name().toLowerCase())
+                        .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE,
+                            FileSegmentType.COMMIT_LOG.name().toLowerCase())
                         .build();
                     TieredStoreMetricsManager.messagesDispatchTotal.add(1, 
attributes);
                 }
             } catch (Exception throwable) {
-                logger.error("TieredDispatcher#dispatch: dispatch failed: " +
+                logger.error("TieredDispatcher#dispatch: dispatch has 
unexpected problem. " +
                         "topic: {}, queueId: {}, queue offset: {}", 
request.getTopic(), request.getQueueId(),
                     request.getConsumeQueueOffset(), throwable);
             } finally {
@@ -202,8 +203,7 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
             try {
                 dispatchFlatFile(flatFile);
             } catch (Throwable throwable) {
-                logger.error("[Bug]TieredDispatcher#dispatchFlatFileAsync 
dispatch failed, " +
-                        "can not dispatch, topic: {}, queueId: {}",
+                logger.error("[Bug] TieredDispatcher#dispatchFlatFileAsync 
failed, topic: {}, queueId: {}",
                     flatFile.getMessageQueue().getTopic(), 
flatFile.getMessageQueue().getQueueId(), throwable);
             }
 
@@ -218,7 +218,7 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
             return;
         }
 
-        if (flatFile.getDispatchOffset() == -1) {
+        if (flatFile.getDispatchOffset() == -1L) {
             return;
         }
 
@@ -234,6 +234,7 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
         long minOffsetInQueue = defaultStore.getMinOffsetInQueue(topic, 
queueId);
         long maxOffsetInQueue = defaultStore.getMaxOffsetInQueue(topic, 
queueId);
 
+        // perhaps it was caused by local cq file corruption or ha truncation
         if (beforeOffset >= maxOffsetInQueue) {
             return;
         }
@@ -243,8 +244,8 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                 return;
             }
         } catch (Exception e) {
-            logger.warn("TieredDispatcher#dispatchFlatFile: dispatch failed, " 
+
-                "can not get flatFile lock: topic: {}, queueId: {}", 
mq.getTopic(), mq.getQueueId(), e);
+            logger.warn("TieredDispatcher#dispatchFlatFile: can not acquire 
flatFile lock, " +
+                "topic: {}, queueId: {}", mq.getTopic(), mq.getQueueId(), e);
             if (flatFile.getCompositeFlatFileLock().isLocked()) {
                 flatFile.getCompositeFlatFileLock().unlock();
             }
@@ -252,25 +253,30 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
         }
 
         try {
-            long queueOffset = flatFile.getDispatchOffset();
-            if (minOffsetInQueue > queueOffset) {
-                logger.warn("BlobDispatcher#dispatchFlatFile: " +
-                        "message that needs to be dispatched does not exist: " 
+
-                        "topic: {}, queueId: {}, message queue offset: {}, min 
queue offset: {}",
-                    topic, queueId, queueOffset, minOffsetInQueue);
+            long dispatchOffset = flatFile.getDispatchOffset();
+            if (dispatchOffset < minOffsetInQueue) {
+                // If the tiered storage feature is turned off midway,
+                // it may cause cq discontinuity, resulting in data loss here.
+                logger.warn("TieredDispatcher#dispatchFlatFile: dispatch 
offset is too small, " +
+                        "topic: {}, queueId: {}, dispatch offset: {}, local cq 
offset range {}-{}",
+                    topic, queueId, dispatchOffset, minOffsetInQueue, 
maxOffsetInQueue);
                 flatFile.initOffset(minOffsetInQueue);
-                queueOffset = minOffsetInQueue;
+                dispatchOffset = minOffsetInQueue;
             }
-            beforeOffset = queueOffset;
+            beforeOffset = dispatchOffset;
 
-            // TODO flow control based on message size
-            long limit = Math.min(queueOffset + 100000, maxOffsetInQueue);
+            // flow control by max count, also we could do flow control based 
on message size
+            long maxCount = storeConfig.getTieredStoreGroupCommitCount();
+            long upperBound = Math.min(dispatchOffset + maxCount, 
maxOffsetInQueue);
             ConsumeQueue consumeQueue = (ConsumeQueue) 
defaultStore.getConsumeQueue(topic, queueId);
-            for (; queueOffset < limit; queueOffset++) {
-                SelectMappedBufferResult cqItem = 
consumeQueue.getIndexBuffer(queueOffset);
+
+            for (; dispatchOffset < upperBound; dispatchOffset++) {
+                // get consume queue
+                SelectMappedBufferResult cqItem = 
consumeQueue.getIndexBuffer(dispatchOffset);
                 if (cqItem == null) {
-                    logger.error("[Bug]TieredDispatcher#dispatchFlatFile: 
dispatch failed, " +
-                        "can not get cq item: topic: {}, queueId: {}, offset: 
{}", topic, queueId, queueOffset);
+                    logger.error("[Bug] TieredDispatcher#dispatchFlatFile: cq 
item is null, " +
+                            "topic: {}, queueId: {}, dispatch offset: {}, 
local cq offset range {}-{}",
+                        topic, queueId, dispatchOffset, minOffsetInQueue, 
maxOffsetInQueue);
                     return;
                 }
                 long commitLogOffset = 
CQItemBufferUtil.getCommitLogOffset(cqItem.getByteBuffer());
@@ -278,28 +284,34 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                 long tagCode = 
CQItemBufferUtil.getTagCode(cqItem.getByteBuffer());
                 cqItem.release();
 
+                // get message
                 SelectMappedBufferResult message = 
defaultStore.selectOneMessageByOffset(commitLogOffset, size);
                 if (message == null) {
-                    logger.error("TieredDispatcher#dispatchFlatFile: dispatch 
failed, " +
-                            "can not get message from next store: topic: {}, 
queueId: {}, commitLog offset: {}, size: {}",
+                    logger.error("TieredDispatcher#dispatchFlatFile: get 
message from next store failed, " +
+                            "topic: {}, queueId: {}, commitLog offset: {}, 
size: {}",
                         topic, queueId, commitLogOffset, size);
                     break;
                 }
+
+                // append commitlog will increase dispatch offset here
                 AppendResult result = 
flatFile.appendCommitLog(message.getByteBuffer(), true);
                 long newCommitLogOffset = flatFile.getCommitLogMaxOffset() - 
message.getByteBuffer().remaining();
-                handleAppendCommitLogResult(result, flatFile, queueOffset, 
flatFile.getDispatchOffset(), newCommitLogOffset, size, tagCode, 
message.getByteBuffer());
+                doRedispatchRequestToWriteMap(
+                    result, flatFile, dispatchOffset, newCommitLogOffset, 
size, tagCode, message.getByteBuffer());
                 message.release();
                 if (result != AppendResult.SUCCESS) {
-                    queueOffset--;
+                    dispatchOffset--;
                     break;
                 }
             }
+
             Attributes attributes = 
TieredStoreMetricsManager.newAttributesBuilder()
                 .put(TieredStoreMetricsConstant.LABEL_TOPIC, mq.getTopic())
                 .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, 
mq.getQueueId())
                 .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, 
FileSegmentType.COMMIT_LOG.name().toLowerCase())
                 .build();
-            TieredStoreMetricsManager.messagesDispatchTotal.add(queueOffset - 
beforeOffset, attributes);
+
+            TieredStoreMetricsManager.messagesDispatchTotal.add(dispatchOffset 
- beforeOffset, attributes);
         } finally {
             flatFile.getCompositeFlatFileLock().unlock();
         }
@@ -310,8 +322,10 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
         }
     }
 
-    public void handleAppendCommitLogResult(AppendResult result, 
CompositeQueueFlatFile flatFile,
-        long queueOffset, long dispatchOffset, long newCommitLogOffset, int 
size, long tagCode, ByteBuffer message) {
+    // Submit cq to write map if append commitlog success
+    public void doRedispatchRequestToWriteMap(AppendResult result, 
CompositeQueueFlatFile flatFile,
+        long queueOffset, long newCommitLogOffset, int size, long tagCode, 
ByteBuffer message) {
+
         MessageQueue mq = flatFile.getMessageQueue();
         String topic = mq.getTopic();
         int queueId = mq.getQueueId();
@@ -322,18 +336,22 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
             case OFFSET_INCORRECT:
                 long offset = MessageBufferUtil.getQueueOffset(message);
                 if (queueOffset != offset) {
-                    logger.error("[Bug]Dispatch append commit log, result={}, 
offset={}, msg offset={}", queueOffset, offset);
+                    logger.error("[Bug] Commitlog offset incorrect, " +
+                            "result={}, topic={}, queueId={}, offset={}, msg 
offset={}",
+                        result, topic, queueId, queueOffset, offset);
                 }
                 return;
             case BUFFER_FULL:
-                logger.debug("Commitlog buffer full, result={}, topic={}, 
queueId={}, offset={}",result, topic, queueId, queueOffset);
+                logger.debug("Commitlog buffer full, result={}, topic={}, 
queueId={}, offset={}",
+                    result, topic, queueId, queueOffset);
                 return;
             default:
-                logger.info("Commitlog append failed, result={}, topic={}, 
queueId={}, offset={}", result, topic, queueId, queueOffset);
+                logger.info("Commitlog append failed, result={}, topic={}, 
queueId={}, offset={}",
+                    result, topic, queueId, queueOffset);
                 return;
         }
 
-        dispatchRequestListLock.lock();
+        dispatchWriteLock.lock();
         try {
             Map<String, String> properties = 
MessageBufferUtil.getProperties(message);
             DispatchRequest dispatchRequest = new DispatchRequest(
@@ -348,49 +366,65 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                 
properties.getOrDefault(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, 
""),
                 0, 0, new HashMap<>());
             
dispatchRequest.setOffsetId(MessageBufferUtil.getOffsetId(message));
-            List<DispatchRequest> requestList = 
dispatchRequestWriteMap.computeIfAbsent(flatFile, k -> new ArrayList<>());
+            List<DispatchRequest> requestList =
+                dispatchRequestWriteMap.computeIfAbsent(flatFile, k -> new 
ArrayList<>());
             requestList.add(dispatchRequest);
             if (requestList.get(0).getConsumeQueueOffset() >= 
flatFile.getConsumeQueueMaxOffset()) {
                 wakeup();
             }
         } finally {
-            dispatchRequestListLock.unlock();
+            dispatchWriteLock.unlock();
         }
     }
 
     public void swapDispatchRequestList() {
-        dispatchRequestListLock.lock();
+        dispatchWriteLock.lock();
         try {
             dispatchRequestReadMap = dispatchRequestWriteMap;
             dispatchRequestWriteMap = new ConcurrentHashMap<>();
         } finally {
-            dispatchRequestListLock.unlock();
+            dispatchWriteLock.unlock();
         }
     }
 
-    public void sendBackDispatchRequestList() {
-        if (!dispatchRequestReadMap.isEmpty()) {
-            dispatchRequestListLock.lock();
-            try {
-                dispatchRequestReadMap.forEach((flatFile, requestList) -> {
-                    if (requestList.isEmpty()) {
-                        
logger.warn("[Bug]TieredDispatcher#sendBackDispatchRequestList: requestList is 
empty, no need to send back: topic: {}, queueId: {}", 
flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId());
-                        return;
-                    }
-                    List<DispatchRequest> requestListToWrite = 
dispatchRequestWriteMap.computeIfAbsent(flatFile, k -> new ArrayList<>());
-                    if (!requestListToWrite.isEmpty() && 
requestList.get(requestList.size() - 1).getConsumeQueueOffset() > 
requestListToWrite.get(0).getConsumeQueueOffset()) {
-                        
logger.warn("[Bug]TieredDispatcher#sendBackDispatchRequestList: dispatch 
request list is not continuous: topic: {}, queueId: {}, last list max offset: 
{}, new list min offset: {}",
-                            flatFile.getMessageQueue().getTopic(), 
flatFile.getMessageQueue().getQueueId(),
-                            requestList.get(requestList.size() - 
1).getConsumeQueueOffset(), requestListToWrite.get(0).getConsumeQueueOffset());
+    public void copySurvivorObject() {
+        if (dispatchRequestReadMap.isEmpty()) {
+            return;
+        }
+
+        try {
+            dispatchWriteLock.lock();
+            dispatchRequestReadMap.forEach((flatFile, requestList) -> {
+                String topic = flatFile.getMessageQueue().getTopic();
+                int queueId = flatFile.getMessageQueue().getQueueId();
+                if (requestList.isEmpty()) {
+                    logger.warn("Copy survivor object failed, dispatch request 
list is empty, " +
+                        "topic: {}, queueId: {}", topic, queueId);
+                    return;
+                }
+
+                List<DispatchRequest> requestListToWrite =
+                    dispatchRequestWriteMap.computeIfAbsent(flatFile, k -> new 
ArrayList<>());
+
+                if (!requestListToWrite.isEmpty()) {
+                    long readOffset = requestList.get(requestList.size() - 
1).getConsumeQueueOffset();
+                    long writeOffset = 
requestListToWrite.get(0).getConsumeQueueOffset();
+                    if (readOffset > writeOffset) {
+                        logger.warn("Copy survivor object failed, offset in 
request list are not continuous. " +
+                                "topic: {}, queueId: {}, read offset: {}, 
write offset: {}",
+                            topic, queueId, readOffset, writeOffset);
+
+                        // sort request list according cq offset
                         
requestList.sort(Comparator.comparingLong(DispatchRequest::getConsumeQueueOffset));
                     }
-                    requestList.addAll(requestListToWrite);
-                    dispatchRequestWriteMap.put(flatFile, requestList);
-                });
-                dispatchRequestReadMap = new ConcurrentHashMap<>();
-            } finally {
-                dispatchRequestListLock.unlock();
-            }
+                }
+
+                requestList.addAll(requestListToWrite);
+                dispatchRequestWriteMap.put(flatFile, requestList);
+            });
+            dispatchRequestReadMap = new ConcurrentHashMap<>();
+        } finally {
+            dispatchWriteLock.unlock();
         }
     }
 
@@ -425,9 +459,11 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                 // build consume queue
                 AppendResult result = flatFile.appendConsumeQueue(request, 
true);
 
+                // handle build cq result
                 if (AppendResult.SUCCESS.equals(result)) {
                     long cqCount = cqMetricsMap.computeIfAbsent(messageQueue, 
key -> 0L);
                     cqMetricsMap.put(messageQueue, cqCount + 1);
+
                     // build index
                     if (storeConfig.isMessageIndexEnable()) {
                         result = flatFile.appendIndexFile(request);
@@ -436,7 +472,8 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                             ifMetricsMap.put(messageQueue, ifCount + 1);
                             iterator.remove();
                         } else {
-                            logger.warn("Build indexFile failed, result: {}, 
topic: {}, queue: {}, queue offset: {}",
+                            logger.warn("Build index failed, skip this 
message, " +
+                                    "result: {}, topic: {}, queue: {}, request 
offset: {}",
                                 result, request.getTopic(), 
request.getQueueId(), request.getConsumeQueueOffset());
                         }
                     }
@@ -444,15 +481,30 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                 }
 
                 if (AppendResult.OFFSET_INCORRECT.equals(result)) {
-                    logger.error("Build consumeQueue and indexFile failed, 
offset is messed up, " +
-                            "try to rebuild cq: topic: {}, queue: {}, queue 
offset: {}, max queue offset: {}",
-                        request.getTopic(), request.getQueueId(),
+                    logger.error("Consume queue offset incorrect, try to 
recreated consume queue, " +
+                            "result: {}, topic: {}, queue: {}, request offset: 
{}, current cq offset: {}",
+                        result, request.getTopic(), request.getQueueId(),
                         request.getConsumeQueueOffset(), 
flatFile.getConsumeQueueMaxOffset());
 
                     try {
                         flatFile.getCompositeFlatFileLock().lock();
-                        // rollback dispatch offset, this operation will cause 
duplicate message in commitLog
-                        
flatFile.initOffset(flatFile.getConsumeQueueMaxOffset());
+
+                        // reset dispatch offset, this operation will cause 
duplicate message in commitLog
+                        long minOffsetInQueue =
+                            
defaultStore.getMinOffsetInQueue(request.getTopic(), request.getQueueId());
+
+                        // when dispatch offset is smaller than min offset in 
local cq
+                        // some messages may be lost at this time
+                        if (flatFile.getConsumeQueueMaxOffset() < 
minOffsetInQueue) {
+                            // if we use flatFile.destroy() directly will 
cause manager reference leak.
+                            
tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
+                            logger.warn("Found cq max offset is smaller than 
local cq min offset, " +
+                                    "so destroy tiered flat file to recreated, 
topic: {}, queueId: {}",
+                                request.getTopic(), request.getQueueId());
+                        } else {
+                            
flatFile.initOffset(flatFile.getConsumeQueueMaxOffset());
+                        }
+
                         // clean invalid dispatch request
                         dispatchRequestWriteMap.remove(flatFile);
                         requestList.clear();
@@ -462,7 +514,8 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                     break;
                 }
 
-                logger.warn("Build consumeQueue failed, result: {}, topic: {}, 
queue: {}, queue offset: {}",
+                // other append result
+                logger.warn("Append consume queue failed, result: {}, topic: 
{}, queue: {}, request offset: {}",
                     result, request.getTopic(), request.getQueueId(), 
request.getConsumeQueueOffset());
             }
 
@@ -490,18 +543,18 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
             TieredStoreMetricsManager.messagesDispatchTotal.add(count, 
attributes);
         });
 
-        sendBackDispatchRequestList();
+        copySurvivorObject();
     }
 
     // Allow work-stealing
     public void doDispatchTask() {
         try {
-            dispatchLock.lock();
+            dispatchTaskLock.lock();
             buildConsumeQueueAndIndexFile();
         } catch (Exception e) {
-            logger.error("Build consumeQueue and indexFile failed", e);
+            logger.error("Tiered storage do dispatch task failed", e);
         } finally {
-            dispatchLock.unlock();
+            dispatchTaskLock.unlock();
         }
     }
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
index 67e49af55..92aea58be 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredCommitLog.java
@@ -111,7 +111,7 @@ public class TieredCommitLog {
         try {
             if (System.currentTimeMillis() - fileSegment.getMaxTimestamp() >
                 
TimeUnit.HOURS.toMillis(storeConfig.getCommitLogRollingInterval())
-                && fileSegment.getSize() > 
storeConfig.getCommitLogRollingMinimumSize()) {
+                && fileSegment.getAppendPosition() > 
storeConfig.getCommitLogRollingMinimumSize()) {
                 flatFile.rollingNewFile();
             }
         } catch (Exception e) {
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
index e5f3f9c6c..e6adef1d1 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
@@ -118,15 +118,15 @@ public class TieredDispatcherTest {
         flatFile.commitCommitLog();
         Assert.assertEquals(10, flatFile.getDispatchOffset());
 
-        dispatcher.handleAppendCommitLogResult(AppendResult.SUCCESS, flatFile, 
8, 8, 0, 0, 0, buffer1);
-        dispatcher.handleAppendCommitLogResult(AppendResult.SUCCESS, flatFile, 
9, 9, 0, 0, 0, buffer2);
+        dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, 
flatFile, 8, 8, 0, 0, buffer1);
+        dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, 
flatFile, 9, 9, 0, 0, buffer2);
         dispatcher.buildConsumeQueueAndIndexFile();
         Assert.assertEquals(7, flatFile.getConsumeQueueMaxOffset());
         Assert.assertEquals(7, flatFile.getDispatchOffset());
 
-        dispatcher.handleAppendCommitLogResult(AppendResult.SUCCESS, flatFile, 
7, 7, 0, 0, 0, buffer1);
-        dispatcher.handleAppendCommitLogResult(AppendResult.SUCCESS, flatFile, 
8, 8, 0, 0, 0, buffer2);
-        dispatcher.handleAppendCommitLogResult(AppendResult.SUCCESS, flatFile, 
9, 9, 0, 0, 0, buffer3);
+        dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, 
flatFile, 7, 7, 0, 0, buffer1);
+        dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, 
flatFile, 8, 8, 0, 0, buffer2);
+        dispatcher.doRedispatchRequestToWriteMap(AppendResult.SUCCESS, 
flatFile, 9, 9, 0, 0, buffer3);
         dispatcher.buildConsumeQueueAndIndexFile();
         Assert.assertEquals(10, flatFile.getConsumeQueueMaxOffset());
     }

Reply via email to