This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch ignore-partial-success
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9249a6a85b9b030142dec711d8ba72b6c2f28488 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Apr 26 18:05:00 2024 +0800 Pipe: handle insert data type mismatch exceptions as idempotent exceptions when partial insert is enabled --- .../db/exception/metadata/DataTypeMismatchException.java | 7 ++++++- .../pipe/receiver/visitor/PipeStatementTSStatusVisitor.java | 12 ++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DataTypeMismatchException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DataTypeMismatchException.java index 4482b647ce5..8165f291765 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DataTypeMismatchException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DataTypeMismatchException.java @@ -24,6 +24,10 @@ import org.apache.iotdb.commons.exception.MetadataException; import org.apache.tsfile.enums.TSDataType; public class DataTypeMismatchException extends MetadataException { + + // NOTICE: DO NOT CHANGE THIS STRING, IT IS USED IN THE ERROR HANDLING OF PIPE + public static final String REGISTERED_TYPE_STRING = "registered type"; + public DataTypeMismatchException( String deviceName, String measurementName, @@ -34,9 +38,10 @@ public class DataTypeMismatchException extends MetadataException { super( String.format( "data type of %s.%s is not consistent, " - + "registered type %s, inserting type %s, timestamp %s, value %s", + + "%s %s, inserting type %s, timestamp %s, value %s", deviceName, measurementName, + REGISTERED_TYPE_STRING, typeInSchema, insertType, time, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java index 7efd7b84473..6cfd503cb38 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java @@ -20,6 +20,9 @@ package org.apache.iotdb.db.pipe.receiver.visitor; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; @@ -45,6 +48,9 @@ import org.apache.iotdb.rpc.TSStatusCode; * the processes that generate the following {@link TSStatus}es in the class. */ public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, TSStatus> { + + private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + @Override public TSStatus visitNode(final StatementNode node, final TSStatus context) { return context; @@ -80,6 +86,12 @@ public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, TSS return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(context.getMessage()); } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { + if (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING) + && config.isEnablePartialInsert()) { + return new TSStatus( + TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(context.getMessage()); + } return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(context.getMessage()); }
