This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aa2687c1fe7 Pipe: Fix BatchActivateTemplateStatement is not handled
correctly when some of the timeseries already exists (#12587)
aa2687c1fe7 is described below
commit aa2687c1fe70ed25127542c04d3bdf15aea1badc
Author: Caideyipi <[email protected]>
AuthorDate: Sat May 25 02:39:53 2024 +0800
Pipe: Fix BatchActivateTemplateStatement is not handled correctly when some
of the timeseries already exists (#12587)
---
.../visitor/PipeStatementExceptionVisitor.java | 39 ++++++++++++++++------
.../visitor/PipeStatementTSStatusVisitor.java | 10 ++++++
2 files changed, 38 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index 7391fa32fcc..7cbe5bed416 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.receiver.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -44,13 +45,14 @@ import org.apache.iotdb.rpc.TSStatusCode;
*/
public class PipeStatementExceptionVisitor extends StatementVisitor<TSStatus,
Exception> {
@Override
- public TSStatus visitNode(StatementNode node, Exception context) {
+ public TSStatus visitNode(final StatementNode node, final Exception context)
{
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
.setMessage(context.getMessage());
}
@Override
- public TSStatus visitLoadFile(LoadTsFileStatement loadTsFileStatement,
Exception context) {
+ public TSStatus visitLoadFile(
+ final LoadTsFileStatement loadTsFileStatement, final Exception context) {
if (context instanceof LoadRuntimeOutOfMemoryException) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
@@ -63,51 +65,57 @@ public class PipeStatementExceptionVisitor extends
StatementVisitor<TSStatus, Ex
}
@Override
- public TSStatus visitCreateTimeseries(CreateTimeSeriesStatement statement,
Exception context) {
+ public TSStatus visitCreateTimeseries(
+ final CreateTimeSeriesStatement statement, final Exception context) {
return visitGeneralCreateTimeSeries(statement, context);
}
@Override
public TSStatus visitCreateAlignedTimeseries(
- CreateAlignedTimeSeriesStatement statement, Exception context) {
+ final CreateAlignedTimeSeriesStatement statement, final Exception
context) {
return visitGeneralCreateTimeSeries(statement, context);
}
@Override
public TSStatus visitCreateMultiTimeseries(
- CreateMultiTimeSeriesStatement statement, Exception context) {
+ final CreateMultiTimeSeriesStatement statement, final Exception context)
{
return visitGeneralCreateTimeSeries(statement, context);
}
@Override
public TSStatus visitInternalCreateTimeseries(
- InternalCreateTimeSeriesStatement statement, Exception context) {
+ final InternalCreateTimeSeriesStatement statement, final Exception
context) {
return visitGeneralCreateTimeSeries(statement, context);
}
@Override
public TSStatus visitInternalCreateMultiTimeSeries(
- InternalCreateMultiTimeSeriesStatement statement, Exception context) {
+ final InternalCreateMultiTimeSeriesStatement statement, final Exception
context) {
return visitGeneralCreateTimeSeries(statement, context);
}
- private TSStatus visitGeneralCreateTimeSeries(Statement statement, Exception
context) {
+ private TSStatus visitGeneralCreateTimeSeries(
+ final Statement statement, final Exception context) {
if (context instanceof SemanticException) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
+ } else if (isAutoCreateConflict(context)) {
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getCause().getMessage());
}
return visitStatement(statement, context);
}
@Override
public TSStatus visitActivateTemplate(
- ActivateTemplateStatement activateTemplateStatement, Exception context) {
+ final ActivateTemplateStatement activateTemplateStatement, final
Exception context) {
return visitGeneralActivateTemplate(activateTemplateStatement, context);
}
@Override
public TSStatus visitBatchActivateTemplate(
- BatchActivateTemplateStatement batchActivateTemplateStatement, Exception
context) {
+ final BatchActivateTemplateStatement batchActivateTemplateStatement,
+ final Exception context) {
return visitGeneralActivateTemplate(batchActivateTemplateStatement,
context);
}
@@ -115,11 +123,20 @@ public class PipeStatementExceptionVisitor extends
StatementVisitor<TSStatus, Ex
// No need to handle InternalBatchActivateTemplateStatement
private TSStatus visitGeneralActivateTemplate(
- Statement activateTemplateStatement, Exception context) {
+ final Statement activateTemplateStatement, final Exception context) {
if (context instanceof MetadataException || context instanceof
StatementAnalyzeException) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
+ } else if (isAutoCreateConflict(context)) {
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getCause().getMessage());
}
return visitStatement(activateTemplateStatement, context);
}
+
+ private boolean isAutoCreateConflict(final Exception e) {
+ return e instanceof RuntimeException
+ && e.getCause() instanceof IoTDBException
+ && e.getCause().getMessage().contains("already been created as
database");
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 6cfd503cb38..be4ee14ff02 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -214,6 +214,16 @@ public class PipeStatementTSStatusVisitor extends
StatementVisitor<TSStatus, TSS
@Override
public TSStatus visitBatchActivateTemplate(
final BatchActivateTemplateStatement batchActivateTemplateStatement,
final TSStatus context) {
+ if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ for (final TSStatus status : context.getSubStatus()) {
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && status.getCode() !=
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+ return visitStatement(batchActivateTemplateStatement, context);
+ }
+ }
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ }
return visitGeneralActivateTemplate(batchActivateTemplateStatement,
context);
}