This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0f0324a7dd [ISSUE #8076] Fix correct min cq offset when delete tiered
storage CommitLog (#8082)
0f0324a7dd is described below
commit 0f0324a7dd9b2e994aeb4a4f5c8631f8465daae5
Author: lizhimins <[email protected]>
AuthorDate: Tue May 7 09:52:48 2024 +0800
[ISSUE #8076] Fix correct min cq offset when delete tiered storage
CommitLog (#8082)
---
.../rocketmq/tieredstore/file/FlatCommitLogFile.java | 13 +++++++++++++
.../tieredstore/file/FlatCommitLogFileTest.java | 18 ++++++++++++++++--
.../rocketmq/tieredstore/file/FlatFileStoreTest.java | 2 +-
3 files changed, 30 insertions(+), 3 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
index 8a319ed389..6ac0939571 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java
@@ -60,4 +60,17 @@ public class FlatCommitLogFile extends FlatAppendFile {
return firstOffset.get();
});
}
+
+ @Override
+ public void destroyExpiredFile(long expireTimestamp) {
+ long beforeOffset = this.getMinOffset();
+ super.destroyExpiredFile(expireTimestamp);
+ long afterOffset = this.getMinOffset();
+
+ if (beforeOffset != afterOffset) {
+ log.info("CommitLog min cq offset reset, filePath={}, offset={},
expireTimestamp={}, change={}-{}",
+ filePath, firstOffset.get(), expireTimestamp, beforeOffset,
afterOffset);
+ firstOffset.set(GET_OFFSET_ERROR);
+ }
+ }
}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
index 7e030d305e..1e912690b2 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
@@ -93,19 +93,33 @@ public class FlatCommitLogFileTest {
for (int i = 6; i < 9; i++) {
ByteBuffer byteBuffer =
MessageFormatUtilTest.buildMockedMessageBuffer();
byteBuffer.putLong(MessageFormatUtil.QUEUE_OFFSET_POSITION, i);
- Assert.assertEquals(AppendResult.SUCCESS,
flatFile.append(byteBuffer, 1L));
+ Assert.assertEquals(AppendResult.SUCCESS,
flatFile.append(byteBuffer, i));
}
Assert.assertEquals(-1L,
flatFile.getMinOffsetFromFileAsync().join().longValue());
// append some messages
for (int i = 9; i < 30; i++) {
+ if (i == 20) {
+ flatFile.commitAsync().join();
+ flatFile.rollingNewFile(flatFile.getAppendOffset());
+ }
ByteBuffer byteBuffer =
MessageFormatUtilTest.buildMockedMessageBuffer();
byteBuffer.putLong(MessageFormatUtil.QUEUE_OFFSET_POSITION, i);
- Assert.assertEquals(AppendResult.SUCCESS,
flatFile.append(byteBuffer, 1L));
+ Assert.assertEquals(AppendResult.SUCCESS,
flatFile.append(byteBuffer, i));
}
flatFile.commitAsync().join();
Assert.assertEquals(6L, flatFile.getMinOffsetFromFile());
Assert.assertEquals(6L,
flatFile.getMinOffsetFromFileAsync().join().longValue());
+
+ // recalculate min offset here
+ flatFile.destroyExpiredFile(20L);
+ Assert.assertEquals(20L, flatFile.getMinOffsetFromFile());
+ Assert.assertEquals(20L,
flatFile.getMinOffsetFromFileAsync().join().longValue());
+
+ // clean expired file again
+ flatFile.destroyExpiredFile(20L);
+ Assert.assertEquals(20L, flatFile.getMinOffsetFromFile());
+ Assert.assertEquals(20L,
flatFile.getMinOffsetFromFileAsync().join().longValue());
}
}
\ No newline at end of file
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
index 79647932da..2a007af4e9 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
@@ -46,7 +46,7 @@ public class FlatFileStoreTest {
storeConfig = new MessageStoreConfig();
storeConfig.setStorePathRootDir(storePath);
storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName());
- storeConfig.setBrokerName(storeConfig.getBrokerName());
+ storeConfig.setBrokerName("brokerName");
metadataStore = new DefaultMetadataStore(storeConfig);
}