This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 554665ef95 [IOTDB-3353] Refactor insert exception processing (#6451)
554665ef95 is described below
commit 554665ef958130a1cf0b91ae2dbfc3eec5df63fc
Author: Mrquan <[email protected]>
AuthorDate: Tue Jun 28 11:00:51 2022 +0800
[IOTDB-3353] Refactor insert exception processing (#6451)
---
.../statemachine/visitor/DataExecutionVisitor.java | 49 +++++++++++++++++++---
.../iotdb/db/engine/storagegroup/DataRegion.java | 14 ++++---
.../iotdb/db/exception/BatchProcessException.java | 5 +++
3 files changed, 56 insertions(+), 12 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
index bbda1bc260..418bb2b5e2 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
@@ -33,13 +33,12 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import org.apache.iotdb.rpc.RpcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.Map;
public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
private static final Logger LOGGER =
LoggerFactory.getLogger(DataExecutionVisitor.class);
@@ -69,7 +68,13 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
LOGGER.error("Error in executing plan node: {}", node, e);
return StatusUtils.EXECUTE_STATEMENT_ERROR;
} catch (BatchProcessException e) {
- return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+ LOGGER.warn(
+ "Batch failure in executing a InsertTabletNode. device: {},
startTime: {}, measurements: {}, failing status: {}",
+ node.getDevicePath(),
+ node.getTimes()[0],
+ node.getMeasurements(),
+ e.getFailingStatus());
+ return StatusUtils.EXECUTE_STATEMENT_ERROR;
}
}
@@ -79,7 +84,18 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
dataRegion.insert(node);
return StatusUtils.OK;
} catch (BatchProcessException e) {
- return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+ LOGGER.warn("Batch failure in executing a InsertRowsNode.");
+ // for each error
+ for (Map.Entry<Integer, TSStatus> failedEntry :
node.getResults().entrySet()) {
+ InsertRowNode insertRowNode =
node.getInsertRowNodeList().get(failedEntry.getKey());
+ LOGGER.warn(
+ "Insert row failed. device: {}, time: {}, measurements: {},
failing status: {}",
+ insertRowNode.getDevicePath(),
+ insertRowNode.getTime(),
+ insertRowNode.getMeasurements(),
+ failedEntry.getValue());
+ }
+ return StatusUtils.EXECUTE_STATEMENT_ERROR;
}
}
@@ -89,7 +105,18 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
dataRegion.insertTablets(node);
return StatusUtils.OK;
} catch (BatchProcessException e) {
- return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+ LOGGER.warn("Batch failure in executing a InsertMultiTabletsNode.");
+ for (Map.Entry<Integer, TSStatus> failedEntry :
node.getResults().entrySet()) {
+ InsertTabletNode insertTabletNode =
+ node.getInsertTabletNodeList().get(failedEntry.getKey());
+ LOGGER.warn(
+ "Insert tablet failed. device: {}, startTime: {}, measurements:
{}, failing status: {}",
+ insertTabletNode.getDevicePath(),
+ insertTabletNode.getTimes()[0],
+ insertTabletNode.getMeasurements(),
+ failedEntry.getValue());
+ }
+ return StatusUtils.EXECUTE_STATEMENT_ERROR;
}
}
@@ -103,7 +130,17 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
LOGGER.error("Error in executing plan node: {}", node, e);
return StatusUtils.EXECUTE_STATEMENT_ERROR;
} catch (BatchProcessException e) {
- return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+ LOGGER.warn("Batch failure in executing a InsertRowsOfOneDeviceNode.");
+ for (Map.Entry<Integer, TSStatus> failedEntry :
node.getResults().entrySet()) {
+ InsertRowNode insertRowNode =
node.getInsertRowNodeList().get(failedEntry.getKey());
+ LOGGER.warn(
+ "Insert row failed. device: {}, time: {}, measurements: {},
failing status: {}",
+ insertRowNode.getDevicePath(),
+ insertRowNode.getTime(),
+ insertRowNode.getMeasurements(),
+ failedEntry.getValue());
+ }
+ return StatusUtils.EXECUTE_STATEMENT_ERROR;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 4fb701ee0b..3514b38a8c 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1053,7 +1053,7 @@ public class DataRegion {
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
public void insertTablet(InsertTabletNode insertTabletNode)
- throws BatchProcessException, TriggerExecutionException,
WriteProcessException {
+ throws TriggerExecutionException, BatchProcessException,
WriteProcessException {
writeLock("insertTablet");
try {
@@ -1081,7 +1081,9 @@ public class DataRegion {
}
// loc pointing at first legal position
if (loc == insertTabletNode.getRowCount()) {
- throw new BatchProcessException(results);
+ throw new OutOfTTLException(
+ insertTabletNode.getTimes()[insertTabletNode.getTimes().length -
1],
+ (System.currentTimeMillis() - dataTTL));
}
// TODO(Trigger)// fire trigger before insertion
@@ -3493,7 +3495,7 @@ public class DataRegion {
writeUnlock();
}
if (!insertRowsOfOneDeviceNode.getResults().isEmpty()) {
- throw new
BatchProcessException(insertRowsOfOneDeviceNode.getFailingStatus());
+ throw new BatchProcessException("Partial failed inserting rows of one
device");
}
}
@@ -3513,7 +3515,7 @@ public class DataRegion {
}
if (!insertRowsNode.getResults().isEmpty()) {
- throw new BatchProcessException(insertRowsNode.getFailingStatus());
+ throw new BatchProcessException("Partial failed inserting rows");
}
}
@@ -3528,7 +3530,7 @@ public class DataRegion {
InsertTabletNode insertTabletNode =
insertMultiTabletsNode.getInsertTabletNodeList().get(i);
try {
insertTablet(insertTabletNode);
- } catch (TriggerExecutionException | BatchProcessException |
WriteProcessException e) {
+ } catch (TriggerExecutionException | WriteProcessException |
BatchProcessException e) {
insertMultiTabletsNode
.getResults()
.put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
@@ -3536,7 +3538,7 @@ public class DataRegion {
}
if (!insertMultiTabletsNode.getResults().isEmpty()) {
- throw new
BatchProcessException(insertMultiTabletsNode.getFailingStatus());
+ throw new BatchProcessException("Partial failed inserting multi
tablets");
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/exception/BatchProcessException.java
b/server/src/main/java/org/apache/iotdb/db/exception/BatchProcessException.java
index ee617f5a23..a0442e53e2 100644
---
a/server/src/main/java/org/apache/iotdb/db/exception/BatchProcessException.java
+++
b/server/src/main/java/org/apache/iotdb/db/exception/BatchProcessException.java
@@ -33,6 +33,11 @@ public class BatchProcessException extends
QueryProcessException {
this.failingStatus = failingStatus;
}
+ public BatchProcessException(String message) {
+ super(message);
+ failingStatus = null;
+ }
+
public TSStatus[] getFailingStatus() {
return failingStatus;
}