This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/fix_addpeer_ml_1_replic in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1326bf47b87da39b0b1b24c0b54d83716ac978bc Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Nov 7 16:50:18 2022 +0800 fix the NPE when addPeer to a MultiLeader Group with 1 replic --- .../multileader/logdispatcher/LogDispatcher.java | 24 ++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index 427f2d3945..606731e6a7 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -81,16 +81,20 @@ public class LogDispatcher { .map(x -> new LogDispatcherThread(x, impl.getConfig(), DEFAULT_INITIAL_SYNC_INDEX)) .collect(Collectors.toList()); if (!threads.isEmpty()) { - // We use cached thread pool here because each LogDispatcherThread will occupy one thread. - // And every LogDispatcherThread won't release its thread in this pool because it won't stop - // unless LogDispatcher stop. - // Thus, the size of this threadPool will be the same as the count of LogDispatcherThread. - this.executorService = - IoTDBThreadPoolFactory.newCachedThreadPool( - "LogDispatcher-" + impl.getThisNode().getGroupId()); + initLogSyncThreadPool(); } } + private void initLogSyncThreadPool() { + // We use cached thread pool here because each LogDispatcherThread will occupy one thread. + // And every LogDispatcherThread won't release its thread in this pool because it won't stop + // unless LogDispatcher stop. + // Thus, the size of this threadPool will be the same as the count of LogDispatcherThread. + this.executorService = + IoTDBThreadPoolFactory.newCachedThreadPool( + "LogDispatcher-" + impl.getThisNode().getGroupId()); + } + public synchronized void start() { if (!threads.isEmpty()) { threads.forEach(executorService::submit); @@ -118,9 +122,13 @@ public class LogDispatcher { if (stopped) { return; } - // LogDispatcherThread thread = new LogDispatcherThread(peer, impl.getConfig(), initialSyncIndex); threads.add(thread); + // If the initial replica is 1, the executorService won't be initialized. And when adding + // dispatcher thread, the executorService should be initialized manually + if (this.executorService == null) { + initLogSyncThreadPool(); + } executorService.submit(thread); }
