This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new fe724b3 add delete IndexFile
new 189c6e3 Merge pull request #1817 from rushsky518/deleteFiles_unit_test
fe724b3 is described below
commit fe724b30356e221da3b7268da266a7a8fc453a3f
Author: rushsky518 <[email protected]>
AuthorDate: Tue Mar 3 12:53:04 2020 +0800
add delete IndexFile
---
.../store/DefaultMessageStoreCleanFilesTest.java | 71 ++++++++++++++++++++--
1 file changed, 67 insertions(+), 4 deletions(-)
diff --git
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
index fd510f1..abf7356 100644
---
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
@@ -19,8 +19,12 @@ package org.apache.rocketmq.store;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.index.IndexFile;
+import org.apache.rocketmq.store.index.IndexService;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
@@ -31,7 +35,9 @@ import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.ArrayList;
import java.util.Calendar;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -52,6 +58,7 @@ public class DefaultMessageStoreCleanFilesTest {
private SocketAddress storeHost;
private String topic = "test";
+ private String keys = "hello";
private int queueId = 0;
private int fileCountCommitLog = 55;
// exactly one message per CommitLog file.
@@ -87,6 +94,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue,
consumeQueue.getMappedFiles().size());
+ int fileCountIndexFile = getFileCountIndexFile();
+ assertEquals(fileCountIndexFile, getIndexFileList().size());
+
int expireFileCount = 15;
expireFiles(commitLogQueue, expireFileCount);
@@ -101,6 +111,10 @@ public class DefaultMessageStoreCleanFilesTest {
int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
int expectDeleteCountConsumeQueue = (int) Math.floor((double)
expectDeletedCount / msgCountPerFile);
assertEquals(fileCountConsumeQueue -
expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
+
+ int msgCountPerIndexFile = getMsgCountPerIndexFile();
+ int expectDeleteCountIndexFile = (int) Math.floor((double)
expectDeletedCount / msgCountPerIndexFile);
+ assertEquals(fileCountIndexFile - expectDeleteCountIndexFile,
getIndexFileList().size());
}
}
@@ -126,6 +140,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue,
consumeQueue.getMappedFiles().size());
+ int fileCountIndexFile = getFileCountIndexFile();
+ assertEquals(fileCountIndexFile, getIndexFileList().size());
+
int expireFileCount = 15;
expireFiles(commitLogQueue, expireFileCount);
@@ -140,6 +157,10 @@ public class DefaultMessageStoreCleanFilesTest {
int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
int expectDeleteCountConsumeQueue = (int) Math.floor((double)
expectDeletedCount / msgCountPerFile);
assertEquals(fileCountConsumeQueue -
expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
+
+ int msgCountPerIndexFile = getMsgCountPerIndexFile();
+ int expectDeleteCountIndexFile = (int) Math.floor((double)
expectDeletedCount / msgCountPerIndexFile);
+ assertEquals(fileCountIndexFile - expectDeleteCountIndexFile,
getIndexFileList().size());
}
}
@@ -165,6 +186,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue,
consumeQueue.getMappedFiles().size());
+ int fileCountIndexFile = getFileCountIndexFile();
+ assertEquals(fileCountIndexFile, getIndexFileList().size());
+
// In this case, there is no need to expire the files.
// int expireFileCount = 15;
// expireFiles(commitLogQueue, expireFileCount);
@@ -181,6 +205,10 @@ public class DefaultMessageStoreCleanFilesTest {
int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
int expectDeleteCountConsumeQueue = (int) Math.floor((double) (a *
10) / msgCountPerFile);
assertEquals(fileCountConsumeQueue -
expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
+
+ int msgCountPerIndexFile = getMsgCountPerIndexFile();
+ int expectDeleteCountIndexFile = (int) Math.floor((double) (a *
10) / msgCountPerIndexFile);
+ assertEquals(fileCountIndexFile - expectDeleteCountIndexFile,
getIndexFileList().size());
}
}
@@ -208,6 +236,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue,
consumeQueue.getMappedFiles().size());
+ int fileCountIndexFile = getFileCountIndexFile();
+ assertEquals(fileCountIndexFile, getIndexFileList().size());
+
int expireFileCount = 15;
expireFiles(commitLogQueue, expireFileCount);
@@ -222,6 +253,10 @@ public class DefaultMessageStoreCleanFilesTest {
int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
int expectDeleteCountConsumeQueue = (int) Math.floor((double)
expectDeletedCount / msgCountPerFile);
assertEquals(fileCountConsumeQueue -
expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
+
+ int msgCountPerIndexFile = getMsgCountPerIndexFile();
+ int expectDeleteCountIndexFile = (int) Math.floor((double) (a *
10) / msgCountPerIndexFile);
+ assertEquals(fileCountIndexFile - expectDeleteCountIndexFile,
getIndexFileList().size());
}
}
@@ -274,32 +309,60 @@ public class DefaultMessageStoreCleanFilesTest {
return fileQueue;
}
+ private ArrayList<IndexFile> getIndexFileList() throws Exception {
+ Field indexServiceField =
messageStore.getClass().getDeclaredField("indexService");
+ indexServiceField.setAccessible(true);
+ IndexService indexService = (IndexService)
indexServiceField.get(messageStore);
+
+ Field indexFileListField =
indexService.getClass().getDeclaredField("indexFileList");
+ indexFileListField.setAccessible(true);
+ ArrayList<IndexFile> indexFileList = (ArrayList<IndexFile>)
indexFileListField.get(indexService);
+
+ return indexFileList;
+ }
+
private int getFileCountConsumeQueue() {
int countPerFile = getMsgCountPerConsumeQueueMappedFile();
double fileCount = (double) msgCount / countPerFile;
return (int) Math.ceil(fileCount);
}
+ private int getFileCountIndexFile() {
+ int countPerFile = getMsgCountPerIndexFile();
+ double fileCount = (double) msgCount / countPerFile;
+ return (int) Math.ceil(fileCount);
+ }
+
private int getMsgCountPerConsumeQueueMappedFile() {
int size =
messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueue();
return size / CQ_STORE_UNIT_SIZE;// 7 in this case
}
+ private int getMsgCountPerIndexFile() {
+ // 7 in this case
+ return messageStore.getMessageStoreConfig().getMaxIndexNum() - 1;
+ }
+
private void buildAndPutMessagesToMessageStore(int msgCount) throws
Exception {
int msgLen = topic.getBytes(CHARSET_UTF8).length + 91;
+ Map<String, String> properties = new HashMap<>(4);
+ properties.put(MessageConst.PROPERTY_KEYS, keys);
+ String s = MessageDecoder.messageProperties2String(properties);
+ int propertiesLen = s.getBytes(CHARSET_UTF8).length;
int commitLogEndFileMinBlankLength = 4 + 4;
- int singleMsgBodyLen = mappedFileSize - msgLen -
commitLogEndFileMinBlankLength;
+ int singleMsgBodyLen = mappedFileSize - msgLen - propertiesLen -
commitLogEndFileMinBlankLength;
for (int i = 0; i < msgCount; i++) {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic(topic);
msg.setBody(new byte[singleMsgBodyLen]);
- msg.setKeys(String.valueOf(System.currentTimeMillis()));
+ msg.setKeys(keys);
msg.setQueueId(queueId);
msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(storeHost);
msg.setBornHost(bornHost);
+
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
PutMessageResult result = messageStore.putMessage(msg);
assertTrue(result != null && result.isOk());
}
@@ -324,8 +387,8 @@ public class DefaultMessageStoreCleanFilesTest {
MessageStoreConfig messageStoreConfig = new
MessageStoreConfigForTest();
messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize);
messageStoreConfig.setMappedFileSizeConsumeQueue(mappedFileSize);
- messageStoreConfig.setMaxHashSlotNum(10000);
- messageStoreConfig.setMaxIndexNum(100 * 100);
+ messageStoreConfig.setMaxHashSlotNum(100);
+ messageStoreConfig.setMaxIndexNum(8);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
messageStoreConfig.setFlushIntervalConsumeQueue(1);