This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 04b8400eb997ce7f144aff661a8febdac0ebd4bf Author: nowinkey <[email protected]> AuthorDate: Wed Feb 8 21:07:49 2023 +0800 Refactor MainBatchDispatchRequestService and DispatchService service object locations --- .../apache/rocketmq/store/DefaultMessageStore.java | 37 ++++++++++++---------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index baa71e1ba..48146c638 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -135,10 +135,6 @@ public class DefaultMessageStore implements MessageStore { private ReputMessageService reputMessageService; - private MainBatchDispatchRequestService mainBatchDispatchRequestService; - - private DispatchService dispatchService; - private HAService haService; // CompactionLog @@ -242,8 +238,6 @@ public class DefaultMessageStore implements MessageStore { this.reputMessageService = new ReputMessageService(); } else { this.reputMessageService = new ConcurrentReputMessageService(); - this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService(); - this.dispatchService = new DispatchService(); } this.transientStorePool = new TransientStorePool(this); @@ -380,11 +374,6 @@ public class DefaultMessageStore implements MessageStore { this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); this.reputMessageService.start(); - if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) { - this.mainBatchDispatchRequestService.start(); - this.dispatchService.start(); - } - // Checking is not necessary, as long as the dLedger's implementation exactly follows the definition of Recover, // which is eliminating the dispatch inconsistency between the commitLog and consumeQueue at the end of recovery. 
this.doRecheckReputOffsetFromCq(); @@ -482,12 +471,7 @@ public class DefaultMessageStore implements MessageStore { } this.commitLog.shutdown(); this.reputMessageService.shutdown(); - if (mainBatchDispatchRequestService != null) { - mainBatchDispatchRequestService.shutdown(); - } - if (dispatchService != null) { - dispatchService.shutdown(); - } + this.flushConsumeQueueService.shutdown(); this.allocateMappedFileService.shutdown(); this.storeCheckpoint.flush(); @@ -2983,6 +2967,16 @@ public class DefaultMessageStore implements MessageStore { private long batchId = 0; + private MainBatchDispatchRequestService mainBatchDispatchRequestService; + + private DispatchService dispatchService; + + public ConcurrentReputMessageService(){ + super(); + this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService(); + this.dispatchService = new DispatchService(); + } + public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) { if (position < 0) { return; @@ -2992,6 +2986,13 @@ public class DefaultMessageStore implements MessageStore { batchDispatchRequestQueue.offer(task); } + @Override + public void start() { + super.start(); + this.mainBatchDispatchRequestService.start(); + this.dispatchService.start(); + } + @Override public void doReput() { if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { @@ -3092,6 +3093,8 @@ public class DefaultMessageStore implements MessageStore { this.reputFromOffset); } + this.mainBatchDispatchRequestService.shutdown(); + this.dispatchService.shutdown(); super.shutdown(); }
