This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1c0c4e75cbe [IOTDB-6051] Fixed concurrency error in IoTConsensus UT
when stopping cluster (#10457)
1c0c4e75cbe is described below
commit 1c0c4e75cbe3cde1f4b467e28a22a23e2642d1ed
Author: Potato <[email protected]>
AuthorDate: Mon Jul 10 11:28:10 2023 +0800
[IOTDB-6051] Fixed concurrency error in IoTConsensus UT when stopping
cluster (#10457)
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java | 3 ++-
.../org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java | 2 +-
.../iotdb/consensus/iot/logdispatcher/LogDispatcher.java | 10 ++++------
3 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 5cfe53459de..047d557ad55 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -145,8 +145,9 @@ public class IoTConsensus implements IConsensus {
@Override
public void stop() {
- clientManager.close();
stateMachineMap.values().parallelStream().forEach(IoTConsensusServerImpl::stop);
+ clientManager.close();
+ syncClientManager.close();
registerManager.deregisterAll();
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index c4d4d463ee3..0b76a690dba 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -138,7 +138,7 @@ public class IoTConsensusServerImpl {
consensusReqReader = (ConsensusReqReader) stateMachine.read(new
GetConsensusReqReaderPlan());
this.searchIndex = new
AtomicLong(consensusReqReader.getCurrentSearchIndex());
this.ioTConsensusServerMetrics = new IoTConsensusServerMetrics(this);
- this.logDispatcher = new LogDispatcher(this, clientManager, ioTConsensusServerMetrics);
+ this.logDispatcher = new LogDispatcher(this, clientManager);
// Since the underlying wal does not persist safelyDeletedSearchIndex,
IoTConsensus needs to
// update wal with its syncIndex recovered from the consensus layer when
initializing.
// This prevents wal from being piled up if the safelyDeletedSearchIndex
is not updated after
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 458faea805b..ef88d5a8163 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
-import org.apache.iotdb.consensus.iot.IoTConsensusServerMetrics;
import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
import org.apache.iotdb.consensus.iot.client.DispatchLogHandler;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
@@ -70,8 +69,7 @@ public class LogDispatcher {
public LogDispatcher(
IoTConsensusServerImpl impl,
- IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
- IoTConsensusServerMetrics ioTConsensusServerMetrics) {
+ IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) {
this.impl = impl;
this.selfPeerId = impl.getThisNode().getNodeId();
this.clientManager = clientManager;
@@ -103,7 +101,6 @@ public class LogDispatcher {
public synchronized void stop() {
if (!threads.isEmpty()) {
- threads.forEach(LogDispatcherThread::stop);
executorService.shutdownNow();
int timeout = 10;
try {
@@ -114,6 +111,7 @@ public class LogDispatcher {
Thread.currentThread().interrupt();
logger.error("Unexpected Interruption when closing LogDispatcher
service ");
}
+ threads.forEach(LogDispatcherThread::stop);
}
stopped = true;
}
@@ -224,6 +222,7 @@ public class LogDispatcher {
this.syncStatus = new SyncStatus(controller, config);
this.walEntryIterator = reader.getReqIterator(START_INDEX);
this.logDispatcherThreadMetrics = new LogDispatcherThreadMetrics(this);
+ MetricService.getInstance().addMetricSet(logDispatcherThreadMetrics);
}
public IndexController getController() {
@@ -303,10 +302,9 @@ public class LogDispatcher {
@Override
public void run() {
logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
- MetricService.getInstance().addMetricSet(logDispatcherThreadMetrics);
try {
Batch batch;
- while (!Thread.interrupted() && !stopped) {
+ while (!Thread.interrupted()) {
long startTime = System.nanoTime();
while ((batch = getBatch()).isEmpty()) {
// we may block here if there is no requests in the queue