This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch iotconsensus_retry_corrupt_data in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 360d7bd73a9a0f0fdbcd72e5861b9df8288baea6 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Thu Jan 18 19:27:31 2024 +0800 finish Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../consensus/iot/logdispatcher/LogDispatcher.java | 24 +++++++++++++--------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index 00d136695f4..ba869d7c596 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -424,8 +424,9 @@ public class LogDispatcher { // Prevents gap between logs. For example, some requests are not written into the queue when // the queue is full. In this case, requests need to be loaded from the WAL if (startIndex != prev.getSearchIndex()) { - constructBatchFromWAL(startIndex, prev.getSearchIndex(), batches); - if (!batches.canAccumulate()) { + boolean hasCorruptedData = + constructBatchFromWAL(startIndex, prev.getSearchIndex(), batches); + if (hasCorruptedData || !batches.canAccumulate()) { batches.buildIndex(); logger.debug( "{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batches); @@ -448,8 +449,9 @@ public class LogDispatcher { // Prevents gap between logs. For example, some logs are not written into the queue when // the queue is full. In this case, requests need to be loaded from the WAL if (current.getSearchIndex() != prev.getSearchIndex() + 1) { - constructBatchFromWAL(prev.getSearchIndex() + 1, current.getSearchIndex(), batches); - if (!batches.canAccumulate()) { + boolean hasCorruptedData = + constructBatchFromWAL(prev.getSearchIndex() + 1, current.getSearchIndex(), batches); + if (hasCorruptedData || !batches.canAccumulate()) { batches.buildIndex(); logger.debug( "gap {} : accumulated a {} from queue and wal when gap", @@ -495,13 +497,14 @@ public class LogDispatcher { return syncStatus; } - private void constructBatchFromWAL(long currentIndex, long maxIndex, Batch logBatches) { + private boolean constructBatchFromWAL(long currentIndex, long maxIndex, Batch logBatches) { logger.debug( "DataRegion[{}]->{}: currentIndex: {}, maxIndex: {}", peer.getGroupId().getId(), peer.getEndpoint().getIp(), currentIndex, maxIndex); + boolean hasCorruptedData = false; // targetIndex is the index of request that we need to find long targetIndex = currentIndex; // Even if there is no WAL files, these code won't produce error. @@ -524,13 +527,11 @@ public class LogDispatcher { continue; } else if (data.getSearchIndex() > targetIndex) { logger.warn( - "search for one Entry which index is {}, but find a larger one, index : {}", + "search for one Entry which index is {}, but find a larger one, index : {}." + + "Perhaps the wal file is corrupted, in which case we skip it and choose a larger index to replicate", targetIndex, data.getSearchIndex()); - if (data.getSearchIndex() >= maxIndex) { - // if the index of request is larger than maxIndex, then finish - break; - } + hasCorruptedData = true; } targetIndex = data.getSearchIndex() + 1; data.buildSerializedRequests(); @@ -538,6 +539,9 @@ public class LogDispatcher { logBatches.addTLogEntry( new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(), true)); } + // In the case of corrupt Data, we return true so that we can send a batch as soon as + // possible, avoiding potential duplication + return hasCorruptedData; } private void constructBatchIndexedFromConsensusRequest(
