This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a5f8279d621 Ensuring IoTconsensus data consistency during region
migration (#12438)
a5f8279d621 is described below
commit a5f8279d621a966eb9a92ea972a8e8f5db210370
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Sun Apr 28 19:22:16 2024 +0800
Ensuring IoTconsensus data consistency during region migration (#12438)
* data consistency
* data consistency
---
.../apache/iotdb/consensus/iot/IoTConsensus.java | 2 ++
.../consensus/iot/IoTConsensusServerImpl.java | 25 +++++++++++++++-------
.../consensus/iot/logdispatcher/LogDispatcher.java | 3 +--
3 files changed, 20 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 01d86f851db..5a4832e43b2 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -341,6 +341,8 @@ public class IoTConsensus implements IConsensus {
"[IoTConsensus] failed to cleanup side effects after failed to add
remote peer", mpe);
}
throw new ConsensusException(e);
+ } finally {
+ impl.checkAndUnlockSafeDeletedSearchIndex();
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index e886ad51b4c..34f2de4aec5 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -128,6 +128,7 @@ public class IoTConsensusServerImpl {
private final ScheduledExecutorService backgroundTaskService;
private final IoTConsensusRateLimiter ioTConsensusRateLimiter =
IoTConsensusRateLimiter.getInstance();
+ private volatile long lastPinnedSearchIndexForMigration = -1;
public IoTConsensusServerImpl(
String storageDir,
@@ -514,7 +515,7 @@ public class IoTConsensusServerImpl {
if (peer.equals(thisNode)) {
// use searchIndex for thisNode as the initialSyncIndex because
targetPeer will load the
// snapshot produced by thisNode
- buildSyncLogChannel(targetPeer, searchIndex.get());
+ buildSyncLogChannel(targetPeer, lastPinnedSearchIndexForMigration);
} else {
// use RPC to tell other peers to build sync log channel to target peer
try (SyncIoTConsensusServiceClient client =
@@ -552,14 +553,13 @@ public class IoTConsensusServerImpl {
// The configuration will be modified during iterating because we will add
the targetPeer to
// configuration
ImmutableList<Peer> currentMembers =
ImmutableList.copyOf(this.configuration);
+ removeSyncLogChannel(targetPeer);
for (Peer peer : currentMembers) {
if (peer.equals(targetPeer)) {
// if the targetPeer is the same as current peer, skip it because
removing itself is illegal
continue;
}
- if (peer.equals(thisNode)) {
- removeSyncLogChannel(targetPeer);
- } else {
+ if (!peer.equals(thisNode)) {
// use RPC to tell other peers to build sync log channel to target peer
try (SyncIoTConsensusServiceClient client =
syncClientManager.borrowClient(peer.getEndpoint())) {
@@ -770,7 +770,9 @@ public class IoTConsensusServerImpl {
}
public long getMinFlushedSyncIndex() {
- return logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get);
+ return lastPinnedSearchIndexForMigration == -1
+ ? logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get)
+ : lastPinnedSearchIndexForMigration;
}
public String getStorageDir() {
@@ -887,9 +889,16 @@ public class IoTConsensusServerImpl {
* lost.
*/
public void checkAndLockSafeDeletedSearchIndex() {
- if (configuration.size() == 1) {
- consensusReqReader.setSafelyDeletedSearchIndex(searchIndex.get());
- }
+ lastPinnedSearchIndexForMigration = searchIndex.get();
+ consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
+ }
+
+ /**
+ * We should unlock safelyDeletedSearchIndex after addPeer to avoid
potential data accumulation.
+ */
+ public void checkAndUnlockSafeDeletedSearchIndex() {
+ lastPinnedSearchIndexForMigration = -1;
+ checkAndUpdateSafeDeletedSearchIndex();
}
/**
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index adf8218b46d..d7424f136d7 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -397,8 +397,7 @@ public class LogDispatcher {
// safely.
//
// Use minFlushedSyncIndex here to reserve the WAL which are not flushed
and support kill -9.
- long currentSafelyDeletedSearchIndex = impl.getMinFlushedSyncIndex();
- reader.setSafelyDeletedSearchIndex(currentSafelyDeletedSearchIndex);
+ reader.setSafelyDeletedSearchIndex(impl.getMinFlushedSyncIndex());
// notify
if (impl.unblockWrite()) {
impl.signal();