This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f16cab3b08 Pipe: handle insert data type mismatch exceptions as idempotent exceptions when partial insert is enabled (#12432)
0f16cab3b08 is described below

commit 0f16cab3b088800648d73f869a4cc50fd088cb5a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Apr 26 19:27:28 2024 +0800

    Pipe: handle insert data type mismatch exceptions as idempotent exceptions when partial insert is enabled (#12432)
---
 .../db/exception/metadata/DataTypeMismatchException.java     |  7 ++++++-
 .../pipe/receiver/visitor/PipeStatementTSStatusVisitor.java  | 12 ++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DataTypeMismatchException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DataTypeMismatchException.java
index 4482b647ce5..8165f291765 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DataTypeMismatchException.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DataTypeMismatchException.java
@@ -24,6 +24,10 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.tsfile.enums.TSDataType;
 
 public class DataTypeMismatchException extends MetadataException {
+
+  // NOTICE: DO NOT CHANGE THIS STRING, IT IS USED IN THE ERROR HANDLING OF PIPE
+  public static final String REGISTERED_TYPE_STRING = "registered type";
+
   public DataTypeMismatchException(
       String deviceName,
       String measurementName,
@@ -34,9 +38,10 @@ public class DataTypeMismatchException extends MetadataException {
     super(
         String.format(
             "data type of %s.%s is not consistent, "
-                + "registered type %s, inserting type %s, timestamp %s, value %s",
+                + "%s %s, inserting type %s, timestamp %s, value %s",
             deviceName,
             measurementName,
+            REGISTERED_TYPE_STRING,
             typeInSchema,
             insertType,
             time,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 7efd7b84473..6cfd503cb38 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -45,6 +48,9 @@ import org.apache.iotdb.rpc.TSStatusCode;
  * the processes that generate the following {@link TSStatus}es in the class.
  */
 public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, TSStatus> {
+
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
   @Override
   public TSStatus visitNode(final StatementNode node, final TSStatus context) {
     return context;
@@ -80,6 +86,12 @@ public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, TSS
       return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
+      if (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+          && config.isEnablePartialInsert()) {
+        return new TSStatus(
+                TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+            .setMessage(context.getMessage());
+      }
       return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
     }

Reply via email to