This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 861d3eec35b Pipe: Fixed the idempotent semantic for MULTIPLE_ERROR 
(#17177)
861d3eec35b is described below

commit 861d3eec35b216b8d3cb97beef7e627d024c58e6
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 11:57:03 2026 +0800

    Pipe: Fixed the idempotent semantic for MULTIPLE_ERROR (#17177)
    
    * idemp
    
    * grass
    
    * fix
    
    * fix
    
    * coverage
    
    * coverage
    
    * fix
---
 .../auto/enhanced/IoTDBPipeIdempotentIT.java       |   2 -
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |   4 +-
 .../visitor/PipeStatementTSStatusVisitor.java      | 219 ++++++++++-----------
 ...leStatementDataTypeConvertExecutionVisitor.java |  12 +-
 ...eeStatementDataTypeConvertExecutionVisitor.java |  10 +-
 .../receiver/PipeStatementTsStatusVisitorTest.java |  65 ++++++
 6 files changed, 180 insertions(+), 132 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
index 53da5d3f498..fb499ae5e43 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
@@ -400,8 +400,6 @@ public class IoTDBPipeIdempotentIT extends 
AbstractPipeDualTreeModelAutoIT {
         Collections.singleton("2,"));
   }
 
-  // Table model
-
   private void testIdempotent(
       final List<String> beforeSqlList,
       final String testSql,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index f0796a3b1a0..f161a249666 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -840,7 +840,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             receiverId.get(),
             statement.getPipeLoggingString(),
             result);
-        return statement.accept(STATEMENT_STATUS_VISITOR, result);
+        return STATEMENT_STATUS_VISITOR.process(statement, result);
       }
     } catch (final Exception e) {
       PipeLogger.log(
@@ -849,7 +849,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           "Receiver id = %s: Exception encountered while executing statement 
%s: ",
           receiverId.get(),
           statement.getPipeLoggingString());
-      return statement.accept(STATEMENT_EXCEPTION_VISITOR, e);
+      return STATEMENT_EXCEPTION_VISITOR.process(statement, e);
     } finally {
       if (Objects.nonNull(allocatedMemoryBlock)) {
         allocatedMemoryBlock.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 7d1b08b1238..f3472e63d71 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
@@ -43,6 +44,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchAct
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import java.util.stream.Collectors;
+
 /**
  * This visitor translated some {@link TSStatus} to pipe related status to 
help sender classify them
  * and apply different error handling tactics. Please DO NOT modify the {@link 
TSStatus} returned by
@@ -53,227 +56,213 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   @Override
-  public TSStatus visitNode(final StatementNode node, final TSStatus context) {
-    return context;
+  public TSStatus process(final StatementNode node, final TSStatus status) {
+    return status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
+        ? PipeReceiverStatusHandler.getPriorStatus(
+            status.getSubStatus().stream()
+                .map(subStatus -> node.accept(this, subStatus))
+                .collect(Collectors.toList()))
+        : node.accept(this, status);
+  }
+
+  @Override
+  public TSStatus visitNode(final StatementNode node, final TSStatus status) {
+    return status;
   }
 
   @Override
   public TSStatus visitLoadFile(
-      final LoadTsFileStatement loadTsFileStatement, final TSStatus context) {
-    if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
-        || context.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
-            && context.getMessage() != null
-            && context.getMessage().contains("memory")) {
+      final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
+    if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
+        || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
+            && status.getMessage() != null
+            && status.getMessage().contains("memory")) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return super.visitLoadFile(loadTsFileStatement, context);
+    return super.visitLoadFile(loadTsFileStatement, status);
   }
 
   @Override
   public TSStatus visitInsertTablet(
-      final InsertTabletStatement insertTabletStatement, final TSStatus 
context) {
-    return visitInsertBase(insertTabletStatement, context);
+      final InsertTabletStatement insertTabletStatement, final TSStatus 
status) {
+    return visitInsertBase(insertTabletStatement, status);
   }
 
   @Override
   public TSStatus visitInsertRow(
-      final InsertRowStatement insertRowStatement, final TSStatus context) {
-    return visitInsertBase(insertRowStatement, context);
+      final InsertRowStatement insertRowStatement, final TSStatus status) {
+    return visitInsertBase(insertRowStatement, status);
   }
 
   @Override
   public TSStatus visitInsertRows(
-      final InsertRowsStatement insertRowsStatement, final TSStatus context) {
-    return visitInsertBase(insertRowsStatement, context);
+      final InsertRowsStatement insertRowsStatement, final TSStatus status) {
+    return visitInsertBase(insertRowsStatement, status);
   }
 
   @Override
   public TSStatus visitInsertMultiTablets(
-      final InsertMultiTabletsStatement insertMultiTabletsStatement, final 
TSStatus context) {
-    return visitInsertBase(insertMultiTabletsStatement, context);
+      final InsertMultiTabletsStatement insertMultiTabletsStatement, final 
TSStatus status) {
+    return visitInsertBase(insertMultiTabletsStatement, status);
   }
 
   @Override
   public TSStatus visitInsertBase(
-      final InsertBaseStatement insertBaseStatement, final TSStatus context) {
+      final InsertBaseStatement insertBaseStatement, final TSStatus status) {
     // If the system is read-only, we shall not classify it into temporary 
unavailable exception to
     // avoid to many logs
-    if (context.getCode() == 
TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+    if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) 
{
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == 
TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
-      if 
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
+      if 
(status.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
           && config.isEnablePartialInsert()) {
         return new TSStatus(
                 
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
+            .setMessage(status.getMessage());
       }
-      if (context.getMessage().contains("does not exist")) {
+      if (status.getMessage().contains("does not exist")) {
         return new TSStatus(
                 
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
+            .setMessage(status.getMessage());
       }
     }
-    return visitStatement(insertBaseStatement, context);
+    return visitStatement(insertBaseStatement, status);
   }
 
   @Override
   public TSStatus visitCreateTimeseries(
-      final CreateTimeSeriesStatement statement, final TSStatus context) {
-    return visitGeneralCreateTimeSeries(statement, context);
+      final CreateTimeSeriesStatement statement, final TSStatus status) {
+    return visitGeneralCreateTimeSeries(statement, status);
   }
 
   @Override
   public TSStatus visitCreateAlignedTimeseries(
-      final CreateAlignedTimeSeriesStatement statement, final TSStatus 
context) {
-    return visitGeneralCreateTimeSeries(statement, context);
+      final CreateAlignedTimeSeriesStatement statement, final TSStatus status) 
{
+    return visitGeneralCreateTimeSeries(statement, status);
   }
 
-  private TSStatus visitGeneralCreateTimeSeries(final Statement statement, 
final TSStatus context) {
-    if (context.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
-        || context.getCode() == 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
+  private TSStatus visitGeneralCreateTimeSeries(final Statement statement, 
final TSStatus status) {
+    if (status.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
+        || status.getCode() == 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
-        || context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == 
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
+        || status.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return visitStatement(statement, context);
+    return visitStatement(statement, status);
   }
 
   @Override
   public TSStatus visitCreateMultiTimeSeries(
-      final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, 
final TSStatus context) {
-    return visitGeneralCreateMultiTimeseries(createMultiTimeSeriesStatement, 
context);
+      final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, 
final TSStatus status) {
+    return visitGeneralCreateMultiTimeSeries(createMultiTimeSeriesStatement, 
status);
   }
 
   @Override
   public TSStatus visitInternalCreateTimeseries(
       final InternalCreateTimeSeriesStatement 
internalCreateTimeSeriesStatement,
-      final TSStatus context) {
-    return 
visitGeneralCreateMultiTimeseries(internalCreateTimeSeriesStatement, context);
+      final TSStatus status) {
+    return 
visitGeneralCreateMultiTimeSeries(internalCreateTimeSeriesStatement, status);
   }
 
   @Override
   public TSStatus visitInternalCreateMultiTimeSeries(
       final InternalCreateMultiTimeSeriesStatement 
internalCreateMultiTimeSeriesStatement,
-      final TSStatus context) {
-    return 
visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement, 
context);
+      final TSStatus status) {
+    return 
visitGeneralCreateMultiTimeSeries(internalCreateMultiTimeSeriesStatement, 
status);
   }
 
-  private TSStatus visitGeneralCreateMultiTimeseries(
-      final Statement statement, final TSStatus context) {
-    if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      for (final TSStatus status : context.getSubStatus()) {
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            && status.getCode() != 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
-            && status.getCode() != 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
-          return visitStatement(statement, context);
-        }
-      }
+  private TSStatus visitGeneralCreateMultiTimeSeries(
+      final Statement statement, final TSStatus status) {
+    if (status.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
+        || status.getCode() == 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return visitStatement(statement, context);
+    return visitStatement(statement, status);
   }
 
   @Override
   public TSStatus visitAlterTimeSeries(
-      final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus 
context) {
-    if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
-      if (context.getMessage().contains("already")) {
+      final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus 
status) {
+    if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
+      if (status.getMessage().contains("already")) {
         return new TSStatus(
                 
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
-      } else if (context.getMessage().contains("does not")) {
+            .setMessage(status.getMessage());
+      } else if (status.getMessage().contains("does not")) {
         return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
+            .setMessage(status.getMessage());
       }
-    } else if (context.getCode() == 
TSStatusCode.PATH_NOT_EXIST.getStatusCode()) {
+    } else if (status.getCode() == 
TSStatusCode.PATH_NOT_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return visitStatement(alterTimeSeriesStatement, context);
+    return visitStatement(alterTimeSeriesStatement, status);
   }
 
   @Override
   public TSStatus visitCreateLogicalView(
-      final CreateLogicalViewStatement createLogicalViewStatement, final 
TSStatus context) {
-    if (context.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      for (final TSStatus status : context.getSubStatus()) {
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            && status.getCode() != 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-          return visitStatement(createLogicalViewStatement, context);
-        }
-      }
+      final CreateLogicalViewStatement createLogicalViewStatement, final 
TSStatus status) {
+    if (status.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return super.visitCreateLogicalView(createLogicalViewStatement, context);
+    return super.visitCreateLogicalView(createLogicalViewStatement, status);
   }
 
   @Override
   public TSStatus visitActivateTemplate(
-      final ActivateTemplateStatement activateTemplateStatement, final 
TSStatus context) {
-    return visitGeneralActivateTemplate(activateTemplateStatement, context);
+      final ActivateTemplateStatement activateTemplateStatement, final 
TSStatus status) {
+    return visitGeneralActivateTemplate(activateTemplateStatement, status);
   }
 
   @Override
   public TSStatus visitBatchActivateTemplate(
-      final BatchActivateTemplateStatement batchActivateTemplateStatement, 
final TSStatus context) {
-    boolean userConflict = false;
-    if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      for (final TSStatus status : context.getSubStatus()) {
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            && status.getCode() != 
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
-          return visitStatement(batchActivateTemplateStatement, context);
-        }
-        if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
-            && context.isSetMessage()
-            && context.getMessage().contains("has not been set any template")) 
{
-          userConflict = true;
-        }
-      }
-      return (userConflict
-              ? new TSStatus(
-                  
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
-              : new TSStatus(
-                  
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()))
-          .setMessage(context.getMessage());
+      final BatchActivateTemplateStatement batchActivateTemplateStatement, 
final TSStatus status) {
+    if (status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(status.getMessage());
+    }
+    if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && status.isSetMessage()
+        && status.getMessage().contains("has not been set any template")) {
+      return new TSStatus(
+              
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(status.getMessage());
     }
-    return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
context);
+    return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
status);
   }
 
   private TSStatus visitGeneralActivateTemplate(
-      final Statement activateTemplateStatement, final TSStatus context) {
-    if (context.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      final Statement activateTemplateStatement, final TSStatus status) {
+    if (status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
-        && context.isSetMessage()
-        && context.getMessage().contains("has not been set any template")) {
+    if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && status.isSetMessage()
+        && status.getMessage().contains("has not been set any template")) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return visitStatement(activateTemplateStatement, context);
+    return visitStatement(activateTemplateStatement, status);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
index d19aa71c234..10d0423e6fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java
@@ -154,9 +154,8 @@ public class 
PipeTableStatementDataTypeConvertExecutionVisitor
           TSStatus result;
           try {
             result =
-                statement.accept(
-                    IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR,
-                    statementExecutor.execute(statement, databaseName));
+                IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process(
+                    statement, statementExecutor.execute(statement, 
databaseName));
 
             // Retry max 5 times if the write process is rejected
             for (int i = 0;
@@ -167,15 +166,14 @@ public class 
PipeTableStatementDataTypeConvertExecutionVisitor
                 i++) {
               Thread.sleep(100L * (i + 1));
               result =
-                  statement.accept(
-                      IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR,
-                      statementExecutor.execute(statement, databaseName));
+                  IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process(
+                      statement, statementExecutor.execute(statement, 
databaseName));
             }
           } catch (final Exception e) {
             if (e instanceof InterruptedException) {
               Thread.currentThread().interrupt();
             }
-            result = 
statement.accept(IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR, e);
+            result = 
IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR.process(statement, e);
           }
 
           if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
index 282b378a2d2..ae60b87450a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
@@ -114,9 +114,8 @@ public class 
PipeTreeStatementDataTypeConvertExecutionVisitor
           TSStatus result;
           try {
             result =
-                statement.accept(
-                    IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR,
-                    statementExecutor.execute(statement));
+                IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process(
+                    statement, statementExecutor.execute(statement));
 
             // Retry max 5 times if the write process is rejected
             for (int i = 0;
@@ -127,9 +126,8 @@ public class 
PipeTreeStatementDataTypeConvertExecutionVisitor
                 i++) {
               Thread.sleep(100L * (i + 1));
               result =
-                  statement.accept(
-                      IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR,
-                      statementExecutor.execute(statement));
+                  IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process(
+                      statement, statementExecutor.execute(statement));
             }
           } catch (final Exception e) {
             if (e instanceof InterruptedException) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
new file mode 100644
index 00000000000..2b20f1d91ef
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+public class PipeStatementTsStatusVisitorTest {
+
+  @Test
+  public void testActivateTemplate() {
+    Assert.assertEquals(
+        
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode(),
+        IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+            .process(
+                new BatchActivateTemplateStatement(Collections.emptyList()),
+                new TSStatus(TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+                    .setSubStatus(
+                        Arrays.asList(
+                            StatusUtils.OK,
+                            new 
TSStatus(TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()))))
+            .getCode());
+  }
+
+  @Test
+  public void testTTLIdempotency() {
+    Assert.assertEquals(
+        
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode(),
+        IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+            .process(
+                new InsertRowsStatement(),
+                new TSStatus(TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+                    .setSubStatus(
+                        Arrays.asList(
+                            StatusUtils.OK, new 
TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode()))))
+            .getCode());
+  }
+}

Reply via email to