This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dfdae910bf [IOTDB-6266] Add the ability to flush syncIndex and update reader periodically for IoTConsensus (#11691)
1dfdae910bf is described below

commit 1dfdae910bfd2eb9f610d04a9e96385d0406cdfc
Author: Potato <[email protected]>
AuthorDate: Wed Dec 13 10:23:10 2023 +0800

    [IOTDB-6266] Add the ability to flush syncIndex and update reader periodically for IoTConsensus (#11691)
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   | 31 +++++++++++++++++-----
 .../consensus/iot/IoTConsensusServerImpl.java      | 14 ++++++----
 .../consensus/iot/client/DispatchLogHandler.java   |  2 +-
 .../iot/logdispatcher/IndexController.java         |  3 +--
 .../consensus/iot/logdispatcher/LogDispatcher.java | 11 ++++++++
 .../consensus/iot/logdispatcher/SyncStatus.java    |  4 +--
 .../iot/logdispatcher/IndexControllerTest.java     | 14 +++++++---
 .../iotdb/commons/concurrent/ThreadName.java       |  4 +--
 8 files changed, 61 insertions(+), 22 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 3da7df08dc1..babe939393f 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.RegisterManager;
@@ -68,12 +69,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class IoTConsensus implements IConsensus {
 
+  private static final long READER_UPDATE_INTERVAL_IN_MINUTES = 3;
   private final Logger logger = LoggerFactory.getLogger(IoTConsensus.class);
 
   private final TEndPoint thisNode;
@@ -87,7 +90,8 @@ public class IoTConsensus implements IConsensus {
   private final IoTConsensusConfig config;
   private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> 
clientManager;
   private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> 
syncClientManager;
-  private final ScheduledExecutorService retryService;
+  private final ScheduledExecutorService backgroundTaskService;
+  private Future<?> updateReaderFuture;
 
   public IoTConsensus(ConsensusConfig config, Registry registry) {
     this.thisNode = config.getThisNodeEndPoint();
@@ -104,9 +108,9 @@ public class IoTConsensus implements IConsensus {
         new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
             .createClientManager(
                 new 
SyncIoTConsensusServiceClientPoolFactory(config.getIotConsensusConfig()));
-    this.retryService =
+    this.backgroundTaskService =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-            ThreadName.LOG_DISPATCHER_RETRY_EXECUTOR.getName());
+            ThreadName.IOT_CONSENSUS_BACKGROUND_TASK_EXECUTOR.getName());
     // init IoTConsensus memory manager
     IoTConsensusMemoryManager.getInstance()
         .init(
@@ -123,6 +127,18 @@ public class IoTConsensus implements IConsensus {
     } catch (StartupException e) {
       throw new IOException(e);
     }
+    if (updateReaderFuture == null) {
+      updateReaderFuture =
+          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+              backgroundTaskService,
+              () ->
+                  stateMachineMap
+                      .values()
+                      .forEach(impl -> 
impl.getLogDispatcher().checkAndFlushIndex()),
+              READER_UPDATE_INTERVAL_IN_MINUTES,
+              READER_UPDATE_INTERVAL_IN_MINUTES,
+              TimeUnit.MINUTES);
+    }
   }
 
   private void initAndRecover() throws IOException {
@@ -143,7 +159,7 @@ public class IoTConsensus implements IConsensus {
                   new Peer(consensusGroupId, thisNodeId, thisNode),
                   new ArrayList<>(),
                   registry.apply(consensusGroupId),
-                  retryService,
+                  backgroundTaskService,
                   clientManager,
                   syncClientManager,
                   config);
@@ -156,13 +172,14 @@ public class IoTConsensus implements IConsensus {
 
   @Override
   public synchronized void stop() {
+    Optional.ofNullable(updateReaderFuture).ifPresent(future -> 
future.cancel(false));
     
stateMachineMap.values().parallelStream().forEach(IoTConsensusServerImpl::stop);
     clientManager.close();
     syncClientManager.close();
     registerManager.deregisterAll();
-    retryService.shutdown();
+    backgroundTaskService.shutdown();
     try {
-      retryService.awaitTermination(5, TimeUnit.SECONDS);
+      backgroundTaskService.awaitTermination(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       logger.warn("{}: interrupted when shutting down add Executor with 
exception {}", this, e);
       Thread.currentThread().interrupt();
@@ -225,7 +242,7 @@ public class IoTConsensus implements IConsensus {
                           new Peer(groupId, thisNodeId, thisNode),
                           peers,
                           registry.apply(groupId),
-                          retryService,
+                          backgroundTaskService,
                           clientManager,
                           syncClientManager,
                           config);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index e8f74cd0dfa..4e43306d7c5 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -113,14 +113,14 @@ public class IoTConsensusServerImpl {
   private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> 
syncClientManager;
   private final IoTConsensusServerMetrics ioTConsensusServerMetrics;
   private final String consensusGroupId;
-  private final ScheduledExecutorService retryService;
+  private final ScheduledExecutorService backgroundTaskService;
 
   public IoTConsensusServerImpl(
       String storageDir,
       Peer thisNode,
       List<Peer> configuration,
       IStateMachine stateMachine,
-      ScheduledExecutorService retryService,
+      ScheduledExecutorService backgroundTaskService,
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
       IClientManager<TEndPoint, SyncIoTConsensusServiceClient> 
syncClientManager,
       IoTConsensusConfig config) {
@@ -136,7 +136,7 @@ public class IoTConsensusServerImpl {
     } else {
       persistConfiguration();
     }
-    this.retryService = retryService;
+    this.backgroundTaskService = backgroundTaskService;
     this.config = config;
     this.consensusGroupId = thisNode.getGroupId().toString();
     consensusReqReader = (ConsensusReqReader) stateMachine.read(new 
GetConsensusReqReaderPlan());
@@ -733,8 +733,12 @@ public class IoTConsensusServerImpl {
     return searchIndex;
   }
 
-  public ScheduledExecutorService getRetryService() {
-    return retryService;
+  public ScheduledExecutorService getBackgroundTaskService() {
+    return backgroundTaskService;
+  }
+
+  public LogDispatcher getLogDispatcher() {
+    return logDispatcher;
   }
 
   public IoTConsensusServerMetrics getIoTConsensusServerMetrics() {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 2814c6fb833..88a8d90ebaa 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -110,7 +110,7 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
             thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
     thread
         .getImpl()
-        .getRetryService()
+        .getBackgroundTaskService()
         .schedule(
             () -> {
               if (thread.isStopped()) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index 61ace504758..ddafaf2f010 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -69,7 +69,7 @@ public class IndexController {
     restore();
   }
 
-  public long updateAndGet(long index, boolean forcePersist) {
+  public void update(long index, boolean forcePersist) {
     try {
       lock.writeLock().lock();
       long newCurrentIndex = Math.max(currentIndex, index);
@@ -81,7 +81,6 @@ public class IndexController {
           storageDir);
       currentIndex = newCurrentIndex;
       checkPersist(forcePersist);
-      return currentIndex;
     } finally {
       lock.writeLock().unlock();
     }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 4ff4ed86104..47da07944f0 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -62,6 +62,7 @@ public class LogDispatcher {
   private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> 
clientManager;
   private ExecutorService executorService;
 
+  private final ConsensusReqReader reader;
   private boolean stopped = false;
 
   private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
@@ -71,6 +72,7 @@ public class LogDispatcher {
       IoTConsensusServerImpl impl,
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) 
{
     this.impl = impl;
+    this.reader = (ConsensusReqReader) impl.getStateMachine().read(new 
GetConsensusReqReaderPlan());
     this.selfPeerId = impl.getThisNode().getNodeId();
     this.clientManager = clientManager;
     this.threads =
@@ -157,6 +159,15 @@ public class LogDispatcher {
     return 
threads.stream().mapToLong(LogDispatcherThread::getLastFlushedSyncIndex).min();
   }
 
+  public void checkAndFlushIndex() {
+    threads.forEach(
+        thread -> {
+          IndexController controller = thread.getController();
+          controller.update(controller.getCurrentIndex(), true);
+        });
+    reader.setSafelyDeletedSearchIndex(impl.getMinFlushedSyncIndex());
+  }
+
   public void offer(IndexedConsensusRequest request) {
     // we don't need to serialize and offer request when replicaNum is 1.
     if (!threads.isEmpty()) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index 48580d5d758..17c232a294b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -65,7 +65,7 @@ public class SyncStatus {
         Iterator<Batch> iterator = pendingBatches.iterator();
         Batch current = iterator.next();
         while (current.isSynced()) {
-          controller.updateAndGet(current.getEndIndex(), false);
+          controller.update(current.getEndIndex(), false);
           iterator.remove();
           iotConsensusMemoryManager.free(current.getSerializedSize(), false);
           if (iterator.hasNext()) {
@@ -86,7 +86,7 @@ public class SyncStatus {
       size += pendingBatch.getSerializedSize();
     }
     pendingBatches.clear();
-    controller.updateAndGet(0L, true);
+    controller.update(0L, true);
     iotConsensusMemoryManager.free(size, false);
   }
 
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
index a34c6ca0222..0f02bb21cef 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
@@ -60,7 +60,7 @@ public class IndexControllerTest {
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller.updateAndGet(CHECK_POINT_GAP - 1, false);
+    controller.update(CHECK_POINT_GAP - 1, false);
     Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -68,7 +68,7 @@ public class IndexControllerTest {
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller.updateAndGet(CHECK_POINT_GAP - 1, true);
+    controller.update(CHECK_POINT_GAP - 1, true);
     Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getLastFlushedIndex());
 
@@ -76,13 +76,21 @@ public class IndexControllerTest {
     Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getLastFlushedIndex());
 
-    controller.updateAndGet(CHECK_POINT_GAP * 2, false);
+    controller.update(CHECK_POINT_GAP * 2, false);
     Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
 
     controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, 
CHECK_POINT_GAP);
     Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
+
+    controller.update(CHECK_POINT_GAP * 2 - 1, true);
+    Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getCurrentIndex());
+    Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
+
+    controller.update(CHECK_POINT_GAP * 2 + 1, true);
+    Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, controller.getCurrentIndex());
+    Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, 
controller.getLastFlushedIndex());
   }
 
   @Test
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index bd6c8097f1e..b7c59457e74 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -103,7 +103,7 @@ public enum ThreadName {
   IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"),
   
ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeIoTConsensusServiceClientPool"),
   LOG_DISPATCHER("LogDispatcher"),
-  LOG_DISPATCHER_RETRY_EXECUTOR("LogDispatcherRetryExecutor"),
+  IOT_CONSENSUS_BACKGROUND_TASK_EXECUTOR("IoTConsensusBackgroundTaskExecutor"),
   // -------------------------- Ratis --------------------------
   // NOTICE: The thread name of ratis cannot be edited here!
   // We list the thread name here just for distinguishing what module the 
thread belongs to.
@@ -235,7 +235,7 @@ public enum ThreadName {
               IOT_CONSENSUS_RPC_PROCESSOR,
               ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL,
               LOG_DISPATCHER,
-              LOG_DISPATCHER_RETRY_EXECUTOR));
+              IOT_CONSENSUS_BACKGROUND_TASK_EXECUTOR));
 
   private static final Set<ThreadName> ratisThreadNames =
       new HashSet<>(

Reply via email to