This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9bb73b9a38 [#ISSUE 7222] Bug fix and refactoring of the Indexfile in
tiered storage (#7224)
9bb73b9a38 is described below
commit 9bb73b9a38548b99ac5126c40380c3c2e7fc586e
Author: lizhimins <[email protected]>
AuthorDate: Wed Aug 23 09:46:27 2023 +0800
[#ISSUE 7222] Bug fix and refactoring of the Indexfile in tiered storage
(#7224)
---
.../rocketmq/tieredstore/file/TieredIndexFile.java | 38 ++++++++--
.../tieredstore/file/TieredIndexFileTest.java | 84 +++++-----------------
2 files changed, 52 insertions(+), 70 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
index 50beb01ae4..eda5e01065 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredIndexFile.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.tieredstore.file;
+import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -99,7 +100,7 @@ public class TieredIndexFile {
this::doScheduleTask, 10, 10, TimeUnit.SECONDS);
}
- private void doScheduleTask() {
+ protected void doScheduleTask() {
try {
curFileLock.lock();
try {
@@ -145,6 +146,11 @@ public class TieredIndexFile {
}
}
+ @VisibleForTesting
+ public MappedFile getPreMappedFile() {
+ return preMappedFile;
+ }
+
private void initFile() throws IOException {
curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
initIndexFileHeader(curMappedFile);
@@ -156,19 +162,26 @@ public class TieredIndexFile {
if (isFileSealed(curMappedFile)) {
if (preFileExists) {
- preFile.delete();
+ if (preFile.delete()) {
+ logger.info("Pre IndexFile deleted success", preFilepath);
+ } else {
+ logger.error("Pre IndexFile deleted failed", preFilepath);
+ }
}
boolean rename = curMappedFile.renameTo(preFilepath);
if (rename) {
preMappedFile = curMappedFile;
curMappedFile = new DefaultMappedFile(curFilePath,
fileMaxSize);
+ initIndexFileHeader(curMappedFile);
preFileExists = true;
}
}
+
if (preFileExists) {
synchronized (TieredIndexFile.class) {
if (inflightCompactFuture.isDone()) {
- inflightCompactFuture =
TieredStoreExecutor.compactIndexFileExecutor.submit(new
CompactTask(storeConfig, preMappedFile, flatFile), null);
+ inflightCompactFuture =
TieredStoreExecutor.compactIndexFileExecutor.submit(
+ new CompactTask(storeConfig, preMappedFile, flatFile),
null);
}
}
}
@@ -261,7 +274,8 @@ public class TieredIndexFile {
}
}
- public CompletableFuture<List<Pair<Long, ByteBuffer>>> queryAsync(String
topic, String key, long beginTime, long endTime) {
+ public CompletableFuture<List<Pair<Long, ByteBuffer>>> queryAsync(String
topic, String key, long beginTime,
+ long endTime) {
int hashCode = indexKeyHashMethod(buildKey(topic, key));
int slotPosition = hashCode % maxHashSlotNum;
List<TieredFileSegment> fileSegmentList =
flatFile.getFileListByTime(beginTime, endTime);
@@ -355,7 +369,7 @@ public class TieredIndexFile {
private final int fileMaxSize;
private MappedFile originFile;
private TieredFlatFile fileQueue;
- private final MappedFile compactFile;
+ private MappedFile compactFile;
public CompactTask(TieredMessageStoreConfig storeConfig, MappedFile
originFile,
TieredFlatFile fileQueue) throws IOException {
@@ -381,6 +395,17 @@ public class TieredIndexFile {
} catch (Throwable throwable) {
logger.error("TieredIndexFile#compactTask: compact index file
failed:", throwable);
}
+
+ try {
+ if (originFile != null) {
+ originFile.destroy(-1);
+ }
+ if (compactFile != null) {
+ compactFile.destroy(-1);
+ }
+ } catch (Throwable throwable) {
+ logger.error("TieredIndexFile#compactTask: destroy index file
failed:", throwable);
+ }
}
public void compact() {
@@ -396,6 +421,8 @@ public class TieredIndexFile {
fileQueue.commit(true);
compactFile.destroy(-1);
originFile.destroy(-1);
+ compactFile = null;
+ originFile = null;
}
private void buildCompactFile() {
@@ -414,6 +441,7 @@ public class TieredIndexFile {
if (slotValue != -1) {
int indexTotalSize = 0;
int indexPosition = slotValue;
+
while (indexPosition >= 0 && indexPosition < maxIndexNum) {
int indexOffset = INDEX_FILE_HEADER_SIZE +
maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
+ indexPosition *
INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
index 7ef49578dd..262d6645b3 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
@@ -19,9 +19,8 @@ package org.apache.rocketmq.tieredstore.file;
import com.sun.jna.Platform;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
@@ -31,9 +30,7 @@ import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
-import org.junit.Assume;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
public class TieredIndexFileTest {
@@ -45,11 +42,12 @@ public class TieredIndexFileTest {
@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
+ storeConfig.setBrokerName("IndexFileBroker");
storeConfig.setStorePathRootDir(storePath);
-
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment");
- storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2);
- storeConfig.setTieredStoreIndexFileMaxIndexNum(3);
- mq = new MessageQueue("TieredIndexFileTest",
storeConfig.getBrokerName(), 1);
+
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment");
+ storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
+ storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
+ mq = new MessageQueue("IndexFileTest", storeConfig.getBrokerName(), 1);
TieredStoreUtil.getMetadataStore(storeConfig);
TieredStoreExecutor.init();
}
@@ -61,77 +59,33 @@ public class TieredIndexFileTest {
TieredStoreExecutor.shutdown();
}
- @Ignore
@Test
public void testAppendAndQuery() throws IOException,
ClassNotFoundException, NoSuchMethodException {
if (Platform.isWindows()) {
return;
}
- // skip this test on windows
- Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
-
TieredFileAllocator fileQueueFactory = new
TieredFileAllocator(storeConfig);
TieredIndexFile indexFile = new TieredIndexFile(fileQueueFactory,
storePath);
+
indexFile.append(mq, 0, "key3", 3, 300, 1000);
indexFile.append(mq, 0, "key2", 2, 200, 1100);
indexFile.append(mq, 0, "key1", 1, 100, 1200);
- Awaitility.waitAtMost(5, TimeUnit.SECONDS)
- .until(() -> {
- List<Pair<Long, ByteBuffer>> indexList =
indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
- if (indexList.size() != 1) {
- return false;
- }
-
- ByteBuffer indexBuffer = indexList.get(0).getValue();
-
Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 2,
indexBuffer.remaining());
-
- Assert.assertEquals(1, indexBuffer.getLong(4 + 4 + 4));
- Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8));
- Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8 +
4));
-
- Assert.assertEquals(3,
indexBuffer.getLong(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4
+ 4));
- Assert.assertEquals(300,
indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 +
4 + 8));
- Assert.assertEquals(0,
indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 +
4 + 8 + 4));
- return true;
- });
-
- indexFile.append(mq, 0, "key4", 4, 400, 1300);
- indexFile.append(mq, 0, "key4", 4, 400, 1300);
- indexFile.append(mq, 0, "key4", 4, 400, 1300);
-
- Awaitility.waitAtMost(5, TimeUnit.SECONDS)
- .until(() -> {
- List<Pair<Long, ByteBuffer>> indexList =
indexFile.queryAsync(mq.getTopic(), "key4", 1300, 1300).join();
- if (indexList.size() != 1) {
- return false;
- }
-
- ByteBuffer indexBuffer = indexList.get(0).getValue();
-
Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3,
indexBuffer.remaining());
- Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4));
- Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8));
- Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
- return true;
- });
-
- List<Pair<Long, ByteBuffer>> indexList =
indexFile.queryAsync(mq.getTopic(), "key1", 1300, 1300).join();
+ // do not do schedule task here
+ TieredStoreExecutor.shutdown();
+ List<Pair<Long, ByteBuffer>> indexList =
+ indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
Assert.assertEquals(0, indexList.size());
- indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1200,
1300).join();
- Assert.assertEquals(2, indexList.size());
-
- ByteBuffer indexBuffer = indexList.get(0).getValue();
- Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE
* 3, indexBuffer.remaining());
- Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4));
- Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8));
- Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+ // do compaction once
+ TieredStoreExecutor.init();
+ storeConfig.setTieredStoreIndexFileRollingIdleInterval(0);
+ indexFile.doScheduleTask();
+ Awaitility.await().atMost(Duration.ofSeconds(10))
+ .until(() -> !indexFile.getPreMappedFile().getFile().exists());
- indexBuffer = indexList.get(1).getValue();
-
Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE,
indexBuffer.remaining());
- Assert.assertEquals(2, indexBuffer.getLong(4 + 4 + 4));
- Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8));
- Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+ indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000,
1200).join();
+ Assert.assertEquals(1, indexList.size());
}
}