This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a287355c46e [IOTDB-5706] Data inconsistency between IoT protocol 
replications (#11748)
a287355c46e is described below

commit a287355c46ed951803b9b97a5d6ece008ce66ebc
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Dec 20 19:25:57 2023 +0800

    [IOTDB-5706] Data inconsistency between IoT protocol replications (#11748)
---
 .../dataregion/DataRegionStateMachine.java         | 39 +++++++++++++++++++++-
 1 file changed, 38 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 3feed8a9bf8..88c5307ab44 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -65,6 +65,10 @@ public class DataRegionStateMachine extends BaseStateMachine 
{
 
   protected DataRegion region; // region that write plans are executed against
 
+  private static final int MAX_WRITE_RETRY_TIMES = 5; // max attempts per write(PlanNode) call
+
+  private static final long WRITE_RETRY_WAIT_TIME_IN_MS = 1000; // sleep before retrying a write
+
   /**
    * Creates a state machine bound to the given data region.
    *
    * @param region the region stored in {@link #region} and used by this state machine
    */
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
   }
@@ -237,7 +241,32 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
   }
 
   protected TSStatus write(PlanNode planNode) {
-    return planNode.accept(new DataExecutionVisitor(), region);
+    // Executes the plan against the region and retries while the returned status code is
+    // retryable (see needRetry). The retry exists to avoid data inconsistency between
+    // replicas (IOTDB-5706). At most MAX_WRITE_RETRY_TIMES attempts are made, with a pause of
+    // WRITE_RETRY_WAIT_TIME_IN_MS between consecutive attempts; the status of the last attempt
+    // is returned.
+    TSStatus result = null;
+    int retryTime = 0;
+    while (retryTime < MAX_WRITE_RETRY_TIMES) {
+      result = planNode.accept(new DataExecutionVisitor(), region);
+      if (!needRetry(result.getCode())) {
+        break;
+      }
+      retryTime++;
+      logger.debug(
+          "write operation failed because {}, retryTime: {}.", result.getCode(), retryTime);
+      if (retryTime == MAX_WRITE_RETRY_TIMES) {
+        logger.error(
+            "write operation still failed after {} retry times, because {}.",
+            MAX_WRITE_RETRY_TIMES,
+            result.getCode());
+        // No further attempt follows, so sleeping here would only delay the caller.
+        break;
+      }
+      try {
+        Thread.sleep(WRITE_RETRY_WAIT_TIME_IN_MS);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag and stop retrying: with the flag set, every later sleep
+        // would throw immediately and the loop would spin through the retries with no back-off.
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+    return result;
   }
 
   @Override
@@ -271,4 +300,12 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
       return null;
     }
   }
+
+  /**
+   * Tells whether a write that returned the given status code should be attempted again.
+   *
+   * @param statusCode status code of a write result
+   * @return {@code true} if the code is that of {@code WRITE_PROCESS_REJECT} or {@code
+   *     WRITE_PROCESS_ERROR}, {@code false} otherwise
+   */
+  public static boolean needRetry(int statusCode) {
+    // To fix the atomicity problem, only a rejected or failed write process is retried here.
+    // Other cases, such as readonly, are returned directly because the consensus layer
+    // already retries them.
+    if (statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+      return true;
+    }
+    return statusCode == TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode();
+  }
 }

Reply via email to