This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new aa2687c1fe7 Pipe: Fix BatchActivateTemplateStatement is not handled correctly when some of the timeseries already exists (#12587)
aa2687c1fe7 is described below

commit aa2687c1fe70ed25127542c04d3bdf15aea1badc
Author: Caideyipi <[email protected]>
AuthorDate: Sat May 25 02:39:53 2024 +0800

    Pipe: Fix BatchActivateTemplateStatement is not handled correctly when some of the timeseries already exists (#12587)
---
 .../visitor/PipeStatementExceptionVisitor.java     | 39 ++++++++++++++++------
 .../visitor/PipeStatementTSStatusVisitor.java      | 10 ++++++
 2 files changed, 38 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
index 7391fa32fcc..7cbe5bed416 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementExceptionVisitor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.receiver.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -44,13 +45,14 @@ import org.apache.iotdb.rpc.TSStatusCode;
  */
 public class PipeStatementExceptionVisitor extends StatementVisitor<TSStatus, 
Exception> {
   @Override
-  public TSStatus visitNode(StatementNode node, Exception context) {
+  public TSStatus visitNode(final StatementNode node, final Exception context) 
{
     return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
         .setMessage(context.getMessage());
   }
 
   @Override
-  public TSStatus visitLoadFile(LoadTsFileStatement loadTsFileStatement, 
Exception context) {
+  public TSStatus visitLoadFile(
+      final LoadTsFileStatement loadTsFileStatement, final Exception context) {
     if (context instanceof LoadRuntimeOutOfMemoryException) {
       return new TSStatus(
               
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
@@ -63,51 +65,57 @@ public class PipeStatementExceptionVisitor extends 
StatementVisitor<TSStatus, Ex
   }
 
   @Override
-  public TSStatus visitCreateTimeseries(CreateTimeSeriesStatement statement, 
Exception context) {
+  public TSStatus visitCreateTimeseries(
+      final CreateTimeSeriesStatement statement, final Exception context) {
     return visitGeneralCreateTimeSeries(statement, context);
   }
 
   @Override
   public TSStatus visitCreateAlignedTimeseries(
-      CreateAlignedTimeSeriesStatement statement, Exception context) {
+      final CreateAlignedTimeSeriesStatement statement, final Exception 
context) {
     return visitGeneralCreateTimeSeries(statement, context);
   }
 
   @Override
   public TSStatus visitCreateMultiTimeseries(
-      CreateMultiTimeSeriesStatement statement, Exception context) {
+      final CreateMultiTimeSeriesStatement statement, final Exception context) 
{
     return visitGeneralCreateTimeSeries(statement, context);
   }
 
   @Override
   public TSStatus visitInternalCreateTimeseries(
-      InternalCreateTimeSeriesStatement statement, Exception context) {
+      final InternalCreateTimeSeriesStatement statement, final Exception 
context) {
     return visitGeneralCreateTimeSeries(statement, context);
   }
 
   @Override
   public TSStatus visitInternalCreateMultiTimeSeries(
-      InternalCreateMultiTimeSeriesStatement statement, Exception context) {
+      final InternalCreateMultiTimeSeriesStatement statement, final Exception 
context) {
     return visitGeneralCreateTimeSeries(statement, context);
   }
 
-  private TSStatus visitGeneralCreateTimeSeries(Statement statement, Exception 
context) {
+  private TSStatus visitGeneralCreateTimeSeries(
+      final Statement statement, final Exception context) {
     if (context instanceof SemanticException) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
+    } else if (isAutoCreateConflict(context)) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getCause().getMessage());
     }
     return visitStatement(statement, context);
   }
 
   @Override
   public TSStatus visitActivateTemplate(
-      ActivateTemplateStatement activateTemplateStatement, Exception context) {
+      final ActivateTemplateStatement activateTemplateStatement, final 
Exception context) {
     return visitGeneralActivateTemplate(activateTemplateStatement, context);
   }
 
   @Override
   public TSStatus visitBatchActivateTemplate(
-      BatchActivateTemplateStatement batchActivateTemplateStatement, Exception 
context) {
+      final BatchActivateTemplateStatement batchActivateTemplateStatement,
+      final Exception context) {
     return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
context);
   }
 
@@ -115,11 +123,20 @@ public class PipeStatementExceptionVisitor extends 
StatementVisitor<TSStatus, Ex
   // No need to handle InternalBatchActivateTemplateStatement
 
   private TSStatus visitGeneralActivateTemplate(
-      Statement activateTemplateStatement, Exception context) {
+      final Statement activateTemplateStatement, final Exception context) {
     if (context instanceof MetadataException || context instanceof 
StatementAnalyzeException) {
       return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
           .setMessage(context.getMessage());
+    } else if (isAutoCreateConflict(context)) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getCause().getMessage());
     }
     return visitStatement(activateTemplateStatement, context);
   }
+
+  private boolean isAutoCreateConflict(final Exception e) {
+    return e instanceof RuntimeException
+        && e.getCause() instanceof IoTDBException
+        && e.getCause().getMessage().contains("already been created as 
database");
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
index 6cfd503cb38..be4ee14ff02 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java
@@ -214,6 +214,16 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
   @Override
   public TSStatus visitBatchActivateTemplate(
       final BatchActivateTemplateStatement batchActivateTemplateStatement, 
final TSStatus context) {
+    if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      for (final TSStatus status : context.getSubStatus()) {
+        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            && status.getCode() != 
TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
+          return visitStatement(batchActivateTemplateStatement, context);
+        }
+      }
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
     return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
context);
   }
 

Reply via email to