This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0f16cab3b08 Pipe: handle insert data type mismatch exceptions as
idempotent exceptions when partial insert is enabled (#12432)
0f16cab3b08 is described below
commit 0f16cab3b088800648d73f869a4cc50fd088cb5a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Apr 26 19:27:28 2024 +0800
Pipe: handle insert data type mismatch exceptions as idempotent exceptions
when partial insert is enabled (#12432)
---
.../db/exception/metadata/DataTypeMismatchException.java | 7 ++++++-
.../pipe/receiver/visitor/PipeStatementTSStatusVisitor.java | 12 ++++++++++++
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DataTypeMismatchException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DataTypeMismatchException.java
index 4482b647ce5..8165f291765 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DataTypeMismatchException.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DataTypeMismatchException.java
@@ -24,6 +24,10 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.tsfile.enums.TSDataType;
public class DataTypeMismatchException extends MetadataException {
+
+ // NOTICE: DO NOT CHANGE THIS STRING, IT IS USED IN THE ERROR HANDLING OF PIPE
+ public static final String REGISTERED_TYPE_STRING = "registered type";
+
public DataTypeMismatchException(
String deviceName,
String measurementName,
@@ -34,9 +38,10 @@ public class DataTypeMismatchException extends MetadataException {
super(
String.format(
"data type of %s.%s is not consistent, "
- + "registered type %s, inserting type %s, timestamp %s, value %s",
+ + "%s %s, inserting type %s, timestamp %s, value %s",
deviceName,
measurementName,
+ REGISTERED_TYPE_STRING,
typeInSchema,
insertType,
time,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 7efd7b84473..6cfd503cb38 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -20,6 +20,9 @@
package org.apache.iotdb.db.pipe.receiver.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -45,6 +48,9 @@ import org.apache.iotdb.rpc.TSStatusCode;
* the processes that generate the following {@link TSStatus}es in the class.
*/
public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, TSStatus> {
+
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
@Override
public TSStatus visitNode(final StatementNode node, final TSStatus context) {
return context;
@@ -80,6 +86,12 @@ public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, TSS
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
+ if (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+ && config.isEnablePartialInsert()) {
+ return new TSStatus(
+ TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ }
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}