This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch check_iot_consensus_searchIndex
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit dd8322a12bdbcd4391820bc3287331b39fa39c63
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Fri Nov 10 15:48:33 2023 +0800

    fix

    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../consensus/iot/IoTConsensusServerImpl.java | 24 ++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 66b7ccd976a..39a85ee22a2 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -143,11 +143,14 @@ public class IoTConsensusServerImpl {
     this.searchIndex = new AtomicLong(consensusReqReader.getCurrentSearchIndex());
     this.ioTConsensusServerMetrics = new IoTConsensusServerMetrics(this);
     this.logDispatcher = new LogDispatcher(this, clientManager);
+
     // Since the underlying wal does not persist safelyDeletedSearchIndex, IoTConsensus needs to
     // update wal with its syncIndex recovered from the consensus layer when initializing.
     // This prevents wal from being piled up if the safelyDeletedSearchIndex is not updated after
     // the restart and Leader migration occurs
     checkAndUpdateSafeDeletedSearchIndex();
+    // see message in logs for details
+    checkAndUpdateSearchIndex();
   }
 
   public IStateMachine getStateMachine() {
@@ -803,6 +806,27 @@ public class IoTConsensusServerImpl {
     }
   }
 
+  public void checkAndUpdateSearchIndex() {
+    long currentSearchIndex = searchIndex.get();
+    long safelyDeletedSearchIndex = getCurrentSafelyDeletedSearchIndex();
+    if (currentSearchIndex < safelyDeletedSearchIndex) {
+      logger.warn(
+          "The searchIndex({}) of the current region({}) is less than the safelyDeletedSearchIndex({}),"
+              + " which means that the data of the current node has not been flushed by the wal to disk,"
+              + " but has been synchronized to another node, and the different replicas are now inconsistent"
+              + " and cannot be automatically recovered. "
+              + "To prevent subsequent logs from marking smaller searchIndex and exacerbating the inconsistency, "
+              + "we manually set the searchIndex({}) to safelyDeletedSearchIndex({}) here to reduce the impact"
+              + " of this problem in the future",
+          currentSearchIndex,
+          consensusGroupId,
+          safelyDeletedSearchIndex,
+          currentSearchIndex,
+          safelyDeletedSearchIndex);
+      searchIndex.set(safelyDeletedSearchIndex);
+    }
+  }
+
   public TSStatus syncLog(int sourcePeerId, IConsensusRequest request) {
     return cacheQueueMap
         .computeIfAbsent(sourcePeerId, SyncLogCacheQueue::new)
