duhenglucky commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r788540984
##########
File path:
store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java
##########
@@ -71,14 +91,104 @@ public void testCorrectDelayOffset_whenInit() throws
Exception {
}
- private MessageStore buildMessageStore() throws Exception {
+ private MessageStoreConfig buildMessageStoreConfig() throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10);
messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10);
messageStoreConfig.setMaxHashSlotNum(10000);
messageStoreConfig.setMaxIndexNum(100 * 100);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
messageStoreConfig.setFlushIntervalConsumeQueue(1);
- return new DefaultMessageStore(messageStoreConfig, new
BrokerStatsManager("simpleTest", true), null, new BrokerConfig());
+ return messageStoreConfig;
+ }
+
+ @Test
+ public void testHandlePutResultTask() throws Exception {
+ DefaultMessageStore messageStore = mock(DefaultMessageStore.class);
+ MessageStoreConfig config = buildMessageStoreConfig();
+ config.setEnableScheduleMessageStats(false);
+ config.setEnableScheduleAsyncDeliver(true);
+ when(messageStore.getMessageStoreConfig()).thenReturn(config);
+ ScheduleMessageService scheduleMessageService = new
ScheduleMessageService(messageStore);
+ scheduleMessageService.parseDelayLevel();
+
+ Field field =
scheduleMessageService.getClass().getDeclaredField("deliverPendingTable");
+ field.setAccessible(true);
+ Map<Integer /* level */,
LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>>
deliverPendingTable =
+ (Map<Integer,
LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>>)
field.get(scheduleMessageService);
+
+ field =
scheduleMessageService.getClass().getDeclaredField("offsetTable");
+ field.setAccessible(true);
+ ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
+ (ConcurrentMap<Integer /* level */, Long/* offset */>)
field.get(scheduleMessageService);
+ for (int i = 1; i <= scheduleMessageService.getMaxDelayLevel(); i++) {
+ offsetTable.put(i, 0L);
+ }
+
+ int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors();
Review comment:
Is it better to set _deliverThreadPoolNums_ to _delaylevel_?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]