This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch iotconsensus_retry_corrupt_data
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 360d7bd73a9a0f0fdbcd72e5861b9df8288baea6
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Thu Jan 18 19:27:31 2024 +0800

    finish
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../consensus/iot/logdispatcher/LogDispatcher.java | 24 +++++++++++++---------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 00d136695f4..ba869d7c596 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -424,8 +424,9 @@ public class LogDispatcher {
         // Prevents gap between logs. For example, some requests are not 
written into the queue when
         // the queue is full. In this case, requests need to be loaded from 
the WAL
         if (startIndex != prev.getSearchIndex()) {
-          constructBatchFromWAL(startIndex, prev.getSearchIndex(), batches);
-          if (!batches.canAccumulate()) {
+          boolean hasCorruptedData =
+              constructBatchFromWAL(startIndex, prev.getSearchIndex(), 
batches);
+          if (hasCorruptedData || !batches.canAccumulate()) {
             batches.buildIndex();
             logger.debug(
                 "{} : accumulated a {} from wal", 
impl.getThisNode().getGroupId(), batches);
@@ -448,8 +449,9 @@ public class LogDispatcher {
           // Prevents gap between logs. For example, some logs are not written 
into the queue when
           // the queue is full. In this case, requests need to be loaded from 
the WAL
           if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
-            constructBatchFromWAL(prev.getSearchIndex() + 1, 
current.getSearchIndex(), batches);
-            if (!batches.canAccumulate()) {
+            boolean hasCorruptedData =
+                constructBatchFromWAL(prev.getSearchIndex() + 1, 
current.getSearchIndex(), batches);
+            if (hasCorruptedData || !batches.canAccumulate()) {
               batches.buildIndex();
               logger.debug(
                   "gap {} : accumulated a {} from queue and wal when gap",
@@ -495,13 +497,14 @@ public class LogDispatcher {
       return syncStatus;
     }
 
-    private void constructBatchFromWAL(long currentIndex, long maxIndex, Batch 
logBatches) {
+    private boolean constructBatchFromWAL(long currentIndex, long maxIndex, 
Batch logBatches) {
       logger.debug(
           "DataRegion[{}]->{}: currentIndex: {}, maxIndex: {}",
           peer.getGroupId().getId(),
           peer.getEndpoint().getIp(),
           currentIndex,
           maxIndex);
+      boolean hasCorruptedData = false;
       // targetIndex is the index of request that we need to find
       long targetIndex = currentIndex;
       // Even if there is no WAL files, these code won't produce error.
@@ -524,13 +527,11 @@ public class LogDispatcher {
           continue;
         } else if (data.getSearchIndex() > targetIndex) {
           logger.warn(
-              "search for one Entry which index is {}, but find a larger one, 
index : {}",
+              "search for one Entry which index is {}, but find a larger one, 
index : {}."
+                  + "Perhaps the wal file is corrupted, in which case we skip 
it and choose a larger index to replicate",
               targetIndex,
               data.getSearchIndex());
-          if (data.getSearchIndex() >= maxIndex) {
-            // if the index of request is larger than maxIndex, then finish
-            break;
-          }
+          hasCorruptedData = true;
         }
         targetIndex = data.getSearchIndex() + 1;
         data.buildSerializedRequests();
@@ -538,6 +539,9 @@ public class LogDispatcher {
         logBatches.addTLogEntry(
             new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(), 
true));
       }
+      // In the case of corrupted data, we return true so that we can send a 
batch as soon as
+      // possible, avoiding potential duplication
+      return hasCorruptedData;
     }
 
     private void constructBatchIndexedFromConsensusRequest(

Reply via email to