This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 33cb22e1c0 [ISSUE #7018] fix append in tiered storage when message offset incorrect (#7019)
33cb22e1c0 is described below

commit 33cb22e1c0fa7ba980567117230fe443ff5dbd62
Author: lizhimins <[email protected]>
AuthorDate: Thu Jul 13 15:37:24 2023 +0800

    [ISSUE #7018] fix append in tiered storage when message offset incorrect (#7019)
    
    * fix append in tiered storage when message offset incorrect
---
 .../rocketmq/tieredstore/TieredDispatcher.java     | 25 +++++++++++-------
 .../tieredstore/file/CompositeFlatFile.java        | 30 +++++++++++-----------
 .../tieredstore/file/CompositeQueueFlatFile.java   |  2 +-
 .../file/CompositeQueueFlatFileTest.java           |  4 +--
 4 files changed, 34 insertions(+), 27 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 2a8e2ed71f..6584b0e89e 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -308,9 +308,18 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                 doRedispatchRequestToWriteMap(
                     result, flatFile, dispatchOffset, newCommitLogOffset, 
size, tagCode, message.getByteBuffer());
                 message.release();
-                if (result != AppendResult.SUCCESS) {
-                    dispatchOffset--;
-                    break;
+
+                switch (result) {
+                    case SUCCESS:
+                        continue;
+                    case FILE_CLOSED:
+                        
tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
+                        logger.info("TieredDispatcher#dispatchFlatFile: file 
has been close and destroy, " +
+                            "topic: {}, queueId: {}", topic, queueId);
+                        return;
+                    default:
+                        dispatchOffset--;
+                        break;
                 }
             }
 
@@ -341,15 +350,13 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
 
         switch (result) {
             case SUCCESS:
-                break;
-            case OFFSET_INCORRECT:
                 long offset = MessageBufferUtil.getQueueOffset(message);
                 if (queueOffset != offset) {
-                    logger.error("[Bug] Commitlog offset incorrect, " +
-                            "result={}, topic={}, queueId={}, offset={}, msg 
offset={}",
-                        result, topic, queueId, queueOffset, offset);
+                    logger.error("Message cq offset in commitlog does not meet 
expectations, " +
+                            "result={}, topic={}, queueId={}, cq offset={}, 
msg offset={}",
+                        AppendResult.OFFSET_INCORRECT, topic, queueId, 
queueOffset, offset);
                 }
-                return;
+                break;
             case BUFFER_FULL:
                 logger.debug("Commitlog buffer full, result={}, topic={}, 
queueId={}, offset={}",
                     result, topic, queueId, queueOffset);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
index 1243f77213..8f8ba98b1d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -58,7 +59,7 @@ public class CompositeFlatFile implements CompositeAccess {
      * dispatched to the current chunk, indicating the progress of the message 
distribution.
      * It's consume queue current offset.
      */
-    protected volatile long dispatchOffset;
+    protected final AtomicLong dispatchOffset;
 
     protected final ReentrantLock compositeFlatFileLock;
     protected final TieredMessageStoreConfig storeConfig;
@@ -75,6 +76,7 @@ public class CompositeFlatFile implements CompositeAccess {
         this.storeConfig = fileQueueFactory.getStoreConfig();
         this.readAheadFactor = this.storeConfig.getReadAheadMinFactor();
         this.metadataStore = 
TieredStoreUtil.getMetadataStore(this.storeConfig);
+        this.dispatchOffset = new AtomicLong();
         this.compositeFlatFileLock = new ReentrantLock();
         this.inFlightRequestMap = new ConcurrentHashMap<>();
         this.commitLog = new TieredCommitLog(fileQueueFactory, filePath);
@@ -83,8 +85,8 @@ public class CompositeFlatFile implements CompositeAccess {
     }
 
     protected void recoverMetadata() {
-        if (!consumeQueue.isInitialized() && this.dispatchOffset != -1) {
-            consumeQueue.setBaseOffset(this.dispatchOffset * 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) {
+            consumeQueue.setBaseOffset(this.dispatchOffset.get() * 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
         }
     }
 
@@ -144,7 +146,7 @@ public class CompositeFlatFile implements CompositeAccess {
     }
 
     public long getDispatchOffset() {
-        return dispatchOffset;
+        return dispatchOffset.get();
     }
 
     @Override
@@ -309,7 +311,7 @@ public class CompositeFlatFile implements CompositeAccess {
         if (!consumeQueue.isInitialized()) {
             consumeQueue.setBaseOffset(offset * 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
         }
-        dispatchOffset = offset;
+        dispatchOffset.set(offset);
     }
 
     @Override
@@ -323,14 +325,9 @@ public class CompositeFlatFile implements CompositeAccess {
             return AppendResult.FILE_CLOSED;
         }
 
-        long queueOffset = MessageBufferUtil.getQueueOffset(message);
-        if (dispatchOffset != queueOffset) {
-            return AppendResult.OFFSET_INCORRECT;
-        }
-
         AppendResult result = commitLog.append(message, commit);
         if (result == AppendResult.SUCCESS) {
-            dispatchOffset = queueOffset + 1;
+            dispatchOffset.incrementAndGet();
         }
         return result;
     }
@@ -483,14 +480,17 @@ public class CompositeFlatFile implements CompositeAccess 
{
     }
 
     public void destroy() {
-        closed = true;
-        commitLog.destroy();
-        consumeQueue.destroy();
         try {
+            closed = true;
+            compositeFlatFileLock.lock();
+            commitLog.destroy();
+            consumeQueue.destroy();
             metadataStore.deleteFileSegment(filePath, 
FileSegmentType.COMMIT_LOG);
             metadataStore.deleteFileSegment(filePath, 
FileSegmentType.CONSUME_QUEUE);
         } catch (Exception e) {
-            LOGGER.error("CompositeFlatFile#destroy: clean metadata failed: ", 
e);
+            LOGGER.error("CompositeFlatFile#destroy: delete file failed", e);
+        } finally {
+            compositeFlatFileLock.unlock();
         }
     }
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
index c0cf79069d..f6c0afed05 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
@@ -64,7 +64,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile 
{
         if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) {
             queueMetadata.setMaxOffset(queueMetadata.getMinOffset());
         }
-        this.dispatchOffset = queueMetadata.getMaxOffset();
+        this.dispatchOffset.set(queueMetadata.getMaxOffset());
     }
 
     public void persistMetadata() {
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 9735a535e9..8322c72edf 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -73,8 +73,8 @@ public class CompositeQueueFlatFileTest {
         CompositeQueueFlatFile flatFile = new 
CompositeQueueFlatFile(tieredFileAllocator, mq);
         ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer();
         AppendResult result = flatFile.appendCommitLog(message);
-        Assert.assertEquals(AppendResult.OFFSET_INCORRECT, result);
-        Assert.assertEquals(0L, 
flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
+        Assert.assertEquals(AppendResult.SUCCESS, result);
+        Assert.assertEquals(122L, 
flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
         Assert.assertEquals(0L, 
flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition());
 
         flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);

Reply via email to