This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch convert-on-type-mismatch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 100979e08d52eb097a2c1d70957b6dfa536bfb3b
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Aug 6 14:30:27 2024 +0800

    visitor
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  46 ++++---
 ...peStatementDataTypeConvertExecutionVisitor.java | 139 +++++++++++++++++++++
 .../commons/pipe/receiver/IoTDBFileReceiver.java   |   2 +-
 3 files changed, 170 insertions(+), 17 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 12916ba4506..62bcabd1a56 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -51,6 +51,7 @@ import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
 import org.apache.iotdb.db.pipe.metric.PipeDataNodeReceiverMetrics;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor;
+import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementDataTypeConvertExecutionVisitor;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
 import 
org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor;
 import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
@@ -61,7 +62,6 @@ import 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
-import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -117,6 +117,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       new PipeStatementExceptionVisitor();
   private static final PipeStatementPatternParseVisitor 
STATEMENT_PATTERN_PARSE_VISITOR =
       new PipeStatementPatternParseVisitor();
+  private static final PipeStatementDataTypeConvertExecutionVisitor
+      STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR =
+          new 
PipeStatementDataTypeConvertExecutionVisitor(IoTDBDataNodeReceiver::executeStatement);
   private final PipeStatementToBatchVisitor batchVisitor = new 
PipeStatementToBatchVisitor();
 
   // Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> 
confignode (cluster
@@ -461,7 +464,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
   private TSStatus executeStatementAndClassifyExceptions(final Statement 
statement) {
     try {
-      final TSStatus result = executeStatement(statement);
+      final TSStatus result = 
executeStatementWithRetryOnDataTypeMismatch(statement);
       if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
           || result.getCode() == 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
         return result;
@@ -483,25 +486,36 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     }
   }
 
-  private TSStatus executeStatement(Statement statement) {
+  private TSStatus executeStatementWithRetryOnDataTypeMismatch(final Statement 
statement)
+      throws Exception {
     if (statement == null) {
       return RpcUtils.getStatus(
           TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null 
statement.");
     }
 
-    statement = new PipeEnrichedStatement(statement);
-
-    final ExecutionResult result =
-        Coordinator.getInstance()
-            .executeForTreeModel(
-                statement,
-                SessionManager.getInstance().requestQueryId(),
-                new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault()),
-                "",
-                ClusterPartitionFetcher.getInstance(),
-                ClusterSchemaFetcher.getInstance(),
-                
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
-    return result.status;
+    try {
+      return executeStatement(statement);
+    } catch (final Exception e) {
+      if (!shouldConvertDataTypeOnTypeMismatch) {
+        throw e;
+      }
+      return statement
+          .accept(STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR, e)
+          .orElseThrow(() -> e);
+    }
+  }
+
+  private static TSStatus executeStatement(final Statement statement) {
+    return Coordinator.getInstance()
+        .executeForTreeModel(
+            new PipeEnrichedStatement(statement),
+            SessionManager.getInstance().requestQueryId(),
+            new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault()),
+            "",
+            ClusterPartitionFetcher.getInstance(),
+            ClusterSchemaFetcher.getInstance(),
+            
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
+        .status;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
new file mode 100644
index 00000000000..67600ff7b5f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.visitor;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * This visitor transforms the data type of the statement when the statement 
is executed and an
+ * exception occurs. The transformed statement (if any) is returned and will 
be executed again.
+ */
+public class PipeStatementDataTypeConvertExecutionVisitor
+    extends StatementVisitor<Optional<TSStatus>, Exception> {
+
+  private final Consumer<Statement> statementExecutor;
+
+  public PipeStatementDataTypeConvertExecutionVisitor(final 
Consumer<Statement> statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  @Override
+  public Optional<TSStatus> visitNode(
+      final StatementNode statementNode, final Exception exception) {
+    return Optional.empty();
+  }
+
+  //
+  //  @Override
+  //  public TSStatus visitLoadFile(
+  //      final LoadTsFileStatement loadTsFileStatement, final Exception 
context) {
+  //    if (context instanceof LoadRuntimeOutOfMemoryException) {
+  //      return new TSStatus(
+  //              
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+  //          .setMessage(context.getMessage());
+  //    } else if (context instanceof SemanticException) {
+  //      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+  //          .setMessage(context.getMessage());
+  //    }
+  //    return visitStatement(loadTsFileStatement, context);
+  //  }
+  //
+  //  @Override
+  //  public TSStatus visitCreateTimeseries(
+  //      final CreateTimeSeriesStatement statement, final Exception context) {
+  //    return visitGeneralCreateTimeSeries(statement, context);
+  //  }
+  //
+  //  @Override
+  //  public TSStatus visitCreateAlignedTimeseries(
+  //      final CreateAlignedTimeSeriesStatement statement, final Exception 
context) {
+  //    return visitGeneralCreateTimeSeries(statement, context);
+  //  }
+  //
+  //  @Override
+  //  public TSStatus visitCreateMultiTimeseries(
+  //      final CreateMultiTimeSeriesStatement statement, final Exception 
context) {
+  //    return visitGeneralCreateTimeSeries(statement, context);
+  //  }
+  //
+  //  @Override
+  //  public TSStatus visitInternalCreateTimeseries(
+  //      final InternalCreateTimeSeriesStatement statement, final Exception 
context) {
+  //    return visitGeneralCreateTimeSeries(statement, context);
+  //  }
+  //
+  //  @Override
+  //  public TSStatus visitInternalCreateMultiTimeSeries(
+  //      final InternalCreateMultiTimeSeriesStatement statement, final 
Exception context) {
+  //    return visitGeneralCreateTimeSeries(statement, context);
+  //  }
+  //
+  //  private TSStatus visitGeneralCreateTimeSeries(
+  //      final Statement statement, final Exception context) {
+  //    if (context instanceof SemanticException) {
+  //      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+  //          .setMessage(context.getMessage());
+  //    } else if (isAutoCreateConflict(context)) {
+  //      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+  //          .setMessage(context.getCause().getMessage());
+  //    }
+  //    return visitStatement(statement, context);
+  //  }
+  //
+  //  @Override
+  //  public TSStatus visitActivateTemplate(
+  //      final ActivateTemplateStatement activateTemplateStatement, final 
Exception context) {
+  //    return visitGeneralActivateTemplate(activateTemplateStatement, 
context);
+  //  }
+  //
+  //  @Override
+  //  public TSStatus visitBatchActivateTemplate(
+  //      final BatchActivateTemplateStatement batchActivateTemplateStatement,
+  //      final Exception context) {
+  //    return visitGeneralActivateTemplate(batchActivateTemplateStatement, 
context);
+  //  }
+  //
+  //  // InternalBatchActivateTemplateNode is converted to 
BatchActivateTemplateStatement
+  //  // No need to handle InternalBatchActivateTemplateStatement
+  //
+  //  private TSStatus visitGeneralActivateTemplate(
+  //      final Statement activateTemplateStatement, final Exception context) {
+  //    if (context instanceof MetadataException || context instanceof 
StatementAnalyzeException) {
+  //      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+  //          .setMessage(context.getMessage());
+  //    } else if (isAutoCreateConflict(context)) {
+  //      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+  //          .setMessage(context.getCause().getMessage());
+  //    }
+  //    return visitStatement(activateTemplateStatement, context);
+  //  }
+  //
+  //  private boolean isAutoCreateConflict(final Exception e) {
+  //    return e instanceof RuntimeException
+  //        && e.getCause() instanceof IoTDBException
+  //        && e.getCause().getMessage().contains("already been created as 
database");
+  //  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 6665ee21d36..7197e112526 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -68,7 +68,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   private File writingFile;
   private RandomAccessFile writingFileWriter;
 
-  private boolean shouldConvertDataTypeOnTypeMismatch =
+  protected boolean shouldConvertDataTypeOnTypeMismatch =
       CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE;
 
   @Override

Reply via email to