This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new fe724b3  add delete IndexFile
     new 189c6e3  Merge pull request #1817 from rushsky518/deleteFiles_unit_test
fe724b3 is described below

commit fe724b30356e221da3b7268da266a7a8fc453a3f
Author: rushsky518 <[email protected]>
AuthorDate: Tue Mar 3 12:53:04 2020 +0800

    add delete IndexFile
---
 .../store/DefaultMessageStoreCleanFilesTest.java   | 71 ++++++++++++++++++++--
 1 file changed, 67 insertions(+), 4 deletions(-)

diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
index fd510f1..abf7356 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
@@ -19,8 +19,12 @@ package org.apache.rocketmq.store;
 
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.index.IndexFile;
+import org.apache.rocketmq.store.index.IndexService;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.After;
 import org.junit.Before;
@@ -31,7 +35,9 @@ import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
@@ -52,6 +58,7 @@ public class DefaultMessageStoreCleanFilesTest {
     private SocketAddress storeHost;
 
     private String topic = "test";
+    private String keys = "hello";
     private int queueId = 0;
     private int fileCountCommitLog = 55;
     // exactly one message per CommitLog file.
@@ -87,6 +94,9 @@ public class DefaultMessageStoreCleanFilesTest {
         MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
         assertEquals(fileCountConsumeQueue, 
consumeQueue.getMappedFiles().size());
 
+        int fileCountIndexFile = getFileCountIndexFile();
+        assertEquals(fileCountIndexFile, getIndexFileList().size());
+
         int expireFileCount = 15;
         expireFiles(commitLogQueue, expireFileCount);
 
@@ -101,6 +111,10 @@ public class DefaultMessageStoreCleanFilesTest {
             int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
             int expectDeleteCountConsumeQueue = (int) Math.floor((double) 
expectDeletedCount / msgCountPerFile);
             assertEquals(fileCountConsumeQueue - 
expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
+
+            int msgCountPerIndexFile = getMsgCountPerIndexFile();
+            int expectDeleteCountIndexFile = (int) Math.floor((double) 
expectDeletedCount / msgCountPerIndexFile);
+            assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, 
getIndexFileList().size());
         }
     }
 
@@ -126,6 +140,9 @@ public class DefaultMessageStoreCleanFilesTest {
         MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
         assertEquals(fileCountConsumeQueue, 
consumeQueue.getMappedFiles().size());
 
+        int fileCountIndexFile = getFileCountIndexFile();
+        assertEquals(fileCountIndexFile, getIndexFileList().size());
+
         int expireFileCount = 15;
         expireFiles(commitLogQueue, expireFileCount);
 
@@ -140,6 +157,10 @@ public class DefaultMessageStoreCleanFilesTest {
             int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
             int expectDeleteCountConsumeQueue = (int) Math.floor((double) 
expectDeletedCount / msgCountPerFile);
             assertEquals(fileCountConsumeQueue - 
expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
+
+            int msgCountPerIndexFile = getMsgCountPerIndexFile();
+            int expectDeleteCountIndexFile = (int) Math.floor((double) 
expectDeletedCount / msgCountPerIndexFile);
+            assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, 
getIndexFileList().size());
         }
     }
 
@@ -165,6 +186,9 @@ public class DefaultMessageStoreCleanFilesTest {
         MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
         assertEquals(fileCountConsumeQueue, 
consumeQueue.getMappedFiles().size());
 
+        int fileCountIndexFile = getFileCountIndexFile();
+        assertEquals(fileCountIndexFile, getIndexFileList().size());
+
         // In this case, there is no need to expire the files.
         // int expireFileCount = 15;
         // expireFiles(commitLogQueue, expireFileCount);
@@ -181,6 +205,10 @@ public class DefaultMessageStoreCleanFilesTest {
             int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
             int expectDeleteCountConsumeQueue = (int) Math.floor((double) (a * 
10) / msgCountPerFile);
             assertEquals(fileCountConsumeQueue - 
expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
+
+            int msgCountPerIndexFile = getMsgCountPerIndexFile();
+            int expectDeleteCountIndexFile = (int) Math.floor((double) (a * 
10) / msgCountPerIndexFile);
+            assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, 
getIndexFileList().size());
         }
     }
 
@@ -208,6 +236,9 @@ public class DefaultMessageStoreCleanFilesTest {
         MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
         assertEquals(fileCountConsumeQueue, 
consumeQueue.getMappedFiles().size());
 
+        int fileCountIndexFile = getFileCountIndexFile();
+        assertEquals(fileCountIndexFile, getIndexFileList().size());
+
         int expireFileCount = 15;
         expireFiles(commitLogQueue, expireFileCount);
 
@@ -222,6 +253,10 @@ public class DefaultMessageStoreCleanFilesTest {
             int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
             int expectDeleteCountConsumeQueue = (int) Math.floor((double) 
expectDeletedCount / msgCountPerFile);
             assertEquals(fileCountConsumeQueue - 
expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
+
+            int msgCountPerIndexFile = getMsgCountPerIndexFile();
+            int expectDeleteCountIndexFile = (int) Math.floor((double) (a * 
10) / msgCountPerIndexFile);
+            assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, 
getIndexFileList().size());
         }
     }
 
@@ -274,32 +309,60 @@ public class DefaultMessageStoreCleanFilesTest {
         return fileQueue;
     }
 
+    private ArrayList<IndexFile> getIndexFileList() throws Exception {
+        Field indexServiceField = 
messageStore.getClass().getDeclaredField("indexService");
+        indexServiceField.setAccessible(true);
+        IndexService indexService = (IndexService) 
indexServiceField.get(messageStore);
+
+        Field indexFileListField = 
indexService.getClass().getDeclaredField("indexFileList");
+        indexFileListField.setAccessible(true);
+        ArrayList<IndexFile> indexFileList = (ArrayList<IndexFile>) 
indexFileListField.get(indexService);
+
+        return indexFileList;
+    }
+
     private int getFileCountConsumeQueue() {
         int countPerFile = getMsgCountPerConsumeQueueMappedFile();
         double fileCount = (double) msgCount / countPerFile;
         return (int) Math.ceil(fileCount);
     }
 
+    private int getFileCountIndexFile() {
+        int countPerFile = getMsgCountPerIndexFile();
+        double fileCount = (double) msgCount / countPerFile;
+        return (int) Math.ceil(fileCount);
+    }
+
     private int getMsgCountPerConsumeQueueMappedFile() {
         int size = 
messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueue();
         return size / CQ_STORE_UNIT_SIZE;// 7 in this case
     }
 
+    private int getMsgCountPerIndexFile() {
+        // 7 in this case
+        return messageStore.getMessageStoreConfig().getMaxIndexNum() - 1;
+    }
+
     private void buildAndPutMessagesToMessageStore(int msgCount) throws 
Exception {
         int msgLen = topic.getBytes(CHARSET_UTF8).length + 91;
+        Map<String, String> properties = new HashMap<>(4);
+        properties.put(MessageConst.PROPERTY_KEYS, keys);
+        String s = MessageDecoder.messageProperties2String(properties);
+        int propertiesLen = s.getBytes(CHARSET_UTF8).length;
         int commitLogEndFileMinBlankLength = 4 + 4;
-        int singleMsgBodyLen = mappedFileSize - msgLen - 
commitLogEndFileMinBlankLength;
+        int singleMsgBodyLen = mappedFileSize - msgLen - propertiesLen - 
commitLogEndFileMinBlankLength;
 
         for (int i = 0; i < msgCount; i++) {
             MessageExtBrokerInner msg = new MessageExtBrokerInner();
             msg.setTopic(topic);
             msg.setBody(new byte[singleMsgBodyLen]);
-            msg.setKeys(String.valueOf(System.currentTimeMillis()));
+            msg.setKeys(keys);
             msg.setQueueId(queueId);
             msg.setSysFlag(0);
             msg.setBornTimestamp(System.currentTimeMillis());
             msg.setStoreHost(storeHost);
             msg.setBornHost(bornHost);
+            
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
             PutMessageResult result = messageStore.putMessage(msg);
             assertTrue(result != null && result.isOk());
         }
@@ -324,8 +387,8 @@ public class DefaultMessageStoreCleanFilesTest {
         MessageStoreConfig messageStoreConfig = new 
MessageStoreConfigForTest();
         messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize);
         messageStoreConfig.setMappedFileSizeConsumeQueue(mappedFileSize);
-        messageStoreConfig.setMaxHashSlotNum(10000);
-        messageStoreConfig.setMaxIndexNum(100 * 100);
+        messageStoreConfig.setMaxHashSlotNum(100);
+        messageStoreConfig.setMaxIndexNum(8);
         messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
         messageStoreConfig.setFlushIntervalConsumeQueue(1);
 

Reply via email to