This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a287355c46e [IOTDB-5706] Data inconsistency between IoT protocol
replications (#11748)
a287355c46e is described below
commit a287355c46ed951803b9b97a5d6ece008ce66ebc
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Dec 20 19:25:57 2023 +0800
[IOTDB-5706] Data inconsistency between IoT protocol replications (#11748)
---
.../dataregion/DataRegionStateMachine.java | 39 +++++++++++++++++++++-
1 file changed, 38 insertions(+), 1 deletion(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 3feed8a9bf8..88c5307ab44 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -65,6 +65,10 @@ public class DataRegionStateMachine extends BaseStateMachine {
protected DataRegion region;
+ private static final int MAX_WRITE_RETRY_TIMES = 5; // upper bound on attempts made by write()
+
+ private static final long WRITE_RETRY_WAIT_TIME_IN_MS = 1000; // pause after a failed attempt
+
public DataRegionStateMachine(DataRegion region) {
this.region = region;
}
@@ -237,7 +241,32 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
protected TSStatus write(PlanNode planNode) {
-   return planNode.accept(new DataExecutionVisitor(), region);
+   // Retry transient write failures (see needRetry) so that the replicas stay consistent;
+   // at most MAX_WRITE_RETRY_TIMES attempts are made and the last status is returned.
+   TSStatus result = null;
+   for (int retryTime = 1; retryTime <= MAX_WRITE_RETRY_TIMES; retryTime++) {
+     result = planNode.accept(new DataExecutionVisitor(), region);
+     if (!needRetry(result.getCode())) {
+       break;
+     }
+     logger.debug(
+         "write operation failed because {}, retryTime: {}.", result.getCode(), retryTime);
+     if (retryTime == MAX_WRITE_RETRY_TIMES) {
+       logger.error(
+           "write operation still failed after {} retry times, because {}.",
+           MAX_WRITE_RETRY_TIMES,
+           result.getCode());
+       break; // no attempt follows, so waiting would only delay the caller
+     }
+     try {
+       Thread.sleep(WRITE_RETRY_WAIT_TIME_IN_MS);
+     } catch (InterruptedException e) {
+       // Restore the flag and give up: while interrupted, every further sleep fails at once.
+       Thread.currentThread().interrupt();
+       break;
+     }
+   }
+   return result;
}
@Override
@@ -271,4 +300,12 @@ public class DataRegionStateMachine extends BaseStateMachine {
return null;
}
}
+
+ // Only write-process reject and write-process error are retried here; other failures, such
+ // as readonly, are returned directly because there are retries at the consensus layer.
+ public static boolean needRetry(int statusCode) {
+   boolean rejected = statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode();
+   boolean failed = statusCode == TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode();
+   return rejected || failed;
+ }
}