This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0f0324a7dd [ISSUE #8076] Fix correct min cq offset when delete tiered storage CommitLog (#8082)
0f0324a7dd is described below

commit 0f0324a7dd9b2e994aeb4a4f5c8631f8465daae5
Author: lizhimins <707364...@qq.com>
AuthorDate: Tue May 7 09:52:48 2024 +0800

    [ISSUE #8076] Fix correct min cq offset when delete tiered storage CommitLog (#8082)
---
 .../rocketmq/tieredstore/file/FlatCommitLogFile.java   | 13 +++++++++++++
 .../tieredstore/file/FlatCommitLogFileTest.java        | 18 ++++++++++++++++--
 .../rocketmq/tieredstore/file/FlatFileStoreTest.java   |  2 +-
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
index 8a319ed389..6ac0939571 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
@@ -60,4 +60,17 @@ public class FlatCommitLogFile extends FlatAppendFile {
                 return firstOffset.get();
             });
     }
+
+    @Override
+    public void destroyExpiredFile(long expireTimestamp) {
+        long beforeOffset = this.getMinOffset();
+        super.destroyExpiredFile(expireTimestamp);
+        long afterOffset = this.getMinOffset();
+
+        if (beforeOffset != afterOffset) {
+            log.info("CommitLog min cq offset reset, filePath={}, offset={}, expireTimestamp={}, change={}-{}",
+                filePath, firstOffset.get(), expireTimestamp, beforeOffset, afterOffset);
+            firstOffset.set(GET_OFFSET_ERROR);
+        }
+    }
 }
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
index 7e030d305e..1e912690b2 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
@@ -93,19 +93,33 @@ public class FlatCommitLogFileTest {
         for (int i = 6; i < 9; i++) {
             ByteBuffer byteBuffer = MessageFormatUtilTest.buildMockedMessageBuffer();
             byteBuffer.putLong(MessageFormatUtil.QUEUE_OFFSET_POSITION, i);
-            Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, 1L));
+            Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, i));
         }
         Assert.assertEquals(-1L, flatFile.getMinOffsetFromFileAsync().join().longValue());
 
         // append some messages
         for (int i = 9; i < 30; i++) {
+            if (i == 20) {
+                flatFile.commitAsync().join();
+                flatFile.rollingNewFile(flatFile.getAppendOffset());
+            }
             ByteBuffer byteBuffer = MessageFormatUtilTest.buildMockedMessageBuffer();
             byteBuffer.putLong(MessageFormatUtil.QUEUE_OFFSET_POSITION, i);
-            Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, 1L));
+            Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, i));
         }
 
         flatFile.commitAsync().join();
         Assert.assertEquals(6L, flatFile.getMinOffsetFromFile());
         Assert.assertEquals(6L, flatFile.getMinOffsetFromFileAsync().join().longValue());
+
+        // recalculate min offset here
+        flatFile.destroyExpiredFile(20L);
+        Assert.assertEquals(20L, flatFile.getMinOffsetFromFile());
+        Assert.assertEquals(20L, flatFile.getMinOffsetFromFileAsync().join().longValue());
+
+        // clean expired file again
+        flatFile.destroyExpiredFile(20L);
+        Assert.assertEquals(20L, flatFile.getMinOffsetFromFile());
+        Assert.assertEquals(20L, flatFile.getMinOffsetFromFileAsync().join().longValue());
     }
 }
\ No newline at end of file
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
index 79647932da..2a007af4e9 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
@@ -46,7 +46,7 @@ public class FlatFileStoreTest {
         storeConfig = new MessageStoreConfig();
         storeConfig.setStorePathRootDir(storePath);
         storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName());
-        storeConfig.setBrokerName(storeConfig.getBrokerName());
+        storeConfig.setBrokerName("brokerName");
         metadataStore = new DefaultMetadataStore(storeConfig);
     }
 

Reply via email to