This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 21c2110ef5 [IOTDB-5512] Fixed IoTConsensus may repeatedly send some
log when restarting (#9074)
21c2110ef5 is described below
commit 21c2110ef52c9253d81a889aec35f31d7e24e7b4
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Mon Feb 20 11:51:38 2023 +0800
[IOTDB-5512] Fixed IoTConsensus may repeatedly send some log when
restarting (#9074)
---
.../iot/logdispatcher/IndexController.java | 10 ++++-----
.../consensus/iot/logdispatcher/LogDispatcher.java | 2 +-
.../consensus/iot/logdispatcher/SyncStatus.java | 9 ++++++--
.../iot/logdispatcher/IndexControllerTest.java | 25 +++++++++-------------
.../iot/logdispatcher/SyncStatusTest.java | 10 +++++----
.../apache/iotdb/db/service/IoTDBShutdownHook.java | 13 +++++++++++
6 files changed, 42 insertions(+), 27 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index 116a5f3c96..d69ca33c1d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -70,7 +70,7 @@ public class IndexController {
restore();
}
- public long updateAndGet(long index) {
+ public long updateAndGet(long index, boolean forcePersist) {
try {
lock.writeLock().lock();
long newCurrentIndex = Math.max(currentIndex, index);
@@ -81,7 +81,7 @@ public class IndexController {
prefix,
storageDir);
currentIndex = newCurrentIndex;
- checkPersist();
+ checkPersist(forcePersist);
return currentIndex;
} finally {
lock.writeLock().unlock();
@@ -102,14 +102,14 @@ public class IndexController {
return lastFlushedIndex;
}
- private void checkPersist() {
- if (currentIndex - lastFlushedIndex >= checkpointGap) {
+ private void checkPersist(boolean forcePersist) {
+ if (forcePersist || currentIndex - lastFlushedIndex >= checkpointGap) {
persist();
}
}
private void persist() {
- long flushIndex = currentIndex - currentIndex % checkpointGap;
+ long flushIndex = currentIndex;
File oldFile = new File(storageDir, prefix + lastFlushedIndex);
File newFile = new File(storageDir, prefix + flushIndex);
try {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index b75a4f4810..68a988558e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -217,7 +217,7 @@ public class LogDispatcher {
peer,
initialSyncIndex,
config.getReplication().getCheckpointGap());
- this.syncStatus = new SyncStatus(controller, config);
+ this.syncStatus = new SyncStatus(controller, config, impl::getSearchIndex);
this.walEntryIterator = reader.getReqIterator(START_INDEX);
this.metrics = new LogDispatcherThreadMetrics(this);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index 4c3c008783..de6d0691dc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -25,18 +25,21 @@ import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.function.LongSupplier;
public class SyncStatus {
private final IoTConsensusConfig config;
private final IndexController controller;
+ private final LongSupplier supplier;
private final LinkedList<Batch> pendingBatches = new LinkedList<>();
private final IoTConsensusMemoryManager iotConsensusMemoryManager =
IoTConsensusMemoryManager.getInstance();
- public SyncStatus(IndexController controller, IoTConsensusConfig config) {
+ public SyncStatus(IndexController controller, IoTConsensusConfig config, LongSupplier supplier) {
this.controller = controller;
this.config = config;
+ this.supplier = supplier;
}
/** we may block here if the synchronization pipeline is full. */
@@ -62,7 +65,8 @@ public class SyncStatus {
Iterator<Batch> iterator = pendingBatches.iterator();
Batch current = iterator.next();
while (current.isSynced()) {
- controller.updateAndGet(current.getEndIndex());
+ controller.updateAndGet(
+ current.getEndIndex(), supplier.getAsLong() == current.getEndIndex());
iterator.remove();
iotConsensusMemoryManager.free(current.getSerializedSize(), false);
if (iterator.hasNext()) {
@@ -83,6 +87,7 @@ public class SyncStatus {
size += pendingBatch.getSerializedSize();
}
pendingBatches.clear();
+ controller.updateAndGet(0L, true);
iotConsensusMemoryManager.free(size, false);
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
index 7051f47930..fff348516e 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexControllerTest.java
@@ -60,8 +60,7 @@ public class IndexControllerTest {
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- controller.updateAndGet(CHECK_POINT_GAP - 1);
-
+ controller.updateAndGet(CHECK_POINT_GAP - 1, false);
Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
@@ -69,24 +68,20 @@ public class IndexControllerTest {
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- controller.updateAndGet(CHECK_POINT_GAP + 1);
- Assert.assertEquals(CHECK_POINT_GAP + 1, controller.getCurrentIndex());
- Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
+ controller.updateAndGet(CHECK_POINT_GAP - 1, true);
+ Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
+ Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getLastFlushedIndex());
controller = new IndexController(storageDir.getAbsolutePath(), peer, 0,
CHECK_POINT_GAP);
- Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
- Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
+ Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getCurrentIndex());
+ Assert.assertEquals(CHECK_POINT_GAP - 1, controller.getLastFlushedIndex());
- controller.updateAndGet(CHECK_POINT_GAP * 2 - 1);
- Assert.assertEquals(CHECK_POINT_GAP * 2 - 1, controller.getCurrentIndex());
- Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
+ controller.updateAndGet(CHECK_POINT_GAP * 2, false);
+ Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getCurrentIndex());
+ Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
controller = new IndexController(storageDir.getAbsolutePath(), peer, 0,
CHECK_POINT_GAP);
- Assert.assertEquals(CHECK_POINT_GAP, controller.getCurrentIndex());
- Assert.assertEquals(CHECK_POINT_GAP, controller.getLastFlushedIndex());
-
- controller.updateAndGet(CHECK_POINT_GAP * 2 + 1);
- Assert.assertEquals(CHECK_POINT_GAP * 2 + 1, controller.getCurrentIndex());
+ Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getCurrentIndex());
Assert.assertEquals(CHECK_POINT_GAP * 2, controller.getLastFlushedIndex());
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
index 62be6e1a61..5b98604353 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatusTest.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.function.LongSupplier;
public class SyncStatusTest {
@@ -46,6 +47,7 @@ public class SyncStatusTest {
new Peer(new DataRegionId(1), 2, new TEndPoint("127.0.0.1", 6667));
private static final IoTConsensusConfig config = new
IoTConsensusConfig.Builder().build();
private static final long CHECK_POINT_GAP = 500;
+ private static final LongSupplier supplier = () -> -1L;
@Before
public void setUp() throws IOException {
@@ -64,7 +66,7 @@ public class SyncStatusTest {
new IndexController(storageDir.getAbsolutePath(), peer, 0,
CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
- SyncStatus status = new SyncStatus(controller, config);
+ SyncStatus status = new SyncStatus(controller, config, supplier);
List<Batch> batchList = new ArrayList<>();
for (long i = 0; i < config.getReplication().getMaxPendingBatchesNum();
i++) {
@@ -96,7 +98,7 @@ public class SyncStatusTest {
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- SyncStatus status = new SyncStatus(controller, config);
+ SyncStatus status = new SyncStatus(controller, config, supplier);
List<Batch> batchList = new ArrayList<>();
for (long i = 0; i < config.getReplication().getMaxPendingBatchesNum();
i++) {
@@ -134,7 +136,7 @@ public class SyncStatusTest {
Assert.assertEquals(0, controller.getCurrentIndex());
Assert.assertEquals(0, controller.getLastFlushedIndex());
- SyncStatus status = new SyncStatus(controller, config);
+ SyncStatus status = new SyncStatus(controller, config, supplier);
List<Batch> batchList = new ArrayList<>();
for (long i = 0; i < config.getReplication().getMaxPendingBatchesNum();
i++) {
@@ -184,7 +186,7 @@ public class SyncStatusTest {
new IndexController(storageDir.getAbsolutePath(), peer, 0,
CHECK_POINT_GAP);
Assert.assertEquals(0, controller.getCurrentIndex());
- SyncStatus status = new SyncStatus(controller, config);
+ SyncStatus status = new SyncStatus(controller, config, supplier);
List<Batch> batchList = new ArrayList<>();
for (long i = 0; i < config.getReplication().getMaxPendingBatchesNum();
i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
index b8ddc97d2d..bccc048467 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryChecker;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngineMode;
@@ -70,6 +71,18 @@ public class IoTDBShutdownHook extends Thread {
.forEach(id ->
DataRegionConsensusImpl.getInstance().triggerSnapshot(id));
}
+ // close consensusImpl
+ try {
+ if (SchemaRegionConsensusImpl.getInstance() != null) {
+ SchemaRegionConsensusImpl.getInstance().stop();
+ }
+ if (DataRegionConsensusImpl.getInstance() != null) {
+ DataRegionConsensusImpl.getInstance().stop();
+ }
+ } catch (Exception e) {
+ logger.error("Stop ConsensusImpl error in IoTDBShutdownHook", e);
+ }
+
// clear lock file
DirectoryChecker.getInstance().deregisterAll();