This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 21c2110ef5 [IOTDB-5512] Fixed IoTConsensus may repeatedly send some log when restarting (#9074)
21c2110ef5 is described below

commit 21c2110ef52c9253d81a889aec35f31d7e24e7b4
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Mon Feb 20 11:51:38 2023 +0800

    [IOTDB-5512] Fixed IoTConsensus may repeatedly send some log when restarting (#9074)
---
 .../iot/logdispatcher/IndexController.java         | 10 ++++-----
 .../consensus/iot/logdispatcher/LogDispatcher.java |  2 +-
 .../consensus/iot/logdispatcher/SyncStatus.java    |  9 ++++++--
 .../iot/logdispatcher/IndexControllerTest.java     | 25 +++++++++-------------
 .../iot/logdispatcher/SyncStatusTest.java          | 10 +++++----
 .../apache/iotdb/db/service/IoTDBShutdownHook.java | 13 +++++++++++
 6 files changed, 42 insertions(+), 27 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index 116a5f3c96..d69ca33c1d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -70,7 +70,7 @@ public class IndexController {
     restore();
   }
 
-  public long updateAndGet(long index) {
+  public long updateAndGet(long index, boolean forcePersist) {
     try {
       lock.writeLock().lock();
       long newCurrentIndex = Math.max(currentIndex, index);
@@ -81,7 +81,7 @@ public class IndexController {
           prefix,
           storageDir);
       currentIndex = newCurrentIndex;
-      checkPersist();
+      checkPersist(forcePersist);
       return currentIndex;
     } finally {
       lock.writeLock().unlock();
@@ -102,14 +102,14 @@ public class IndexController {
     return lastFlushedIndex;
   }
 
-  private void checkPersist() {
-    if (currentIndex - lastFlushedIndex >= checkpointGap) {
+  private void checkPersist(boolean forcePersist) {
+    if (forcePersist || currentIndex - lastFlushedIndex >= checkpointGap) {
       persist();
     }
   }
 
   private void persist() {
-    long flushIndex = currentIndex - currentIndex % checkpointGap;
+    long flushIndex = currentIndex;
     File oldFile = new File(storageDir, prefix + lastFlushedIndex);
     File newFile = new File(storageDir, prefix + flushIndex);
     try {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index b75a4f4810..68a988558e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -217,7 +217,7 @@ public class LogDispatcher {
               peer,
               initialSyncIndex,
               config.getReplication().getCheckpointGap());
-      this.syncStatus = new SyncStatus(controller, config);
+      this.syncStatus = new SyncStatus(controller, config, impl::getSearchIndex);
       this.walEntryIterator = reader.getReqIterator(START_INDEX);
       this.metrics = new LogDispatcherThreadMetrics(this);
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index 4c3c008783..de6d0691dc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -25,18 +25,21 @@ import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.function.LongSupplier;
 
 public class SyncStatus {
 
   private final IoTConsensusConfig config;
   private final IndexController controller;
+  private final LongSupplier supplier;
   private final LinkedList<Batch> pendingBatches = new LinkedList<>();
   private final IoTConsensusMemoryManager iotConsensusMemoryManager =
       IoTConsensusMemoryManager.getInstance();
 
-  public SyncStatus(IndexController controller, IoTConsensusConfig config) {
+  public SyncStatus(IndexController controller, IoTConsensusConfig config, LongSupplier supplier) {
     this.controller = controller;
     this.config = config;
+    this.supplier = supplier;
   }
 
   /** we may block here if the synchronization pipeline is full. */
@@ -62,7 +65,8 @@ public class SyncStatus {
         Iterator<Batch> iterator = pendingBatches.iterator();
         Batch current = iterator.next();
         while (current.isSynced()) {
-          controller.updateAndGet(current.getEndIndex());
+          controller.updateAndGet(
+              current.getEndIndex(), supplier.getAsLong() == current.getEndIndex());
           iterator.remove();
           iotConsensusMemoryManager.free(current.getSerializedSize(), false);
           if (iterator.hasNext()) {
@@ -83,6 +87,7 @@ public class SyncStatus {
       size += pendingBatch.getSerializedSize();
     }
     pendingBatches.clear();
+    controller.updateAndGet(0L, true);
     iotConsensusMemoryManager.free(size, false);
   }
 
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
index 7051f47930..fff348516e 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
@@ -60,8 +60,7 @@ public class IndexControllerTest {
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller.updateAndGet(CHECK_POINT_GAP - 1);
-
+    controller.updateAndGet(CHECK_POINT_GAP - 1, false);
     Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
@@ -69,24 +68,20 @@ public class IndexControllerTest {
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    controller.updateAndGet(CHECK_POINT_GAP + 1);
-    Assert.assertEquals(CHECK_POINT_GAP + 1, controller.getCurrentIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
+    controller.updateAndGet(CHECK_POINT_GAP - 1, true);
+    Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
+    Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getLastFlushedIndex());
 
     controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
-    Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
+    Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
+    Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getLastFlushedIndex());
 
-    controller.updateAndGet(CHECK_POINT_GAP * 2 - 1);
-    Assert.assertEquals(CHECK_POINT_GAP * 2 - 1, controller.getCurrentIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
+    controller.updateAndGet(CHECK_POINT_GAP * 2, false);
+    Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getCurrentIndex());
+    Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
 
     controller = new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
-    Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
-    Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
-
-    controller.updateAndGet(CHECK_POINT_GAP * 2 + 1);
-    Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, controller.getCurrentIndex());
+    Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getCurrentIndex());
     Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
   }
 
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
index 62be6e1a61..5b98604353 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.LongSupplier;
 
 public class SyncStatusTest {
 
@@ -46,6 +47,7 @@ public class SyncStatusTest {
       new Peer(new DataRegionId(1), 2, new TEndPoint("127.0.0.1", 6667));
   private static final IoTConsensusConfig config = new IoTConsensusConfig.Builder().build();
   private static final long CHECK_POINT_GAP = 500;
+  private static final LongSupplier supplier = () -> -1L;
 
   @Before
   public void setUp() throws IOException {
@@ -64,7 +66,7 @@ public class SyncStatusTest {
         new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
-    SyncStatus status = new SyncStatus(controller, config);
+    SyncStatus status = new SyncStatus(controller, config, supplier);
     List<Batch> batchList = new ArrayList<>();
 
     for (long i = 0; i < config.getReplication().getMaxPendingBatchesNum(); i++) {
@@ -96,7 +98,7 @@ public class SyncStatusTest {
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    SyncStatus status = new SyncStatus(controller, config);
+    SyncStatus status = new SyncStatus(controller, config, supplier);
     List<Batch> batchList = new ArrayList<>();
 
     for (long i = 0; i < config.getReplication().getMaxPendingBatchesNum(); i++) {
@@ -134,7 +136,7 @@ public class SyncStatusTest {
     Assert.assertEquals(0, controller.getCurrentIndex());
     Assert.assertEquals(0, controller.getLastFlushedIndex());
 
-    SyncStatus status = new SyncStatus(controller, config);
+    SyncStatus status = new SyncStatus(controller, config, supplier);
     List<Batch> batchList = new ArrayList<>();
 
     for (long i = 0; i < config.getReplication().getMaxPendingBatchesNum(); i++) {
@@ -184,7 +186,7 @@ public class SyncStatusTest {
         new IndexController(storageDir.getAbsolutePath(), peer, 0, CHECK_POINT_GAP);
     Assert.assertEquals(0, controller.getCurrentIndex());
 
-    SyncStatus status = new SyncStatus(controller, config);
+    SyncStatus status = new SyncStatus(controller, config, supplier);
     List<Batch> batchList = new ArrayList<>();
 
     for (long i = 0; i < config.getReplication().getMaxPendingBatchesNum(); i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
index b8ddc97d2d..bccc048467 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryChecker;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngineMode;
@@ -70,6 +71,18 @@ public class IoTDBShutdownHook extends Thread {
           .forEach(id -> DataRegionConsensusImpl.getInstance().triggerSnapshot(id));
     }
 
+    // close consensusImpl
+    try {
+      if (SchemaRegionConsensusImpl.getInstance() != null) {
+        SchemaRegionConsensusImpl.getInstance().stop();
+      }
+      if (DataRegionConsensusImpl.getInstance() != null) {
+        DataRegionConsensusImpl.getInstance().stop();
+      }
+    } catch (Exception e) {
+      logger.error("Stop ConsensusImpl error in IoTDBShutdownHook", e);
+    }
+
     // clear lock file
     DirectoryChecker.getInstance().deregisterAll();
 

Reply via email to