This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 359787d929f Pipe: Fixed the idempotent semantic for MULTIPLE_ERROR 
(#17177) (#17185)
359787d929f is described below

commit 359787d929fe2f5677f1cca836f9094f7a7c3cc0
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 17:55:39 2026 +0800

    Pipe: Fixed the idempotent semantic for MULTIPLE_ERROR (#17177) (#17185)
    
    * idemp
    
    * grass
    
    * fix
    
    * fix
    
    * coverage
    
    * coverage
    
    * fix
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |   4 +-
 ...peStatementDataTypeConvertExecutionVisitor.java |  10 +-
 .../visitor/PipeStatementTSStatusVisitor.java      | 219 ++++++++++-----------
 .../receiver/PipeStatementTsStatusVisitorTest.java |  65 ++++++
 4 files changed, 175 insertions(+), 123 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 2bf0b42f0a2..334c3530414 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -702,7 +702,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             receiverId.get(),
             statement.getPipeLoggingString(),
             result);
-        return statement.accept(STATEMENT_STATUS_VISITOR, result);
+        return STATEMENT_STATUS_VISITOR.process(statement, result);
       }
     } catch (final Exception e) {
       PipeLogger.log(
@@ -711,7 +711,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
           "Receiver id = %s: Exception encountered while executing statement 
%s: ",
           receiverId.get(),
           statement.getPipeLoggingString());
-      return statement.accept(STATEMENT_EXCEPTION_VISITOR, e);
+      return STATEMENT_EXCEPTION_VISITOR.process(statement, e);
     } finally {
       if (Objects.nonNull(allocatedMemoryBlock)) {
         allocatedMemoryBlock.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index 2b7b2c2ffd6..7443879bec0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -113,9 +113,8 @@ public class PipeStatementDataTypeConvertExecutionVisitor
           TSStatus result;
           try {
             result =
-                statement.accept(
-                    IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR,
-                    statementExecutor.execute(statement));
+                IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process(
+                    statement, statementExecutor.execute(statement));
 
             // Retry max 5 times if the write process is rejected
             for (int i = 0;
@@ -126,9 +125,8 @@ public class PipeStatementDataTypeConvertExecutionVisitor
                 i++) {
               Thread.sleep(100L * (i + 1));
               result =
-                  statement.accept(
-                      IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR,
-                      statementExecutor.execute(statement));
+                  IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process(
+                      statement, statementExecutor.execute(statement));
             }
           } catch (final Exception e) {
             if (e instanceof InterruptedException) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 84e3c9875ed..5210b98607f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
@@ -43,6 +44,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchAct
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import java.util.stream.Collectors;
+
 /**
  * This visitor translated some {@link TSStatus} to pipe related status to 
help sender classify them
  * and apply different error handling tactics. Please DO NOT modify the {@link 
TSStatus} returned by
@@ -53,226 +56,212 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   @Override
-  public TSStatus visitNode(final StatementNode node, final TSStatus context) {
-    return context;
+  public TSStatus process(final StatementNode node, final TSStatus status) {
+    return status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
+        ? PipeReceiverStatusHandler.getPriorStatus(
+            status.getSubStatus().stream()
+                .map(subStatus -> node.accept(this, subStatus))
+                .collect(Collectors.toList()))
+        : node.accept(this, status);
+  }
+
+  @Override
+  public TSStatus visitNode(final StatementNode node, final TSStatus status) {
+    return status;
   }
 
   @Override
   public TSStatus visitLoadFile(
-      final LoadTsFileStatement loadTsFileStatement, final TSStatus context) {
-    if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
-        || context.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
-            && context.getMessage() != null
-            && context.getMessage().contains("memory")) {
+      final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
+    if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
+        || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
+            && status.getMessage() != null
+            && status.getMessage().contains("memory")) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return super.visitLoadFile(loadTsFileStatement, context);
+    return super.visitLoadFile(loadTsFileStatement, status);
   }
 
   @Override
   public TSStatus visitInsertTablet(
-      final InsertTabletStatement insertTabletStatement, final TSStatus 
context) {
-    return visitInsertBase(insertTabletStatement, context);
+      final InsertTabletStatement insertTabletStatement, final TSStatus 
status) {
+    return visitInsertBase(insertTabletStatement, status);
   }
 
   @Override
   public TSStatus visitInsertRow(
-      final InsertRowStatement insertRowStatement, final TSStatus context) {
-    return visitInsertBase(insertRowStatement, context);
+      final InsertRowStatement insertRowStatement, final TSStatus status) {
+    return visitInsertBase(insertRowStatement, status);
   }
 
   @Override
   public TSStatus visitInsertRows(
-      final InsertRowsStatement insertRowsStatement, final TSStatus context) {
-    return visitInsertBase(insertRowsStatement, context);
+      final InsertRowsStatement insertRowsStatement, final TSStatus status) {
+    return visitInsertBase(insertRowsStatement, status);
   }
 
   @Override
   public TSStatus visitInsertMultiTablets(
-      final InsertMultiTabletsStatement insertMultiTabletsStatement, final 
TSStatus context) {
-    return visitInsertBase(insertMultiTabletsStatement, context);
+      final InsertMultiTabletsStatement insertMultiTabletsStatement, final 
TSStatus status) {
+    return visitInsertBase(insertMultiTabletsStatement, status);
   }
 
   private TSStatus visitInsertBase(
-      final InsertBaseStatement insertBaseStatement, final TSStatus context) {
+      final InsertBaseStatement insertBaseStatement, final TSStatus status) {
     // If the system is read-only, we shall not classify it into temporary 
unavailable exception to
     // avoid to many logs
-    if (context.getCode() == 
TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+    if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) 
{
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == 
TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
-      if 
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
+      if 
(status.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
           && config.isEnablePartialInsert()) {
         return new TSStatus(
                 
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
+            .setMessage(status.getMessage());
       }
-      if (context.getMessage().contains("does not exist")) {
+      if (status.getMessage().contains("does not exist")) {
         return new TSStatus(
                 
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
+            .setMessage(status.getMessage());
       }
     }
-    return visitStatement(insertBaseStatement, context);
+    return visitStatement(insertBaseStatement, status);
   }
 
   @Override
   public TSStatus visitCreateTimeseries(
-      final CreateTimeSeriesStatement statement, final TSStatus context) {
-    return visitGeneralCreateTimeSeries(statement, context);
+      final CreateTimeSeriesStatement statement, final TSStatus status) {
+    return visitGeneralCreateTimeSeries(statement, status);
   }
 
   @Override
   public TSStatus visitCreateAlignedTimeseries(
-      final CreateAlignedTimeSeriesStatement statement, final TSStatus 
context) {
-    return visitGeneralCreateTimeSeries(statement, context);
+      final CreateAlignedTimeSeriesStatement statement, final TSStatus status) 
{
+    return visitGeneralCreateTimeSeries(statement, status);
   }
 
-  private TSStatus visitGeneralCreateTimeSeries(final Statement statement, 
final TSStatus context) {
-    if (context.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
-        || context.getCode() == 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
+  private TSStatus visitGeneralCreateTimeSeries(final Statement statement, 
final TSStatus status) {
+    if (status.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
+        || status.getCode() == 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
-        || context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == 
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
+        || status.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return visitStatement(statement, context);
+    return visitStatement(statement, status);
   }
 
   @Override
   public TSStatus visitCreateMultiTimeSeries(
-      final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, 
final TSStatus context) {
-    return visitGeneralCreateMultiTimeseries(createMultiTimeSeriesStatement, 
context);
+      final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, 
final TSStatus status) {
+    return visitGeneralCreateMultiTimeSeries(createMultiTimeSeriesStatement, 
status);
   }
 
   @Override
   public TSStatus visitInternalCreateTimeseries(
       final InternalCreateTimeSeriesStatement 
internalCreateTimeSeriesStatement,
-      final TSStatus context) {
-    return 
visitGeneralCreateMultiTimeseries(internalCreateTimeSeriesStatement, context);
+      final TSStatus status) {
+    return 
visitGeneralCreateMultiTimeSeries(internalCreateTimeSeriesStatement, status);
   }
 
   @Override
   public TSStatus visitInternalCreateMultiTimeSeries(
       final InternalCreateMultiTimeSeriesStatement 
internalCreateMultiTimeSeriesStatement,
-      final TSStatus context) {
-    return 
visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement, 
context);
+      final TSStatus status) {
+    return 
visitGeneralCreateMultiTimeSeries(internalCreateMultiTimeSeriesStatement, 
status);
   }
 
-  private TSStatus visitGeneralCreateMultiTimeseries(
-      final Statement statement, final TSStatus context) {
-    if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      for (final TSStatus status : context.getSubStatus()) {
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            && status.getCode() != 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
-            && status.getCode() != 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
-          return visitStatement(statement, context);
-        }
-      }
+  private TSStatus visitGeneralCreateMultiTimeSeries(
+      final Statement statement, final TSStatus status) {
+    if (status.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
+        || status.getCode() == 
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+          .setMessage(status.getMessage());
+    } else if (status.getCode() == 
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return visitStatement(statement, context);
+    return visitStatement(statement, status);
   }
 
   @Override
   public TSStatus visitAlterTimeSeries(
-      final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus 
context) {
-    if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
-      if (context.getMessage().contains("already")) {
+      final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus 
status) {
+    if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
+      if (status.getMessage().contains("already")) {
         return new TSStatus(
                 
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
-      } else if (context.getMessage().contains("does not")) {
+            .setMessage(status.getMessage());
+      } else if (status.getMessage().contains("does not")) {
         return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-            .setMessage(context.getMessage());
+            .setMessage(status.getMessage());
       }
-    } else if (context.getCode() == 
TSStatusCode.PATH_NOT_EXIST.getStatusCode()) {
+    } else if (status.getCode() == 
TSStatusCode.PATH_NOT_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return visitStatement(alterTimeSeriesStatement, context);
+    return visitStatement(alterTimeSeriesStatement, status);
   }
 
   @Override
   public TSStatus visitCreateLogicalView(
-      final CreateLogicalViewStatement createLogicalViewStatement, final 
TSStatus context) {
-    if (context.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
-    } else if (context.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      for (final TSStatus status : context.getSubStatus()) {
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            && status.getCode() != 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
-          return visitStatement(createLogicalViewStatement, context);
-        }
-      }
+      final CreateLogicalViewStatement createLogicalViewStatement, final 
TSStatus status) {
+    if (status.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return super.visitCreateLogicalView(createLogicalViewStatement, context);
+    return super.visitCreateLogicalView(createLogicalViewStatement, status);
   }
 
   @Override
   public TSStatus visitActivateTemplate(
-      final ActivateTemplateStatement activateTemplateStatement, final 
TSStatus context) {
-    return visitGeneralActivateTemplate(activateTemplateStatement, context);
+      final ActivateTemplateStatement activateTemplateStatement, final 
TSStatus status) {
+    return visitGeneralActivateTemplate(activateTemplateStatement, status);
   }
 
   @Override
   public TSStatus visitBatchActivateTemplate(
-      final BatchActivateTemplateStatement batchActivateTemplateStatement, 
final TSStatus context) {
-    boolean userConflict = false;
-    if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-      for (final TSStatus status : context.getSubStatus()) {
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-            && status.getCode() != 
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
-          return visitStatement(batchActivateTemplateStatement, context);
-        }
-        if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
-            && context.isSetMessage()
-            && context.getMessage().contains("has not been set any template")) 
{
-          userConflict = true;
-        }
-      }
-      return (userConflict
-              ? new TSStatus(
-                  
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
-              : new TSStatus(
-                  
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()))
-          .setMessage(context.getMessage());
+      final BatchActivateTemplateStatement batchActivateTemplateStatement, 
final TSStatus status) {
+    if (status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(status.getMessage());
+    }
+    if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && status.isSetMessage()
+        && status.getMessage().contains("has not been set any template")) {
+      return new TSStatus(
+              
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(status.getMessage());
     }
-    return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
context);
+    return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
status);
   }
 
   private TSStatus visitGeneralActivateTemplate(
-      final Statement activateTemplateStatement, final TSStatus context) {
-    if (context.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+      final Statement activateTemplateStatement, final TSStatus status) {
+    if (status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
-        && context.isSetMessage()
-        && context.getMessage().contains("has not been set any template")) {
+    if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && status.isSetMessage()
+        && status.getMessage().contains("has not been set any template")) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
-          .setMessage(context.getMessage());
+          .setMessage(status.getMessage());
     }
-    return visitStatement(activateTemplateStatement, context);
+    return visitStatement(activateTemplateStatement, status);
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
new file mode 100644
index 00000000000..2b20f1d91ef
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+public class PipeStatementTsStatusVisitorTest {
+
+  @Test
+  public void testActivateTemplate() {
+    Assert.assertEquals(
+        
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode(),
+        IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+            .process(
+                new BatchActivateTemplateStatement(Collections.emptyList()),
+                new TSStatus(TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+                    .setSubStatus(
+                        Arrays.asList(
+                            StatusUtils.OK,
+                            new 
TSStatus(TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()))))
+            .getCode());
+  }
+
+  @Test
+  public void testTTLIdempotency() {
+    Assert.assertEquals(
+        
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode(),
+        IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+            .process(
+                new InsertRowsStatement(),
+                new TSStatus(TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+                    .setSubStatus(
+                        Arrays.asList(
+                            StatusUtils.OK, new 
TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode()))))
+            .getCode());
+  }
+}

Reply via email to