This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch convert-on-type-mismatch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 100979e08d52eb097a2c1d70957b6dfa536bfb3b Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Aug 6 14:30:27 2024 +0800 visitor --- .../protocol/thrift/IoTDBDataNodeReceiver.java | 46 ++++--- ...peStatementDataTypeConvertExecutionVisitor.java | 139 +++++++++++++++++++++ .../commons/pipe/receiver/IoTDBFileReceiver.java | 2 +- 3 files changed, 170 insertions(+), 17 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 12916ba4506..62bcabd1a56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -51,6 +51,7 @@ import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent; import org.apache.iotdb.db.pipe.metric.PipeDataNodeReceiverMetrics; import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor; +import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementDataTypeConvertExecutionVisitor; import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor; import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementPatternParseVisitor; import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor; @@ -61,7 +62,6 @@ import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; -import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode; import org.apache.iotdb.db.queryengine.plan.statement.Statement; @@ -117,6 +117,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { new PipeStatementExceptionVisitor(); private static final PipeStatementPatternParseVisitor STATEMENT_PATTERN_PARSE_VISITOR = new PipeStatementPatternParseVisitor(); + private static final PipeStatementDataTypeConvertExecutionVisitor + STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR = + new PipeStatementDataTypeConvertExecutionVisitor(IoTDBDataNodeReceiver::executeStatement); private final PipeStatementToBatchVisitor batchVisitor = new PipeStatementToBatchVisitor(); // Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> confignode (cluster @@ -461,7 +464,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { private TSStatus executeStatementAndClassifyExceptions(final Statement statement) { try { - final TSStatus result = executeStatement(statement); + final TSStatus result = executeStatementWithRetryOnDataTypeMismatch(statement); if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() || result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { return result; @@ -483,25 +486,36 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { } } - private TSStatus executeStatement(Statement statement) { + private TSStatus executeStatementWithRetryOnDataTypeMismatch(final Statement statement) + throws Exception { if (statement == null) { return RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null statement."); } - statement = new PipeEnrichedStatement(statement); - - final ExecutionResult result = - Coordinator.getInstance() - .executeForTreeModel( - statement, - SessionManager.getInstance().requestQueryId(), - new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()), - "", - ClusterPartitionFetcher.getInstance(), - ClusterSchemaFetcher.getInstance(), - IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()); - return result.status; + try { + return executeStatement(statement); + } catch (final Exception e) { + if (!shouldConvertDataTypeOnTypeMismatch) { + throw e; + } + return statement + .accept(STATEMENT_DATA_TYPE_CONVERT_EXECUTION_VISITOR, e) + .orElseThrow(() -> e); + } + } + + private static TSStatus executeStatement(final Statement statement) { + return Coordinator.getInstance() + .executeForTreeModel( + new PipeEnrichedStatement(statement), + SessionManager.getInstance().requestQueryId(), + new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()), + "", + ClusterPartitionFetcher.getInstance(), + ClusterSchemaFetcher.getInstance(), + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()) + .status; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java new file mode 100644 index 00000000000..67600ff7b5f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.receiver.visitor; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; + +import java.util.Optional; +import java.util.function.Consumer; + +/** + * This visitor transforms the data type of the statement when the statement is executed and an + * exception occurs. The transformed statement (if any) is returned and will be executed again. + */ +public class PipeStatementDataTypeConvertExecutionVisitor + extends StatementVisitor<Optional<TSStatus>, Exception> { + + private final Consumer<Statement> statementExecutor; + + public PipeStatementDataTypeConvertExecutionVisitor(final Consumer<Statement> statementExecutor) { + this.statementExecutor = statementExecutor; + } + + @Override + public Optional<TSStatus> visitNode( + final StatementNode statementNode, final Exception exception) { + return Optional.empty(); + } + + // + // @Override + // public TSStatus visitLoadFile( + // final LoadTsFileStatement loadTsFileStatement, final Exception context) { + // if (context instanceof LoadRuntimeOutOfMemoryException) { + // return new TSStatus( + // TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) + // .setMessage(context.getMessage()); + // } else if (context instanceof SemanticException) { + // return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + // .setMessage(context.getMessage()); + // } + // return visitStatement(loadTsFileStatement, context); + // } + // + // @Override + // public TSStatus visitCreateTimeseries( + // final CreateTimeSeriesStatement statement, final Exception context) { + // return visitGeneralCreateTimeSeries(statement, context); + // } + // + // @Override + // public TSStatus visitCreateAlignedTimeseries( + // final CreateAlignedTimeSeriesStatement statement, final Exception context) { + // return visitGeneralCreateTimeSeries(statement, context); + // } + // + // @Override + // public TSStatus visitCreateMultiTimeseries( + // final CreateMultiTimeSeriesStatement statement, final Exception context) { + // return visitGeneralCreateTimeSeries(statement, context); + // } + // + // @Override + // public TSStatus visitInternalCreateTimeseries( + // final InternalCreateTimeSeriesStatement statement, final Exception context) { + // return visitGeneralCreateTimeSeries(statement, context); + // } + // + // @Override + // public TSStatus visitInternalCreateMultiTimeSeries( + // final InternalCreateMultiTimeSeriesStatement statement, final Exception context) { + // return visitGeneralCreateTimeSeries(statement, context); + // } + // + // private TSStatus visitGeneralCreateTimeSeries( + // final Statement statement, final Exception context) { + // if (context instanceof SemanticException) { + // return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + // .setMessage(context.getMessage()); + // } else if (isAutoCreateConflict(context)) { + // return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + // .setMessage(context.getCause().getMessage()); + // } + // return visitStatement(statement, context); + // } + // + // @Override + // public TSStatus visitActivateTemplate( + // final ActivateTemplateStatement activateTemplateStatement, final Exception context) { + // return visitGeneralActivateTemplate(activateTemplateStatement, context); + // } + // + // @Override + // public TSStatus visitBatchActivateTemplate( + // final BatchActivateTemplateStatement batchActivateTemplateStatement, + // final Exception context) { + // return visitGeneralActivateTemplate(batchActivateTemplateStatement, context); + // } + // + // // InternalBatchActivateTemplateNode is converted to BatchActivateTemplateStatement + // // No need to handle InternalBatchActivateTemplateStatement + // + // private TSStatus visitGeneralActivateTemplate( + // final Statement activateTemplateStatement, final Exception context) { + // if (context instanceof MetadataException || context instanceof StatementAnalyzeException) { + // return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + // .setMessage(context.getMessage()); + // } else if (isAutoCreateConflict(context)) { + // return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) + // .setMessage(context.getCause().getMessage()); + // } + // return visitStatement(activateTemplateStatement, context); + // } + // + // private boolean isAutoCreateConflict(final Exception e) { + // return e instanceof RuntimeException + // && e.getCause() instanceof IoTDBException + // && e.getCause().getMessage().contains("already been created as database"); + // } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 6665ee21d36..7197e112526 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -68,7 +68,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { private File writingFile; private RandomAccessFile writingFileWriter; - private boolean shouldConvertDataTypeOnTypeMismatch = + protected boolean shouldConvertDataTypeOnTypeMismatch = CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE; @Override
