This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e5f05105ada Clear status in InsertNodes to fix consensus endless retry 
(#11819)
e5f05105ada is described below

commit e5f05105ada679854ae99f34df13bffee4e7da79
Author: Haonan <[email protected]>
AuthorDate: Tue Jan 2 17:45:15 2024 +0800

    Clear status in InsertNodes to fix consensus endless retry (#11819)
---
 .../statemachine/dataregion/DataExecutionVisitor.java      |  3 +++
 .../planner/plan/node/write/InsertMultiTabletsNode.java    |  4 ++++
 .../plan/planner/plan/node/write/InsertRowsNode.java       |  4 ++++
 .../planner/plan/node/write/InsertRowsOfOneDeviceNode.java |  4 ++++
 .../iotdb/db/storageengine/dataregion/DataRegion.java      | 14 ++++++++++++++
 5 files changed, 29 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index f1342d90899..c685a949356 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -129,6 +129,7 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
             failedEntry.getValue());
         // return WRITE_PROCESS_REJECT directly for the consensus retry logic
         if (failedEntry.getValue().getCode() == 
TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+          node.clearResults();
           return failedEntry.getValue();
         }
       }
@@ -158,6 +159,7 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
             failedEntry.getValue());
         // return WRITE_PROCESS_REJECT directly for the consensus retry logic
         if (failedEntry.getValue().getCode() == 
TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+          node.clearResults();
           return failedEntry.getValue();
         }
       }
@@ -193,6 +195,7 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
             failedEntry.getValue());
         // return WRITE_PROCESS_REJECT directly for the consensus retry logic
         if (failedEntry.getValue().getCode() == 
TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+          node.clearResults();
           return failedEntry.getValue();
         }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 69af99b48cf..a6cdd86c599 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -159,6 +159,10 @@ public class InsertMultiTabletsNode extends InsertNode {
     return results;
   }
 
+  public void clearResults() {
+    results.clear();
+  }
+
   public TSStatus[] getFailingStatus() {
     return StatusUtils.getFailingStatus(results, insertTabletNodeList.size());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 6a506f465a6..63d013154f3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -107,6 +107,10 @@ public class InsertRowsNode extends InsertNode {
     return results;
   }
 
+  public void clearResults() {
+    results.clear();
+  }
+
   public TSStatus[] getFailingStatus() {
     return StatusUtils.getFailingStatus(results, insertRowNodeList.size());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index b7e7ee8d43e..21d33aceff8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -86,6 +86,10 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
     return results;
   }
 
+  public void clearResults() {
+    results.clear();
+  }
+
   @Override
   public void setSearchIndex(long index) {
     searchIndex = index;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 84adee4591d..2819d0a4a00 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3074,6 +3074,20 @@ public class DataRegion implements IDataRegionForQuery {
         insertMultiTabletsNode
             .getResults()
             .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+      } catch (BatchProcessException e) {
+        // for each error
+        TSStatus firstStatus = null;
+        for (TSStatus status : e.getFailingStatus()) {
+          if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) 
{
+            firstStatus = status;
+          }
+          // return WRITE_PROCESS_REJECT directly for the consensus retry logic
+          if (status.getCode() == 
TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+            insertMultiTabletsNode.getResults().put(i, status);
+            throw new BatchProcessException("Rejected inserting multi 
tablets");
+          }
+        }
+        insertMultiTabletsNode.getResults().put(i, firstStatus);
       }
     }
 

Reply via email to