This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3b4fb7b6d96 [IOTDB-6257] Safely Delete IoT WAL with LastFlushedIndex
To Support Kill -9 (#11614)
3b4fb7b6d96 is described below
commit 3b4fb7b6d96b55b4638ba0e3f7f518df8b5be726
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Tue Nov 28 17:35:09 2023 +0800
[IOTDB-6257] Safely Delete IoT WAL with LastFlushedIndex To Support Kill -9
(#11614)
* persist
* separate sync index and flushed index
* rename
* resolve comments
* resolve comments
* resolve comments
* fix retry log
---
.../consensus/iot/IoTConsensusServerImpl.java | 23 +++++++++--------
.../consensus/iot/IoTConsensusServerMetrics.java | 2 +-
.../consensus/iot/client/DispatchLogHandler.java | 15 +++++++++--
.../iot/logdispatcher/IndexController.java | 2 --
.../consensus/iot/logdispatcher/LogDispatcher.java | 14 ++++++++--
.../service/IoTConsensusRPCServiceProcessor.java | 2 +-
.../apache/iotdb/consensus/iot/ReplicateTest.java | 30 +++++++++-------------
7 files changed, 51 insertions(+), 37 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1f8b50ee2bb..e8f74cd0dfa 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -181,10 +181,7 @@ public class IoTConsensusServerImpl {
ioTConsensusServerMetrics.recordGetStateMachineLockTime(
getStateMachineLockTime - consensusWriteStartTime);
if (needBlockWrite()) {
- logger.info(
- "[Throttle Down] index:{}, safeIndex:{}",
- getSearchIndex(),
- getCurrentSafelyDeletedSearchIndex());
+ logger.info("[Throttle Down] index:{}, safeIndex:{}",
getSearchIndex(), getMinSyncIndex());
try {
boolean timeout =
!stateMachineCondition.await(
@@ -213,7 +210,7 @@ public class IoTConsensusServerImpl {
logger.info(
"DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}",
thisNode.getGroupId(),
- getCurrentSafelyDeletedSearchIndex(),
+ getMinSyncIndex(),
indexedConsensusRequest.getSearchIndex());
}
IConsensusRequest planNode =
stateMachine.deserializeRequest(indexedConsensusRequest);
@@ -560,7 +557,7 @@ public class IoTConsensusServerImpl {
* @throws ConsensusGroupModifyPeerException
*/
public void buildSyncLogChannel(Peer targetPeer) throws
ConsensusGroupModifyPeerException {
- buildSyncLogChannel(targetPeer, getCurrentSafelyDeletedSearchIndex());
+ buildSyncLogChannel(targetPeer, getMinSyncIndex());
}
public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
@@ -674,10 +671,14 @@ public class IoTConsensusServerImpl {
* In the case of multiple copies, the minimum synchronization index is
selected. In the case of
* single copies, the current index is selected
*/
- public long getCurrentSafelyDeletedSearchIndex() {
+ public long getMinSyncIndex() {
return logDispatcher.getMinSyncIndex().orElseGet(searchIndex::get);
}
+ public long getMinFlushedSyncIndex() {
+ return logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get);
+ }
+
public String getStorageDir() {
return storageDir;
}
@@ -695,8 +696,8 @@ public class IoTConsensusServerImpl {
}
public long getSyncLag() {
- long safeIndex = getCurrentSafelyDeletedSearchIndex();
- return getSearchIndex() - safeIndex;
+ long minSyncIndex = getMinSyncIndex();
+ return getSearchIndex() - minSyncIndex;
}
public IoTConsensusConfig getConfig() {
@@ -801,13 +802,13 @@ public class IoTConsensusServerImpl {
if (configuration.size() == 1) {
consensusReqReader.setSafelyDeletedSearchIndex(Long.MAX_VALUE);
} else {
-
consensusReqReader.setSafelyDeletedSearchIndex(getCurrentSafelyDeletedSearchIndex());
+ consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
}
}
public void checkAndUpdateSearchIndex() {
long currentSearchIndex = searchIndex.get();
- long safelyDeletedSearchIndex = getCurrentSafelyDeletedSearchIndex();
+ long safelyDeletedSearchIndex = getMinFlushedSyncIndex();
if (currentSearchIndex < safelyDeletedSearchIndex) {
logger.warn(
"The searchIndex for this region({}) is smaller than the
safelyDeletedSearchIndex when "
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
index 736236f47d5..117f9712635 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerMetrics.java
@@ -92,7 +92,7 @@ public class IoTConsensusServerMetrics implements IMetricSet {
Metric.IOT_CONSENSUS.toString(),
MetricLevel.IMPORTANT,
impl,
- IoTConsensusServerImpl::getCurrentSafelyDeletedSearchIndex,
+ IoTConsensusServerImpl::getMinSyncIndex,
Tag.NAME.toString(),
IMPL,
Tag.REGION.toString(),
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 93d63ea6389..2814c6fb833 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.iot.client;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
import
org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
@@ -30,7 +31,9 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogEntriesRes> {
@@ -55,16 +58,24 @@ public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogEntriesRe
@Override
public void onComplete(TSyncLogEntriesRes response) {
if (response.getStatuses().stream().anyMatch(status ->
needRetry(status.getCode()))) {
+ List<String> retryStatusMessages =
+ response.getStatuses().stream()
+ .filter(status -> needRetry(status.getCode()))
+ .map(TSStatus::getMessage)
+ .collect(Collectors.toList());
+
+ String messages = String.join(", ", retryStatusMessages);
logger.warn(
"Can not send {} to peer {} for {} times because {}",
batch,
thread.getPeer(),
++retryCount,
- response.getStatuses().get(0).getMessage());
+ messages);
sleepCorrespondingTimeAndRetryAsynchronous();
} else {
thread.getSyncStatus().removeBatch(batch);
- // update safely deleted search index after current sync index is
updated by removeBatch
+ // update safely deleted search index after last flushed sync index may
be updated by
+ // removeBatch
thread.updateSafelyDeletedSearchIndex();
}
logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() -
createTime);
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
index b233acb8ea9..61ace504758 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IndexController.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.iot.logdispatcher;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.ratis.utils.Utils;
@@ -97,7 +96,6 @@ public class IndexController {
}
}
- @TestOnly
public long getLastFlushedIndex() {
return lastFlushedIndex;
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 93c931b84b6..4ff4ed86104 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -153,6 +153,10 @@ public class LogDispatcher {
return
threads.stream().mapToLong(LogDispatcherThread::getCurrentSyncIndex).min();
}
+ public synchronized OptionalLong getMinFlushedSyncIndex() {
+ return
threads.stream().mapToLong(LogDispatcherThread::getLastFlushedSyncIndex).min();
+ }
+
public void offer(IndexedConsensusRequest request) {
// we don't need to serialize and offer request when replicaNum is 1.
if (!threads.isEmpty()) {
@@ -233,6 +237,10 @@ public class LogDispatcher {
return controller.getCurrentIndex();
}
+ public long getLastFlushedSyncIndex() {
+ return controller.getLastFlushedIndex();
+ }
+
public Peer getPeer() {
return peer;
}
@@ -343,8 +351,10 @@ public class LogDispatcher {
public void updateSafelyDeletedSearchIndex() {
// update safely deleted search index to delete outdated info,
// indicating that insert nodes whose search index are before this value
can be deleted
- // safely
- long currentSafelyDeletedSearchIndex =
impl.getCurrentSafelyDeletedSearchIndex();
+ // safely.
+ //
+ // Use minFlushedSyncIndex here to reserve the WAL which are not flushed
and support kill -9.
+ long currentSafelyDeletedSearchIndex = impl.getMinFlushedSyncIndex();
reader.setSafelyDeletedSearchIndex(currentSafelyDeletedSearchIndex);
// notify
if (impl.unblockWrite()) {
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 9f795c2f536..2a80ea3281a 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -245,7 +245,7 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
return;
}
long searchIndex = impl.getSearchIndex();
- long safeIndex = impl.getCurrentSafelyDeletedSearchIndex();
+ long safeIndex = impl.getMinSyncIndex();
resultHandler.onComplete(
new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex,
safeIndex));
}
diff --git
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index 03903319cee..0e7eb9e1559 100644
---
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -182,7 +182,7 @@ public class ReplicateTest {
for (int i = 0; i < 3; i++) {
long start = System.currentTimeMillis();
- while (servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex()
< CHECK_POINT_GAP) {
+ while (servers.get(i).getImpl(gid).getMinSyncIndex() < CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
Assert.fail("Unable to replicate entries");
@@ -191,12 +191,9 @@ public class ReplicateTest {
}
}
- Assert.assertEquals(
- CHECK_POINT_GAP,
servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
- Assert.assertEquals(
- CHECK_POINT_GAP,
servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
- Assert.assertEquals(
- CHECK_POINT_GAP,
servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+ Assert.assertEquals(CHECK_POINT_GAP,
servers.get(0).getImpl(gid).getMinSyncIndex());
+ Assert.assertEquals(CHECK_POINT_GAP,
servers.get(1).getImpl(gid).getMinSyncIndex());
+ Assert.assertEquals(CHECK_POINT_GAP,
servers.get(2).getImpl(gid).getMinSyncIndex());
Assert.assertEquals(CHECK_POINT_GAP * 3,
stateMachines.get(0).getRequestSet().size());
Assert.assertEquals(CHECK_POINT_GAP * 3,
stateMachines.get(1).getRequestSet().size());
Assert.assertEquals(CHECK_POINT_GAP * 3,
stateMachines.get(2).getRequestSet().size());
@@ -217,7 +214,7 @@ public class ReplicateTest {
for (int i = 0; i < 3; i++) {
long start = System.currentTimeMillis();
- while
(servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex() <
CHECK_POINT_GAP) {
+ while (servers.get(i).getImpl(gid).getMinSyncIndex() <
CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
Assert.fail("Unable to recover entries");
@@ -226,12 +223,9 @@ public class ReplicateTest {
}
}
- Assert.assertEquals(
- CHECK_POINT_GAP,
servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
- Assert.assertEquals(
- CHECK_POINT_GAP,
servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
- Assert.assertEquals(
- CHECK_POINT_GAP,
servers.get(2).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+ Assert.assertEquals(CHECK_POINT_GAP,
servers.get(0).getImpl(gid).getMinSyncIndex());
+ Assert.assertEquals(CHECK_POINT_GAP,
servers.get(1).getImpl(gid).getMinSyncIndex());
+ Assert.assertEquals(CHECK_POINT_GAP,
servers.get(2).getImpl(gid).getMinSyncIndex());
} catch (IOException e) {
if (e.getCause() instanceof StartupException) {
@@ -265,8 +259,8 @@ public class ReplicateTest {
Assert.assertEquals(i + 1, servers.get(1).getImpl(gid).getSearchIndex());
}
- Assert.assertEquals(0,
servers.get(0).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
- Assert.assertEquals(0,
servers.get(1).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+ Assert.assertEquals(0, servers.get(0).getImpl(gid).getMinSyncIndex());
+ Assert.assertEquals(0, servers.get(1).getImpl(gid).getMinSyncIndex());
try {
stopServer();
@@ -286,10 +280,10 @@ public class ReplicateTest {
long start = System.currentTimeMillis();
// should be [CHECK_POINT_GAP, CHECK_POINT_GAP * 2 - 1] after
// replicating all entries
- while
(servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex() <
CHECK_POINT_GAP) {
+ while (servers.get(i).getImpl(gid).getMinSyncIndex() <
CHECK_POINT_GAP) {
long current = System.currentTimeMillis();
if ((current - start) > 60 * 1000) {
- logger.error("{}",
servers.get(i).getImpl(gid).getCurrentSafelyDeletedSearchIndex());
+ logger.error("{}", servers.get(i).getImpl(gid).getMinSyncIndex());
Assert.fail("Unable to replicate entries");
}
Thread.sleep(100);