This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e5f05105ada Clear status in InsertNodes to fix consensus endless retry (#11819)
e5f05105ada is described below
commit e5f05105ada679854ae99f34df13bffee4e7da79
Author: Haonan <[email protected]>
AuthorDate: Tue Jan 2 17:45:15 2024 +0800
Clear status in InsertNodes to fix consensus endless retry (#11819)
---
.../statemachine/dataregion/DataExecutionVisitor.java | 3 +++
.../planner/plan/node/write/InsertMultiTabletsNode.java | 4 ++++
.../plan/planner/plan/node/write/InsertRowsNode.java | 4 ++++
.../planner/plan/node/write/InsertRowsOfOneDeviceNode.java | 4 ++++
.../iotdb/db/storageengine/dataregion/DataRegion.java | 14 ++++++++++++++
5 files changed, 29 insertions(+)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index f1342d90899..c685a949356 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -129,6 +129,7 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
failedEntry.getValue());
// return WRITE_PROCESS_REJECT directly for the consensus retry logic
if (failedEntry.getValue().getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+ node.clearResults();
return failedEntry.getValue();
}
}
@@ -158,6 +159,7 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
failedEntry.getValue());
// return WRITE_PROCESS_REJECT directly for the consensus retry logic
if (failedEntry.getValue().getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+ node.clearResults();
return failedEntry.getValue();
}
}
@@ -193,6 +195,7 @@ public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
failedEntry.getValue());
// return WRITE_PROCESS_REJECT directly for the consensus retry logic
if (failedEntry.getValue().getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+ node.clearResults();
return failedEntry.getValue();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 69af99b48cf..a6cdd86c599 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -159,6 +159,10 @@ public class InsertMultiTabletsNode extends InsertNode {
return results;
}
+ public void clearResults() {
+ results.clear();
+ }
+
public TSStatus[] getFailingStatus() {
return StatusUtils.getFailingStatus(results, insertTabletNodeList.size());
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 6a506f465a6..63d013154f3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -107,6 +107,10 @@ public class InsertRowsNode extends InsertNode {
return results;
}
+ public void clearResults() {
+ results.clear();
+ }
+
public TSStatus[] getFailingStatus() {
return StatusUtils.getFailingStatus(results, insertRowNodeList.size());
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index b7e7ee8d43e..21d33aceff8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -86,6 +86,10 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
return results;
}
+ public void clearResults() {
+ results.clear();
+ }
+
@Override
public void setSearchIndex(long index) {
searchIndex = index;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 84adee4591d..2819d0a4a00 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3074,6 +3074,20 @@ public class DataRegion implements IDataRegionForQuery {
insertMultiTabletsNode
.getResults()
.put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+ } catch (BatchProcessException e) {
+ // for each error
+ TSStatus firstStatus = null;
+ for (TSStatus status : e.getFailingStatus()) {
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ firstStatus = status;
+ }
+ // return WRITE_PROCESS_REJECT directly for the consensus retry logic
+ if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+ insertMultiTabletsNode.getResults().put(i, status);
+ throw new BatchProcessException("Rejected inserting multi tablets");
+ }
+ }
+ insertMultiTabletsNode.getResults().put(i, firstStatus);
}
}