This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1dfdae910bf [IOTDB-6266] Add the ability to flush syncIndex and update
reader periodically for IoTConsensus (#11691)
1dfdae910bf is described below
commit 1dfdae910bfd2eb9f610d04a9e96385d0406cdfc
Author: Potato <[email protected]>
AuthorDate: Wed Dec 13 10:23:10 2023 +0800
[IOTDB-6266] Add the ability to flush syncIndex and update reader
periodically for IoTConsensus (#11691)
---
.../apache/iotdb/consensus/iot/IoTConsensus.java | 31 +++++++++++++++++-----
.../consensus/iot/IoTConsensusServerImpl.java | 14 ++++++----
.../consensus/iot/client/DispatchLogHandler.java | 2 +-
.../iot/logdispatcher/IndexController.java | 3 +--
.../consensus/iot/logdispatcher/LogDispatcher.java | 11 ++++++++
.../consensus/iot/logdispatcher/SyncStatus.java | 4 +--
.../iot/logdispatcher/IndexControllerTest.java | 14 +++++++---
.../iotdb/commons/concurrent/ThreadName.java | 4 +--
8 files changed, 61 insertions(+), 22 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 3da7df08dc1..babe939393f 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.RegisterManager;
@@ -68,12 +69,14 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class IoTConsensus implements IConsensus {
+ private static final long READER_UPDATE_INTERVAL_IN_MINUTES = 3;
private final Logger logger = LoggerFactory.getLogger(IoTConsensus.class);
private final TEndPoint thisNode;
@@ -87,7 +90,8 @@ public class IoTConsensus implements IConsensus {
private final IoTConsensusConfig config;
private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
- private final ScheduledExecutorService retryService;
+ private final ScheduledExecutorService backgroundTaskService;
+ private Future<?> updateReaderFuture;
public IoTConsensus(ConsensusConfig config, Registry registry) {
this.thisNode = config.getThisNodeEndPoint();
@@ -104,9 +108,9 @@ public class IoTConsensus implements IConsensus {
new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
.createClientManager(
new SyncIoTConsensusServiceClientPoolFactory(config.getIotConsensusConfig()));
- this.retryService =
+ this.backgroundTaskService =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
- ThreadName.LOG_DISPATCHER_RETRY_EXECUTOR.getName());
+ ThreadName.IOT_CONSENSUS_BACKGROUND_TASK_EXECUTOR.getName());
// init IoTConsensus memory manager
IoTConsensusMemoryManager.getInstance()
.init(
@@ -123,6 +127,18 @@ public class IoTConsensus implements IConsensus {
} catch (StartupException e) {
throw new IOException(e);
}
+ if (updateReaderFuture == null) {
+ updateReaderFuture =
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ backgroundTaskService,
+ () ->
+ stateMachineMap
+ .values()
+ .forEach(impl -> impl.getLogDispatcher().checkAndFlushIndex()),
+ READER_UPDATE_INTERVAL_IN_MINUTES,
+ READER_UPDATE_INTERVAL_IN_MINUTES,
+ TimeUnit.MINUTES);
+ }
}
private void initAndRecover() throws IOException {
@@ -143,7 +159,7 @@ public class IoTConsensus implements IConsensus {
new Peer(consensusGroupId, thisNodeId, thisNode),
new ArrayList<>(),
registry.apply(consensusGroupId),
- retryService,
+ backgroundTaskService,
clientManager,
syncClientManager,
config);
@@ -156,13 +172,14 @@ public class IoTConsensus implements IConsensus {
@Override
public synchronized void stop() {
+ Optional.ofNullable(updateReaderFuture).ifPresent(future -> future.cancel(false));
stateMachineMap.values().parallelStream().forEach(IoTConsensusServerImpl::stop);
clientManager.close();
syncClientManager.close();
registerManager.deregisterAll();
- retryService.shutdown();
+ backgroundTaskService.shutdown();
try {
- retryService.awaitTermination(5, TimeUnit.SECONDS);
+ backgroundTaskService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e);
Thread.currentThread().interrupt();
@@ -225,7 +242,7 @@ public class IoTConsensus implements IConsensus {
new Peer(groupId, thisNodeId, thisNode),
peers,
registry.apply(groupId),
- retryService,
+ backgroundTaskService,
clientManager,
syncClientManager,
config);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index e8f74cd0dfa..4e43306d7c5 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -113,14 +113,14 @@ public class IoTConsensusServerImpl {
private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
private final IoTConsensusServerMetrics ioTConsensusServerMetrics;
private final String consensusGroupId;
- private final ScheduledExecutorService retryService;
+ private final ScheduledExecutorService backgroundTaskService;
public IoTConsensusServerImpl(
String storageDir,
Peer thisNode,
List<Peer> configuration,
IStateMachine stateMachine,
- ScheduledExecutorService retryService,
+ ScheduledExecutorService backgroundTaskService,
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager,
IoTConsensusConfig config) {
@@ -136,7 +136,7 @@ public class IoTConsensusServerImpl {
} else {
persistConfiguration();
}
- this.retryService = retryService;
+ this.backgroundTaskService = backgroundTaskService;
this.config = config;
this.consensusGroupId = thisNode.getGroupId().toString();
consensusReqReader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan());
@@ -733,8 +733,12 @@ public class IoTConsensusServerImpl {
return searchIndex;
}
- public ScheduledExecutorService getRetryService() {
- return retryService;
+ public ScheduledExecutorService getBackgroundTaskService() {
+ return backgroundTaskService;
+ }
+
+ public LogDispatcher getLogDispatcher() {
+ return logDispatcher;
}
public IoTConsensusServerMetrics getIoTConsensusServerMetrics() {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 2814c6fb833..88a8d90ebaa 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -110,7 +110,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe
thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
thread
.getImpl()
- .getRetryService()
+ .getBackgroundTaskService()
.schedule(
() -> {
if (thread.isStopped()) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index 61ace504758..ddafaf2f010 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -69,7 +69,7 @@ public class IndexController {
restore();
}
- public long updateAndGet(long index, boolean forcePersist) {
+ public void update(long index, boolean forcePersist) {
try {
lock.writeLock().lock();
long newCurrentIndex = Math.max(currentIndex, index);
@@ -81,7 +81,6 @@ public class IndexController {
storageDir);
currentIndex = newCurrentIndex;
checkPersist(forcePersist);
- return currentIndex;
} finally {
lock.writeLock().unlock();
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 4ff4ed86104..47da07944f0 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -62,6 +62,7 @@ public class LogDispatcher {
private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
private ExecutorService executorService;
+ private final ConsensusReqReader reader;
private boolean stopped = false;
private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
@@ -71,6 +72,7 @@ public class LogDispatcher {
IoTConsensusServerImpl impl,
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) {
this.impl = impl;
+ this.reader = (ConsensusReqReader) impl.getStateMachine().read(new GetConsensusReqReaderPlan());
this.selfPeerId = impl.getThisNode().getNodeId();
this.clientManager = clientManager;
this.threads =
@@ -157,6 +159,15 @@ public class LogDispatcher {
return threads.stream().mapToLong(LogDispatcherThread::getLastFlushedSyncIndex).min();
}
+ public void checkAndFlushIndex() {
+ threads.forEach(
+ thread -> {
+ IndexController controller = thread.getController();
+ controller.update(controller.getCurrentIndex(), true);
+ });
+ reader.setSafelyDeletedSearchIndex(impl.getMinFlushedSyncIndex());
+ }
+
public void offer(IndexedConsensusRequest request) {
// we don't need to serialize and offer request when replicaNum is 1.
if (!threads.isEmpty()) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index 48580d5d758..17c232a294b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -65,7 +65,7 @@ public class SyncStatus {
Iterator<Batch> iterator = pendingBatches.iterator();
Batch current = iterator.next();
while (current.isSynced()) {
- controller.updateAndGet(current.getEndIndex(), false);
+ controller.update(current.getEndIndex(), false);
iterator.remove();
iotConsensusMemoryManager.free(current.getSerializedSize(), false);
if (iterator.hasNext()) {
@@ -86,7 +86,7 @@ public class SyncStatus {
size += pendingBatch.getSerializedSize();
}
pendingBatches.clear();
- controller.updateAndGet(0L, true);
+ controller.update(0L, true);
iotConsensusMemoryManager.free(size, false);
}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
index a34c6ca0222..0f02bb21cef 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
@@ -60,7 +60,7 @@ public class IndexControllerTest {
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- controller.updateAndGet(CHECK_POINT_GAP - 1, false);
+ controller.update(CHECK_POINT_GAP - 1, false);
Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -68,7 +68,7 @@ public class IndexControllerTest {
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- controller.updateAndGet(CHECK_POINT_GAP - 1, true);
+ controller.update(CHECK_POINT_GAP - 1, true);
Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getLastFlushedIndex());
@@ -76,13 +76,21 @@ public class IndexControllerTest {
Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getLastFlushedIndex());
- controller.updateAndGet(CHECK_POINT_GAP * 2, false);
+ controller.update(CHECK_POINT_GAP * 2, false);
Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
+
+ controller.update(CHECK_POINT_GAP * 2 - 1, true);
+ Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getCurrentIndex());
+ Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
+
+ controller.update(CHECK_POINT_GAP * 2 + 1, true);
+ Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, controller.getCurrentIndex());
+ Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, controller.getLastFlushedIndex());
}
@Test
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index bd6c8097f1e..b7c59457e74 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -103,7 +103,7 @@ public enum ThreadName {
IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"),
ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeIoTConsensusServiceClientPool"),
LOG_DISPATCHER("LogDispatcher"),
- LOG_DISPATCHER_RETRY_EXECUTOR("LogDispatcherRetryExecutor"),
+ IOT_CONSENSUS_BACKGROUND_TASK_EXECUTOR("IoTConsensusBackgroundTaskExecutor"),
// -------------------------- Ratis --------------------------
// NOTICE: The thread name of ratis cannot be edited here!
// We list the thread name here just for distinguishing what module the thread belongs to.
@@ -235,7 +235,7 @@ public enum ThreadName {
IOT_CONSENSUS_RPC_PROCESSOR,
ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL,
LOG_DISPATCHER,
- LOG_DISPATCHER_RETRY_EXECUTOR));
+ IOT_CONSENSUS_BACKGROUND_TASK_EXECUTOR));
private static final Set<ThreadName> ratisThreadNames =
new HashSet<>(