This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 359787d929f Pipe: Fixed the idempotent semantic for MULTIPLE_ERROR
(#17177) (#17185)
359787d929f is described below
commit 359787d929fe2f5677f1cca836f9094f7a7c3cc0
Author: Caideyipi <[email protected]>
AuthorDate: Mon Feb 9 17:55:39 2026 +0800
Pipe: Fixed the idempotent semantic for MULTIPLE_ERROR (#17177) (#17185)
* idemp
* grass
* fix
* fix
* coverage
* coverage
* fix
---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 4 +-
...peStatementDataTypeConvertExecutionVisitor.java | 10 +-
.../visitor/PipeStatementTSStatusVisitor.java | 219 ++++++++++-----------
.../receiver/PipeStatementTsStatusVisitorTest.java | 65 ++++++
4 files changed, 175 insertions(+), 123 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 2bf0b42f0a2..334c3530414 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -702,7 +702,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
receiverId.get(),
statement.getPipeLoggingString(),
result);
- return statement.accept(STATEMENT_STATUS_VISITOR, result);
+ return STATEMENT_STATUS_VISITOR.process(statement, result);
}
} catch (final Exception e) {
PipeLogger.log(
@@ -711,7 +711,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
"Receiver id = %s: Exception encountered while executing statement
%s: ",
receiverId.get(),
statement.getPipeLoggingString());
- return statement.accept(STATEMENT_EXCEPTION_VISITOR, e);
+ return STATEMENT_EXCEPTION_VISITOR.process(statement, e);
} finally {
if (Objects.nonNull(allocatedMemoryBlock)) {
allocatedMemoryBlock.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
index 2b7b2c2ffd6..7443879bec0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -113,9 +113,8 @@ public class PipeStatementDataTypeConvertExecutionVisitor
TSStatus result;
try {
result =
- statement.accept(
- IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR,
- statementExecutor.execute(statement));
+ IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process(
+ statement, statementExecutor.execute(statement));
// Retry max 5 times if the write process is rejected
for (int i = 0;
@@ -126,9 +125,8 @@ public class PipeStatementDataTypeConvertExecutionVisitor
i++) {
Thread.sleep(100L * (i + 1));
result =
- statement.accept(
- IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR,
- statementExecutor.execute(statement));
+ IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process(
+ statement, statementExecutor.execute(statement));
}
} catch (final Exception e) {
if (e instanceof InterruptedException) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 84e3c9875ed..5210b98607f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.receiver.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
@@ -43,6 +44,8 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchAct
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
import org.apache.iotdb.rpc.TSStatusCode;
+import java.util.stream.Collectors;
+
/**
* This visitor translated some {@link TSStatus} to pipe related status to
help sender classify them
* and apply different error handling tactics. Please DO NOT modify the {@link
TSStatus} returned by
@@ -53,226 +56,212 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@Override
- public TSStatus visitNode(final StatementNode node, final TSStatus context) {
- return context;
+ public TSStatus process(final StatementNode node, final TSStatus status) {
+ return status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
+ ? PipeReceiverStatusHandler.getPriorStatus(
+ status.getSubStatus().stream()
+ .map(subStatus -> node.accept(this, subStatus))
+ .collect(Collectors.toList()))
+ : node.accept(this, status);
+ }
+
+ @Override
+ public TSStatus visitNode(final StatementNode node, final TSStatus status) {
+ return status;
}
@Override
public TSStatus visitLoadFile(
- final LoadTsFileStatement loadTsFileStatement, final TSStatus context) {
- if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
- || context.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
- && context.getMessage() != null
- && context.getMessage().contains("memory")) {
+ final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
+ if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
+ || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
+ && status.getMessage() != null
+ && status.getMessage().contains("memory")) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ .setMessage(status.getMessage());
}
- return super.visitLoadFile(loadTsFileStatement, context);
+ return super.visitLoadFile(loadTsFileStatement, status);
}
@Override
public TSStatus visitInsertTablet(
- final InsertTabletStatement insertTabletStatement, final TSStatus
context) {
- return visitInsertBase(insertTabletStatement, context);
+ final InsertTabletStatement insertTabletStatement, final TSStatus
status) {
+ return visitInsertBase(insertTabletStatement, status);
}
@Override
public TSStatus visitInsertRow(
- final InsertRowStatement insertRowStatement, final TSStatus context) {
- return visitInsertBase(insertRowStatement, context);
+ final InsertRowStatement insertRowStatement, final TSStatus status) {
+ return visitInsertBase(insertRowStatement, status);
}
@Override
public TSStatus visitInsertRows(
- final InsertRowsStatement insertRowsStatement, final TSStatus context) {
- return visitInsertBase(insertRowsStatement, context);
+ final InsertRowsStatement insertRowsStatement, final TSStatus status) {
+ return visitInsertBase(insertRowsStatement, status);
}
@Override
public TSStatus visitInsertMultiTablets(
- final InsertMultiTabletsStatement insertMultiTabletsStatement, final
TSStatus context) {
- return visitInsertBase(insertMultiTabletsStatement, context);
+ final InsertMultiTabletsStatement insertMultiTabletsStatement, final
TSStatus status) {
+ return visitInsertBase(insertMultiTabletsStatement, status);
}
private TSStatus visitInsertBase(
- final InsertBaseStatement insertBaseStatement, final TSStatus context) {
+ final InsertBaseStatement insertBaseStatement, final TSStatus status) {
// If the system is read-only, we shall not classify it into temporary
unavailable exception to
// avoid to many logs
- if (context.getCode() ==
TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+ if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode())
{
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
+ .setMessage(status.getMessage());
+ } else if (status.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- } else if (context.getCode() ==
TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
+ .setMessage(status.getMessage());
+ } else if (status.getCode() ==
TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- } else if (context.getCode() ==
TSStatusCode.METADATA_ERROR.getStatusCode()) {
- if
(context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+ .setMessage(status.getMessage());
+ } else if (status.getCode() ==
TSStatusCode.METADATA_ERROR.getStatusCode()) {
+ if
(status.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
&& config.isEnablePartialInsert()) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ .setMessage(status.getMessage());
}
- if (context.getMessage().contains("does not exist")) {
+ if (status.getMessage().contains("does not exist")) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ .setMessage(status.getMessage());
}
}
- return visitStatement(insertBaseStatement, context);
+ return visitStatement(insertBaseStatement, status);
}
@Override
public TSStatus visitCreateTimeseries(
- final CreateTimeSeriesStatement statement, final TSStatus context) {
- return visitGeneralCreateTimeSeries(statement, context);
+ final CreateTimeSeriesStatement statement, final TSStatus status) {
+ return visitGeneralCreateTimeSeries(statement, status);
}
@Override
public TSStatus visitCreateAlignedTimeseries(
- final CreateAlignedTimeSeriesStatement statement, final TSStatus
context) {
- return visitGeneralCreateTimeSeries(statement, context);
+ final CreateAlignedTimeSeriesStatement statement, final TSStatus status)
{
+ return visitGeneralCreateTimeSeries(statement, status);
}
- private TSStatus visitGeneralCreateTimeSeries(final Statement statement,
final TSStatus context) {
- if (context.getCode() ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
- || context.getCode() ==
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
+ private TSStatus visitGeneralCreateTimeSeries(final Statement statement,
final TSStatus status) {
+ if (status.getCode() ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
+ || status.getCode() ==
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- } else if (context.getCode() ==
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
- || context.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+ .setMessage(status.getMessage());
+ } else if (status.getCode() ==
TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
+ || status.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ .setMessage(status.getMessage());
}
- return visitStatement(statement, context);
+ return visitStatement(statement, status);
}
@Override
public TSStatus visitCreateMultiTimeSeries(
- final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement,
final TSStatus context) {
- return visitGeneralCreateMultiTimeseries(createMultiTimeSeriesStatement,
context);
+ final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement,
final TSStatus status) {
+ return visitGeneralCreateMultiTimeSeries(createMultiTimeSeriesStatement,
status);
}
@Override
public TSStatus visitInternalCreateTimeseries(
final InternalCreateTimeSeriesStatement
internalCreateTimeSeriesStatement,
- final TSStatus context) {
- return
visitGeneralCreateMultiTimeseries(internalCreateTimeSeriesStatement, context);
+ final TSStatus status) {
+ return
visitGeneralCreateMultiTimeSeries(internalCreateTimeSeriesStatement, status);
}
@Override
public TSStatus visitInternalCreateMultiTimeSeries(
final InternalCreateMultiTimeSeriesStatement
internalCreateMultiTimeSeriesStatement,
- final TSStatus context) {
- return
visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement,
context);
+ final TSStatus status) {
+ return
visitGeneralCreateMultiTimeSeries(internalCreateMultiTimeSeriesStatement,
status);
}
- private TSStatus visitGeneralCreateMultiTimeseries(
- final Statement statement, final TSStatus context) {
- if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- for (final TSStatus status : context.getSubStatus()) {
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && status.getCode() !=
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
- && status.getCode() !=
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
- return visitStatement(statement, context);
- }
- }
+ private TSStatus visitGeneralCreateMultiTimeSeries(
+ final Statement statement, final TSStatus status) {
+ if (status.getCode() ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
+ || status.getCode() ==
TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- } else if (context.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+ .setMessage(status.getMessage());
+ } else if (status.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ .setMessage(status.getMessage());
}
- return visitStatement(statement, context);
+ return visitStatement(statement, status);
}
@Override
public TSStatus visitAlterTimeSeries(
- final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus
context) {
- if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
- if (context.getMessage().contains("already")) {
+ final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus
status) {
+ if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
+ if (status.getMessage().contains("already")) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- } else if (context.getMessage().contains("does not")) {
+ .setMessage(status.getMessage());
+ } else if (status.getMessage().contains("does not")) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ .setMessage(status.getMessage());
}
- } else if (context.getCode() ==
TSStatusCode.PATH_NOT_EXIST.getStatusCode()) {
+ } else if (status.getCode() ==
TSStatusCode.PATH_NOT_EXIST.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ .setMessage(status.getMessage());
}
- return visitStatement(alterTimeSeriesStatement, context);
+ return visitStatement(alterTimeSeriesStatement, status);
}
@Override
public TSStatus visitCreateLogicalView(
- final CreateLogicalViewStatement createLogicalViewStatement, final
TSStatus context) {
- if (context.getCode() ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
- return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- } else if (context.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- for (final TSStatus status : context.getSubStatus()) {
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && status.getCode() !=
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
- return visitStatement(createLogicalViewStatement, context);
- }
- }
+ final CreateLogicalViewStatement createLogicalViewStatement, final
TSStatus status) {
+ if (status.getCode() ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ .setMessage(status.getMessage());
}
- return super.visitCreateLogicalView(createLogicalViewStatement, context);
+ return super.visitCreateLogicalView(createLogicalViewStatement, status);
}
@Override
public TSStatus visitActivateTemplate(
- final ActivateTemplateStatement activateTemplateStatement, final
TSStatus context) {
- return visitGeneralActivateTemplate(activateTemplateStatement, context);
+ final ActivateTemplateStatement activateTemplateStatement, final
TSStatus status) {
+ return visitGeneralActivateTemplate(activateTemplateStatement, status);
}
@Override
public TSStatus visitBatchActivateTemplate(
- final BatchActivateTemplateStatement batchActivateTemplateStatement,
final TSStatus context) {
- boolean userConflict = false;
- if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- for (final TSStatus status : context.getSubStatus()) {
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && status.getCode() !=
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
- return visitStatement(batchActivateTemplateStatement, context);
- }
- if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
- && context.isSetMessage()
- && context.getMessage().contains("has not been set any template"))
{
- userConflict = true;
- }
- }
- return (userConflict
- ? new TSStatus(
-
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
- : new TSStatus(
-
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()))
- .setMessage(context.getMessage());
+ final BatchActivateTemplateStatement batchActivateTemplateStatement,
final TSStatus status) {
+ if (status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(status.getMessage());
+ }
+ if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+ && status.isSetMessage()
+ && status.getMessage().contains("has not been set any template")) {
+ return new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(status.getMessage());
}
- return visitGeneralActivateTemplate(batchActivateTemplateStatement,
context);
+ return visitGeneralActivateTemplate(batchActivateTemplateStatement,
status);
}
private TSStatus visitGeneralActivateTemplate(
- final Statement activateTemplateStatement, final TSStatus context) {
- if (context.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+ final Statement activateTemplateStatement, final TSStatus status) {
+ if (status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ .setMessage(status.getMessage());
}
- if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
- && context.isSetMessage()
- && context.getMessage().contains("has not been set any template")) {
+ if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
+ && status.isSetMessage()
+ && status.getMessage().contains("has not been set any template")) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
+ .setMessage(status.getMessage());
}
- return visitStatement(activateTemplateStatement, context);
+ return visitStatement(activateTemplateStatement, status);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
new file mode 100644
index 00000000000..2b20f1d91ef
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+public class PipeStatementTsStatusVisitorTest {
+
+ @Test
+ public void testActivateTemplate() {
+ Assert.assertEquals(
+
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode(),
+ IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+ .process(
+ new BatchActivateTemplateStatement(Collections.emptyList()),
+ new TSStatus(TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+ .setSubStatus(
+ Arrays.asList(
+ StatusUtils.OK,
+ new
TSStatus(TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()))))
+ .getCode());
+ }
+
+ @Test
+ public void testTTLIdempotency() {
+ Assert.assertEquals(
+
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode(),
+ IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR
+ .process(
+ new InsertRowsStatement(),
+ new TSStatus(TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+ .setSubStatus(
+ Arrays.asList(
+ StatusUtils.OK, new
TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode()))))
+ .getCode());
+ }
+}