This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dd4d2be1723 Load: Support auto data type conversion when data type mismatch detected during analysis stage (#14529)
dd4d2be1723 is described below

commit dd4d2be1723f83dc362849e88bfb02aff1b35613
Author: Itami Sho <[email protected]>
AuthorDate: Tue Dec 31 09:22:49 2024 +0800

    Load: Support auto data type conversion when data type mismatch detected during analysis stage (#14529)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java  |  73 ++++++++++-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +
 .../db/exception/VerifyMetadataException.java      |  12 +-
 ...va => VerifyMetadataTypeMismatchException.java} |  13 +-
 .../protocol/legacy/loader/TsFileLoader.java       |   1 +
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |   1 +
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |  76 ++++++++---
 .../analyze/load/LoadTsFileTableSchemaCache.java   |   3 +-
 .../load/LoadTsFileToTreeModelAnalyzer.java        |   7 +-
 .../load/TreeSchemaAutoCreatorAndVerifier.java     |  32 +++--
 .../plan/relational/sql/ast/LoadTsFile.java        |  16 ++-
 .../plan/statement/crud/LoadTsFileStatement.java   |  70 ++++++----
 .../load/active/ActiveLoadTsFileLoader.java        |   1 +
 .../load/config/LoadTsFileConfigurator.java        |  29 ++++-
 .../LoadConvertedInsertTabletStatement.java        |  52 ++++++++
 ...ertedInsertTabletStatementExceptionVisitor.java |  51 ++++++++
 ...vertedInsertTabletStatementTSStatusVisitor.java |  65 ++++++++++
 ...leStatementDataTypeConvertExecutionVisitor.java | 143 +++++++++++++++++++++
 ...eeStatementDataTypeConvertExecutionVisitor.java | 130 +++++++++++++++++++
 .../converter/LoadTsFileDataTypeConverter.java     | 107 +++++++++++++++
 20 files changed, 805 insertions(+), 80 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 0b9d310c767..cf4d22c6aca 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.jdbc.IoTDBSQLException;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
@@ -48,6 +49,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -56,6 +58,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.it.utils.TestUtils.assertNonQueryTestFail;
 import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
@@ -328,7 +331,7 @@ public class IoTDBLoadTsFileIT {
   }
 
   @Test
-  public void testLoadWithAutoRegister() throws Exception {
+  public void testLoadWithAutoCreate() throws Exception {
     final long writtenPoint1;
     // device 0, device 1, sg 0
     try (final TsFileGenerator generator =
@@ -898,6 +901,74 @@ public class IoTDBLoadTsFileIT {
     }
   }
 
+  @Test
+  public void testLoadWithConvertOnTypeMismatch() throws Exception {
+
+    List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas =
+        generateMeasurementSchemasForDataTypeConvertion();
+
+    final File file = new File(tmpDir, "1-0-0-0.tsfile");
+
+    long writtenPoint = 0;
+    List<MeasurementSchema> schemaList1 =
+        measurementSchemas.stream().map(pair -> 
pair.left).collect(Collectors.toList());
+    List<IMeasurementSchema> schemaList2 =
+        measurementSchemas.stream().map(pair -> 
pair.right).collect(Collectors.toList());
+
+    try (final TsFileGenerator generator = new TsFileGenerator(file)) {
+      generator.registerTimeseries(SchemaConfig.DEVICE_0, schemaList2);
+
+      generator.generateData(SchemaConfig.DEVICE_0, 100, PARTITION_INTERVAL / 
10_000, false);
+
+      writtenPoint = generator.getTotalNumber();
+    }
+
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+
+      for (MeasurementSchema schema : schemaList1) {
+        statement.execute(convert2SQL(SchemaConfig.DEVICE_0, schema));
+      }
+
+      statement.execute(String.format("load \"%s\" ", file.getAbsolutePath()));
+
+      try (final ResultSet resultSet =
+          statement.executeQuery("select count(*) from root.** group by 
level=1,2")) {
+        if (resultSet.next()) {
+          final long sgCount = resultSet.getLong("count(root.sg.test_0.*.*)");
+          Assert.assertEquals(writtenPoint, sgCount);
+        } else {
+          Assert.fail("This ResultSet is empty.");
+        }
+      }
+    }
+  }
+
+  private List<Pair<MeasurementSchema, MeasurementSchema>>
+      generateMeasurementSchemasForDataTypeConvertion() {
+    TSDataType[] dataTypes = {
+      TSDataType.STRING,
+      TSDataType.TEXT,
+      TSDataType.BLOB,
+      TSDataType.TIMESTAMP,
+      TSDataType.BOOLEAN,
+      TSDataType.DATE,
+      TSDataType.DOUBLE,
+      TSDataType.FLOAT,
+      TSDataType.INT32,
+      TSDataType.INT64
+    };
+    List<Pair<MeasurementSchema, MeasurementSchema>> pairs = new ArrayList<>();
+
+    for (TSDataType type : dataTypes) {
+      for (TSDataType dataType : dataTypes) {
+        String id = String.format("%s2%s", type.name(), dataType.name());
+        pairs.add(new Pair<>(new MeasurementSchema(id, type), new 
MeasurementSchema(id, dataType)));
+      }
+    }
+    return pairs;
+  }
+
   private static class SchemaConfig {
     private static final String STORAGE_GROUP_0 = "root.sg.test_0";
     private static final String STORAGE_GROUP_1 = "root.sg.test_1";
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 84145d01d79..b1863c3d491 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -197,6 +197,9 @@ public enum TSStatusCode {
   PIPE_ERROR(1107),
   PIPESERVER_ERROR(1108),
   VERIFY_METADATA_ERROR(1109),
+  LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION(1110),
+  LOAD_IDEMPOTENT_CONFLICT_EXCEPTION(1111),
+  LOAD_USER_CONFLICT_EXCEPTION(1112),
 
   // UDF
   UDF_LOAD_CLASS_ERROR(1200),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
index 1603c9872f4..78c6d8d4d5c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
@@ -23,16 +23,12 @@ import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 public class VerifyMetadataException extends IoTDBException {
-  public VerifyMetadataException(
-      String path, String compareInfo, String tsFileInfo, String tsFilePath, 
String IoTDBInfo) {
-    super(
-        String.format(
-            "%s %s mismatch, %s in tsfile %s, but %s in IoTDB.",
-            path, compareInfo, tsFileInfo, tsFilePath, IoTDBInfo),
-        TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
-  }
 
   public VerifyMetadataException(String message) {
     super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
   }
+
+  public VerifyMetadataException(String message, int errorCode) {
+    super(message, errorCode);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java
similarity index 64%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java
index 1603c9872f4..209fb83bcdb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java
@@ -19,20 +19,11 @@
 
 package org.apache.iotdb.db.exception;
 
-import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-public class VerifyMetadataException extends IoTDBException {
-  public VerifyMetadataException(
-      String path, String compareInfo, String tsFileInfo, String tsFilePath, 
String IoTDBInfo) {
-    super(
-        String.format(
-            "%s %s mismatch, %s in tsfile %s, but %s in IoTDB.",
-            path, compareInfo, tsFileInfo, tsFilePath, IoTDBInfo),
-        TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
-  }
+public class VerifyMetadataTypeMismatchException extends 
VerifyMetadataException {
 
-  public VerifyMetadataException(String message) {
+  public VerifyMetadataTypeMismatchException(String message) {
     super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
index c78ca0544b4..e181ec1d592 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
@@ -56,6 +56,7 @@ public class TsFileLoader implements ILoader {
     try {
       LoadTsFileStatement statement = new 
LoadTsFileStatement(tsFile.getAbsolutePath());
       statement.setDeleteAfterLoad(true);
+      statement.setConvertOnTypeMismatch(true);
       statement.setDatabaseLevel(parseSgLevel());
       statement.setVerifySchema(true);
       statement.setAutoCreateDatabase(false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 8785231480f..a87e229a03a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -569,6 +569,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       throws FileNotFoundException {
     final LoadTsFileStatement statement = new 
LoadTsFileStatement(fileAbsolutePath);
     statement.setDeleteAfterLoad(true);
+    statement.setConvertOnTypeMismatch(true);
     statement.setVerifySchema(true);
     statement.setAutoCreateDatabase(false);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 9cf39b42325..71e985be113 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.db.queryengine.plan.analyze.load;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.db.exception.VerifyMetadataException;
+import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException;
 import org.apache.iotdb.db.exception.load.LoadReadOnlyException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -33,6 +35,7 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -54,8 +57,7 @@ public abstract class LoadTsFileAnalyzer implements 
AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(LoadTsFileAnalyzer.class);
 
   // These are only used when constructed from tree model SQL
-  private final LoadTsFileStatement loadTsFileStatement;
-
+  private final LoadTsFileStatement loadTsFileTreeStatement;
   // These are only used when constructed from table model SQL
   private final LoadTsFile loadTsFileTableStatement;
 
@@ -67,6 +69,8 @@ public abstract class LoadTsFileAnalyzer implements 
AutoCloseable {
 
   protected final boolean isDeleteAfterLoad;
 
+  protected final boolean isConvertOnTypeMismatch;
+
   protected final boolean isAutoCreateDatabase;
 
   protected final int databaseLevel;
@@ -78,15 +82,19 @@ public abstract class LoadTsFileAnalyzer implements 
AutoCloseable {
   final IPartitionFetcher partitionFetcher = 
ClusterPartitionFetcher.getInstance();
   final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
 
+  protected final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter;
+
   LoadTsFileAnalyzer(LoadTsFileStatement loadTsFileStatement, MPPQueryContext 
context) {
-    this.loadTsFileStatement = loadTsFileStatement;
+    this.loadTsFileTreeStatement = loadTsFileStatement;
     this.tsFiles = loadTsFileStatement.getTsFiles();
     this.statementString = loadTsFileStatement.toString();
     this.isVerifySchema = loadTsFileStatement.isVerifySchema();
     this.isDeleteAfterLoad = loadTsFileStatement.isDeleteAfterLoad();
+    this.isConvertOnTypeMismatch = 
loadTsFileStatement.isConvertOnTypeMismatch();
     this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
     this.databaseLevel = loadTsFileStatement.getDatabaseLevel();
     this.database = loadTsFileStatement.getDatabase();
+    this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter();
 
     this.loadTsFileTableStatement = null;
     this.isTableModelStatement = false;
@@ -99,11 +107,13 @@ public abstract class LoadTsFileAnalyzer implements 
AutoCloseable {
     this.statementString = loadTsFileTableStatement.toString();
     this.isVerifySchema = true;
     this.isDeleteAfterLoad = loadTsFileTableStatement.isDeleteAfterLoad();
+    this.isConvertOnTypeMismatch = 
loadTsFileTableStatement.isConvertOnTypeMismatch();
     this.isAutoCreateDatabase = 
loadTsFileTableStatement.isAutoCreateDatabase();
     this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel();
     this.database = loadTsFileTableStatement.getDatabase();
+    this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter();
 
-    this.loadTsFileStatement = null;
+    this.loadTsFileTreeStatement = null;
     this.isTableModelStatement = true;
     this.context = context;
   }
@@ -137,6 +147,11 @@ public abstract class LoadTsFileAnalyzer implements 
AutoCloseable {
       } catch (AuthException e) {
         setFailAnalysisForAuthException(analysis, e);
         return false;
+      } catch (VerifyMetadataTypeMismatchException e) {
+        executeDataTypeConversionOnTypeMismatch(analysis, e);
+        // just return false to STOP the analysis process,
+        // the real result on the conversion will be set in the analysis.
+        return false;
       } catch (BufferUnderflowException e) {
         LOGGER.warn(
             "The file {} is not a valid tsfile. Please check the input file.", 
tsFile.getPath(), e);
@@ -161,6 +176,39 @@ public abstract class LoadTsFileAnalyzer implements 
AutoCloseable {
   protected abstract void analyzeSingleTsFile(final File tsFile)
       throws IOException, AuthException, VerifyMetadataException;
 
+  protected void executeDataTypeConversionOnTypeMismatch(
+      final IAnalysis analysis, final VerifyMetadataTypeMismatchException e) {
+    final TSStatus status =
+        isConvertOnTypeMismatch
+            ? (isTableModelStatement
+                ? 
loadTsFileDataTypeConverter.convertForTableModel(loadTsFileTableStatement)
+                : 
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileTreeStatement))
+            : null;
+
+    if (status == null) {
+      analysis.setFailStatus(
+          new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
+    } else if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        && status.getCode() != 
TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
+      analysis.setFailStatus(status);
+    }
+    analysis.setFinishQueryAfterAnalyze(true);
+    setRealStatement(analysis);
+  }
+
+  protected void setRealStatement(IAnalysis analysis) {
+    if (isTableModelStatement) {
+      // Do nothing by now.
+    } else {
+      analysis.setRealStatement(loadTsFileTreeStatement);
+    }
+  }
+
+  protected String getStatementString() {
+    return statementString;
+  }
+
   protected TsFileResource constructTsFileResource(
       final TsFileSequenceReader reader, final File tsFile) throws IOException 
{
     final TsFileResource tsFileResource = new TsFileResource(tsFile);
@@ -174,23 +222,11 @@ public abstract class LoadTsFileAnalyzer implements 
AutoCloseable {
     return tsFileResource;
   }
 
-  protected String getStatementString() {
-    return statementString;
-  }
-
-  protected void setRealStatement(IAnalysis analysis) {
-    if (isTableModelStatement) {
-      // Do nothing by now.
-    } else {
-      analysis.setRealStatement(loadTsFileStatement);
-    }
-  }
-
   protected void addTsFileResource(TsFileResource tsFileResource) {
     if (isTableModelStatement) {
       loadTsFileTableStatement.addTsFileResource(tsFileResource);
     } else {
-      loadTsFileStatement.addTsFileResource(tsFileResource);
+      loadTsFileTreeStatement.addTsFileResource(tsFileResource);
     }
   }
 
@@ -198,7 +234,7 @@ public abstract class LoadTsFileAnalyzer implements 
AutoCloseable {
     if (isTableModelStatement) {
       loadTsFileTableStatement.addWritePointCount(writePointCount);
     } else {
-      loadTsFileStatement.addWritePointCount(writePointCount);
+      loadTsFileTreeStatement.addWritePointCount(writePointCount);
     }
   }
 
@@ -206,6 +242,10 @@ public abstract class LoadTsFileAnalyzer implements 
AutoCloseable {
     return isVerifySchema;
   }
 
+  protected boolean isConvertOnTypeMismatch() {
+    return isConvertOnTypeMismatch;
+  }
+
   protected boolean isAutoCreateDatabase() {
     return isAutoCreateDatabase;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index dac5f81e66b..a3edfabadb8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.VerifyMetadataException;
+import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException;
 import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -290,7 +291,7 @@ public class LoadTsFileTableSchemaCache {
         final ColumnSchema realColumn =
             realSchema.getColumn(fileColumn.getName(), 
fileColumn.getColumnCategory());
         if (!fileColumn.getType().equals(realColumn.getType())) {
-          throw new VerifyMetadataException(
+          throw new VerifyMetadataTypeMismatchException(
               String.format(
                   "Data type mismatch for column %s in table %s, type in 
TsFile: %s, type in IoTDB: %s",
                   realColumn.getName(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java
index fe91d00f8e1..a34483af9d8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load;
 
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException;
 import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -85,6 +86,9 @@ public class LoadTsFileToTreeModelAnalyzer extends 
LoadTsFileAnalyzer {
     } catch (AuthException e) {
       setFailAnalysisForAuthException(analysis, e);
       return analysis;
+    } catch (VerifyMetadataTypeMismatchException e) {
+      executeDataTypeConversionOnTypeMismatch(analysis, e);
+      return analysis;
     } catch (Exception e) {
       final String exceptionMessage =
           String.format(
@@ -105,7 +109,8 @@ public class LoadTsFileToTreeModelAnalyzer extends 
LoadTsFileAnalyzer {
   }
 
   @Override
-  protected void analyzeSingleTsFile(final File tsFile) throws IOException, 
AuthException {
+  protected void analyzeSingleTsFile(final File tsFile)
+      throws IOException, AuthException, VerifyMetadataTypeMismatchException {
     try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
       // can be reused when constructing tsfile resource
       final TsFileSequenceReaderTimeseriesMetadataIterator 
timeseriesMetadataIterator =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
index 67379c724ef..ac389b2a6db 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.VerifyMetadataException;
+import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException;
 import org.apache.iotdb.db.exception.load.LoadFileException;
 import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -80,6 +81,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 public class TreeSchemaAutoCreatorAndVerifier {
+
   private static final Logger LOGGER =
       LoggerFactory.getLogger(TreeSchemaAutoCreatorAndVerifier.class);
 
@@ -103,7 +105,7 @@ public class TreeSchemaAutoCreatorAndVerifier {
   public void autoCreateAndVerify(
       TsFileSequenceReader reader,
       Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadataList)
-      throws IOException, AuthException {
+      throws IOException, AuthException, VerifyMetadataTypeMismatchException {
     for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
         device2TimeseriesMetadataList.entrySet()) {
       final IDeviceID device = entry.getKey();
@@ -204,13 +206,14 @@ public class TreeSchemaAutoCreatorAndVerifier {
     schemaCache.clearDeviceIsAlignedCacheIfNecessary();
   }
 
-  public void flush() throws AuthException {
+  public void flush() throws AuthException, 
VerifyMetadataTypeMismatchException {
     doAutoCreateAndVerify();
 
     schemaCache.clearTimeSeries();
   }
 
-  private void doAutoCreateAndVerify() throws SemanticException, AuthException 
{
+  private void doAutoCreateAndVerify()
+      throws SemanticException, AuthException, 
VerifyMetadataTypeMismatchException {
     if (schemaCache.getDevice2TimeSeries().isEmpty()) {
       return;
     }
@@ -233,15 +236,26 @@ public class TreeSchemaAutoCreatorAndVerifier {
       }
     } catch (AuthException e) {
       throw e;
+    } catch (VerifyMetadataTypeMismatchException e) {
+      if (loadTsFileAnalyzer.isConvertOnTypeMismatch()) {
+        // throw exception to convert data type in the upper layer 
(LoadTsFileAnalyzer)
+        throw e;
+      } else {
+        handleException(e, loadTsFileAnalyzer.getStatementString());
+      }
     } catch (Exception e) {
-      LOGGER.warn("Auto create or verify schema error.", e);
-      throw new SemanticException(
-          String.format(
-              "Auto create or verify schema error when executing statement %s. 
 Detail: %s.",
-              loadTsFileAnalyzer.getStatementString(), e.getMessage()));
+      handleException(e, loadTsFileAnalyzer.getStatementString());
     }
   }
 
+  private void handleException(Exception e, String statementString) throws 
SemanticException {
+    LOGGER.warn("Auto create or verify schema error.", e);
+    throw new SemanticException(
+        String.format(
+            "Auto create or verify schema error when executing statement %s.  
Detail: %s.",
+            statementString, e.getMessage()));
+  }
+
   private void makeSureNoDuplicatedMeasurementsInDevices() throws 
VerifyMetadataException {
     for (final Map.Entry<IDeviceID, Set<MeasurementSchema>> entry :
         schemaCache.getDevice2TimeSeries().entrySet()) {
@@ -445,7 +459,7 @@ public class TreeSchemaAutoCreatorAndVerifier {
 
         // check datatype
         if (!tsFileSchema.getType().equals(iotdbSchema.getType())) {
-          throw new VerifyMetadataException(
+          throw new VerifyMetadataTypeMismatchException(
               String.format(
                   "Measurement %s%s%s datatype not match, TsFile: %s, IoTDB: 
%s",
                   device,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
index 71ca9359e6f..e61d9353852 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
@@ -41,8 +41,9 @@ public class LoadTsFile extends Statement {
   private final File file;
   private int databaseLevel; // For loading to tree-model only
   private String database; // For loading to table-model only
-  private boolean deleteAfterLoad;
-  private boolean autoCreateDatabase;
+  private boolean deleteAfterLoad = false;
+  private boolean convertOnTypeMismatch = true;
+  private boolean autoCreateDatabase = true;
   private String model = LoadTsFileConfigurator.MODEL_TABLE_VALUE;
 
   private final Map<String, String> loadAttributes;
@@ -58,6 +59,7 @@ public class LoadTsFile extends Statement {
     this.file = new File(filePath);
     this.databaseLevel = 
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
     this.deleteAfterLoad = false;
+    this.convertOnTypeMismatch = true;
     this.autoCreateDatabase = 
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
     this.resources = new ArrayList<>();
     this.writePointCountList = new ArrayList<>();
@@ -85,12 +87,16 @@ public class LoadTsFile extends Statement {
     this.autoCreateDatabase = autoCreateDatabase;
   }
 
+  public boolean isAutoCreateDatabase() {
+    return autoCreateDatabase;
+  }
+
   public boolean isDeleteAfterLoad() {
     return deleteAfterLoad;
   }
 
-  public boolean isAutoCreateDatabase() {
-    return autoCreateDatabase;
+  public boolean isConvertOnTypeMismatch() {
+    return convertOnTypeMismatch;
   }
 
   public int getDatabaseLevel() {
@@ -133,6 +139,8 @@ public class LoadTsFile extends Statement {
     this.databaseLevel = 
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
     this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes);
     this.deleteAfterLoad = 
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
+    this.convertOnTypeMismatch =
+        
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
     this.model =
         LoadTsFileConfigurator.parseOrGetDefaultModel(
             loadAttributes, LoadTsFileConfigurator.MODEL_TABLE_VALUE);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 9e1484f638f..520700d3848 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -43,19 +43,23 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_NAME_KEY;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.MODEL_KEY;
+import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_DELETE_VALUE;
 import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY;
+import static 
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_NONE_VALUE;
 
 public class LoadTsFileStatement extends Statement {
 
   private final File file;
   private int databaseLevel; // For loading to tree-model only
   private String database; // For loading to table-model only
-  private boolean verifySchema;
-  private boolean deleteAfterLoad;
-  private boolean autoCreateDatabase;
+  private boolean verifySchema = true;
+  private boolean deleteAfterLoad = false;
+  private boolean convertOnTypeMismatch = true;
+  private boolean autoCreateDatabase = true;
   private String model = LoadTsFileConfigurator.MODEL_TREE_VALUE;
 
   private Map<String, String> loadAttributes;
@@ -69,6 +73,7 @@ public class LoadTsFileStatement extends Statement {
     this.databaseLevel = 
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
     this.verifySchema = true;
     this.deleteAfterLoad = false;
+    this.convertOnTypeMismatch = true;
     this.autoCreateDatabase = 
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
     this.resources = new ArrayList<>();
     this.writePointCountList = new ArrayList<>();
@@ -99,6 +104,7 @@ public class LoadTsFileStatement extends Statement {
     this.databaseLevel = 
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
     this.verifySchema = true;
     this.deleteAfterLoad = false;
+    this.convertOnTypeMismatch = true;
     this.autoCreateDatabase = 
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
     this.tsFiles = new ArrayList<>();
     this.resources = new ArrayList<>();
@@ -136,48 +142,56 @@ public class LoadTsFileStatement extends Statement {
         });
   }
 
-  public void setDeleteAfterLoad(boolean deleteAfterLoad) {
-    this.deleteAfterLoad = deleteAfterLoad;
-  }
-
   public void setDatabaseLevel(int databaseLevel) {
     this.databaseLevel = databaseLevel;
   }
 
+  public int getDatabaseLevel() {
+    return databaseLevel;
+  }
+
   public void setDatabase(String database) {
     this.database = database;
   }
 
-  public void setModel(String model) {
-    this.model = model;
+  public String getDatabase() {
+    return database;
   }
 
   public void setVerifySchema(boolean verifySchema) {
     this.verifySchema = verifySchema;
   }
 
-  public void setAutoCreateDatabase(boolean autoCreateDatabase) {
-    this.autoCreateDatabase = autoCreateDatabase;
-  }
-
   public boolean isVerifySchema() {
     return verifySchema;
   }
 
+  public void setDeleteAfterLoad(boolean deleteAfterLoad) {
+    this.deleteAfterLoad = deleteAfterLoad;
+  }
+
   public boolean isDeleteAfterLoad() {
     return deleteAfterLoad;
   }
 
-  public boolean isAutoCreateDatabase() {
-    return autoCreateDatabase;
+  public void setConvertOnTypeMismatch(boolean convertOnTypeMismatch) {
+    this.convertOnTypeMismatch = convertOnTypeMismatch;
   }
 
-  public int getDatabaseLevel() {
-    return databaseLevel;
+  public boolean isConvertOnTypeMismatch() {
+    return convertOnTypeMismatch;
   }
 
-  public String getDatabase() {
-    return database;
+  public void setAutoCreateDatabase(boolean autoCreateDatabase) {
+    this.autoCreateDatabase = autoCreateDatabase;
+  }
+
+  public boolean isAutoCreateDatabase() {
+    return autoCreateDatabase;
+  }
+
+  public void setModel(String model) {
+    this.model = model;
   }
 
   public String getModel() {
@@ -213,6 +227,8 @@ public class LoadTsFileStatement extends Statement {
     this.databaseLevel = 
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
     this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes);
     this.deleteAfterLoad = 
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
+    this.convertOnTypeMismatch =
+        
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
     this.model =
         LoadTsFileConfigurator.parseOrGetDefaultModel(
             loadAttributes, LoadTsFileConfigurator.MODEL_TREE_VALUE);
@@ -234,14 +250,18 @@ public class LoadTsFileStatement extends Statement {
   public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
toRelationalStatement(
       MPPQueryContext context) {
     loadAttributes = new HashMap<>();
+
     loadAttributes.put(DATABASE_LEVEL_KEY, String.valueOf(databaseLevel));
     if (database != null) {
       loadAttributes.put(DATABASE_NAME_KEY, database);
     }
-    loadAttributes.put(ON_SUCCESS_KEY, String.valueOf(deleteAfterLoad));
+    loadAttributes.put(
+        ON_SUCCESS_KEY, deleteAfterLoad ? ON_SUCCESS_DELETE_VALUE : 
ON_SUCCESS_NONE_VALUE);
+    loadAttributes.put(CONVERT_ON_TYPE_MISMATCH_KEY, 
String.valueOf(convertOnTypeMismatch));
     if (model != null) {
       loadAttributes.put(MODEL_KEY, model);
     }
+
     return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes);
   }
 
@@ -255,13 +275,15 @@ public class LoadTsFileStatement extends Statement {
     return "LoadTsFileStatement{"
         + "file="
         + file
-        + ", deleteAfterLoad="
+        + ", delete-after-load="
         + deleteAfterLoad
-        + ", databaseLevel="
+        + ", database-level="
         + databaseLevel
-        + ", verifySchema="
+        + ", verify-schema="
         + verifySchema
-        + ", tsFiles Size="
+        + ", convert-on-type-mismatch="
+        + convertOnTypeMismatch
+        + ", tsFiles size="
         + tsFiles.size()
         + '}';
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 605218ae0a6..188ae30436d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -192,6 +192,7 @@ public class ActiveLoadTsFileLoader {
   private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws 
FileNotFoundException {
     final LoadTsFileStatement statement = new 
LoadTsFileStatement(filePair.getLeft());
     statement.setDeleteAfterLoad(true);
+    statement.setConvertOnTypeMismatch(true);
     statement.setVerifySchema(true);
     statement.setAutoCreateDatabase(false);
     return executeStatement(filePair.getRight() ? new 
PipeEnrichedStatement(statement) : statement);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index 929d106f244..70d7401c560 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -49,6 +49,9 @@ public class LoadTsFileConfigurator {
         break;
       case DATABASE_NAME_KEY:
         break;
+      case CONVERT_ON_TYPE_MISMATCH_KEY:
+        validateConvertOnTypeMismatchParam(value);
+        break;
       default:
         throw new SemanticException("Invalid parameter '" + key + "' for LOAD 
TSFILE command.");
     }
@@ -90,8 +93,8 @@ public class LoadTsFileConfigurator {
   }
 
   public static final String ON_SUCCESS_KEY = "on-success";
-  private static final String ON_SUCCESS_DELETE_VALUE = "delete";
-  private static final String ON_SUCCESS_NONE_VALUE = "none";
+  public static final String ON_SUCCESS_DELETE_VALUE = "delete";
+  public static final String ON_SUCCESS_NONE_VALUE = "none";
   private static final Set<String> ON_SUCCESS_VALUE_SET =
       Collections.unmodifiableSet(
           new HashSet<>(Arrays.asList(ON_SUCCESS_DELETE_VALUE, 
ON_SUCCESS_NONE_VALUE)));
@@ -107,7 +110,27 @@ public class LoadTsFileConfigurator {
 
   public static boolean parseOrGetDefaultOnSuccess(final Map<String, String> 
loadAttributes) {
     final String value = loadAttributes.get(ON_SUCCESS_KEY);
-    return StringUtils.isEmpty(value) || ON_SUCCESS_DELETE_VALUE.equals(value);
+    return !StringUtils.isEmpty(value) && 
ON_SUCCESS_DELETE_VALUE.equalsIgnoreCase(value);
+  }
+
+  public static final String CONVERT_ON_TYPE_MISMATCH_KEY = 
"convert-on-type-mismatch";
+  private static final boolean CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE = true;
+
+  /**
+   * Checks that the given {@code convert-on-type-mismatch} attribute value is a boolean literal.
+   *
+   * <p>Only {@code true} and {@code false} are accepted, compared case-insensitively. A {@code
+   * null} value matches neither literal and is therefore rejected as well.
+   *
+   * @param convertOnTypeMismatch the raw attribute value to validate
+   * @throws SemanticException if the value is neither {@code true} nor {@code false}
+   */
+  public static void validateConvertOnTypeMismatchParam(final String convertOnTypeMismatch) {
+    final boolean isBooleanLiteral =
+        "true".equalsIgnoreCase(convertOnTypeMismatch)
+            || "false".equalsIgnoreCase(convertOnTypeMismatch);
+    if (isBooleanLiteral) {
+      return;
+    }
+
+    throw new SemanticException(
+        String.format(
+            "Given %s value '%s' is not supported, please input a valid boolean value.",
+            CONVERT_ON_TYPE_MISMATCH_KEY, convertOnTypeMismatch));
+  }
+
+  /**
+   * Reads the {@code convert-on-type-mismatch} attribute from the given load attributes.
+   *
+   * <p>When the key is absent, {@code CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE} is used. The value
+   * is interpreted with {@link Boolean#parseBoolean(String)}, so anything other than a
+   * case-insensitive {@code "true"} yields {@code false}.
+   *
+   * @param loadAttributes the attributes of the LOAD TSFILE command
+   * @return whether data types should be converted when a type mismatch is detected
+   */
+  public static boolean parseOrGetDefaultConvertOnTypeMismatch(
+      final Map<String, String> loadAttributes) {
+    final String defaultValue = String.valueOf(CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE);
+    final String configuredValue =
+        loadAttributes.getOrDefault(CONVERT_ON_TYPE_MISMATCH_KEY, defaultValue);
+    return Boolean.parseBoolean(configuredValue);
+  }
 
   public static final String MODEL_KEY = "model";
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
new file mode 100644
index 00000000000..1a814442402
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.db.pipe.receiver.transform.converter.ArrayConverter;
+import 
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Insert-tablet statement used by the load data type conversion path.
+ *
+ * <p>It wraps an existing {@link InsertTabletStatement} and, whenever a column's data type has to
+ * be reconciled with a target type, converts the whole column through {@link ArrayConverter}.
+ */
+public class LoadConvertedInsertTabletStatement extends PipeConvertedInsertTabletStatement {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LoadConvertedInsertTabletStatement.class);
+
+  public LoadConvertedInsertTabletStatement(final InsertTabletStatement insertTabletStatement) {
+    super(insertTabletStatement);
+  }
+
+  /**
+   * Casts the column at {@code columnIndex} to {@code dataType} and records the new type.
+   *
+   * @param columnIndex index of the column (and measurement) to convert
+   * @param dataType the data type the column is converted to
+   * @return always {@code true}
+   */
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+    LOGGER.info(
+        "Load: Inserting tablet to {}.{}. Casting type from {} to {}.",
+        devicePath,
+        measurements[columnIndex],
+        dataTypes[columnIndex],
+        dataType);
+
+    // Convert the values first, then publish the new type for this column.
+    final TSDataType sourceType = dataTypes[columnIndex];
+    columns[columnIndex] = ArrayConverter.convert(sourceType, dataType, columns[columnIndex]);
+    dataTypes[columnIndex] = dataType;
+    return true;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
new file mode 100644
index 00000000000..f292ee55930
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+/**
+ * Maps an exception raised while executing a statement on the load conversion path to a {@link
+ * TSStatus}. The exception is passed in as the visitor context; its message is copied into the
+ * returned status.
+ */
+public class LoadConvertedInsertTabletStatementExceptionVisitor
+    extends StatementVisitor<TSStatus, Exception> {
+
+  /** Generic mapping: any exception becomes a {@code LOAD_FILE_ERROR} status. */
+  @Override
+  public TSStatus visitNode(final StatementNode node, final Exception context) {
+    return toStatus(TSStatusCode.LOAD_FILE_ERROR, context);
+  }
+
+  /**
+   * Mapping for load statements: out-of-memory is reported as temporarily unavailable, semantic
+   * errors as a user conflict, and everything else is delegated to {@code visitStatement}.
+   */
+  @Override
+  public TSStatus visitLoadFile(
+      final LoadTsFileStatement loadTsFileStatement, final Exception context) {
+    if (context instanceof LoadRuntimeOutOfMemoryException) {
+      return toStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION, context);
+    }
+    if (context instanceof SemanticException) {
+      return toStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION, context);
+    }
+    return visitStatement(loadTsFileStatement, context);
+  }
+
+  /** Builds a status with the given code and the message of {@code cause}. */
+  private static TSStatus toStatus(final TSStatusCode code, final Exception cause) {
+    return new TSStatus(code.getStatusCode()).setMessage(cause.getMessage());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java
new file mode 100644
index 00000000000..6e9601f8d95
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+/**
+ * Translates the {@link TSStatus} returned by executing a converted insert statement into the
+ * status codes used by the load conversion path. The status to translate is passed in as the
+ * visitor context.
+ */
+public class LoadConvertedInsertTabletStatementTSStatusVisitor
+    extends StatementVisitor<TSStatus, TSStatus> {
+
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** Generic mapping: the status is returned unchanged. */
+  @Override
+  public TSStatus visitNode(final StatementNode node, final TSStatus context) {
+    return context;
+  }
+
+  @Override
+  public TSStatus visitInsertTablet(
+      final InsertTabletStatement insertTabletStatement, final TSStatus context) {
+    return visitInsertBase(insertTabletStatement, context);
+  }
+
+  /**
+   * Maps the status of an insert statement.
+   *
+   * <ul>
+   *   <li>{@code SYSTEM_READ_ONLY} / {@code WRITE_PROCESS_REJECT} become {@code
+   *       LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION}.
+   *   <li>{@code OUT_OF_TTL} becomes {@code LOAD_IDEMPOTENT_CONFLICT_EXCEPTION}.
+   *   <li>{@code METADATA_ERROR} caused by a data type mismatch becomes {@code
+   *       LOAD_IDEMPOTENT_CONFLICT_EXCEPTION} when partial insert is enabled.
+   *   <li>Anything else is delegated to {@code visitStatement}.
+   * </ul>
+   *
+   * @param insertBaseStatement the statement that was executed
+   * @param context the status produced by the execution
+   * @return the mapped status
+   */
+  private TSStatus visitInsertBase(
+      final InsertBaseStatement insertBaseStatement, final TSStatus context) {
+    final int code = context.getCode();
+
+    if (code == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
+        || code == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+      return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
+
+    if (code == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
+      return new TSStatus(TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
+
+    // The message may be absent on the status; guard against null before searching it so that a
+    // message-less METADATA_ERROR falls through instead of raising a NullPointerException.
+    final String message = context.getMessage();
+    if (code == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && message != null
+        && message.contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+        && config.isEnablePartialInsert()) {
+      return new TSStatus(TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(message);
+    }
+
+    return visitStatement(insertBaseStatement, context);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
new file mode 100644
index 00000000000..233f60124ac
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Table-model fallback for LOAD TSFILE: instead of loading the TsFiles directly, every file is
+ * parsed into tablets and each tablet is executed as a {@link LoadConvertedInsertTabletStatement}
+ * through the supplied {@link StatementExecutor}.
+ *
+ * <p>The visitor returns a success status only if every tablet of every file was accepted;
+ * otherwise it returns {@link Optional#empty()}.
+ */
+public class LoadTableStatementDataTypeConvertExecutionVisitor
+    extends AstVisitor<Optional<TSStatus>, String> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LoadTableStatementDataTypeConvertExecutionVisitor.class);
+
+  /** Maximum number of re-executions when the write is temporarily unavailable. */
+  private static final int MAX_RETRY_TIMES = 5;
+
+  /** Base back-off in milliseconds; the i-th retry (0-based) waits {@code base * (i + 1)}. */
+  private static final long RETRY_BACKOFF_BASE_MS = 100L;
+
+  @FunctionalInterface
+  public interface StatementExecutor {
+    // databaseName can NOT be null
+    TSStatus execute(final Statement statement, final String databaseName);
+  }
+
+  private final StatementExecutor statementExecutor;
+
+  public LoadTableStatementDataTypeConvertExecutionVisitor(StatementExecutor statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  /**
+   * Converts and inserts the content of all TsFiles of the given statement.
+   *
+   * @param loadTsFileStatement the load statement whose TsFiles are converted
+   * @param databaseName the target database; conversion is skipped when it is {@code null}
+   * @return a success status when all tablets were accepted, {@link Optional#empty()} when the
+   *     database name is missing, a tablet was not accepted, or parsing/execution failed
+   */
+  @Override
+  public Optional<TSStatus> visitLoadTsFile(
+      final LoadTsFile loadTsFileStatement, final String databaseName) {
+    if (Objects.isNull(databaseName)) {
+      LOGGER.warn(
+          "Database name is unexpectedly null for LoadTsFileStatement: {}. Skip data type conversion.",
+          loadTsFileStatement);
+      return Optional.empty();
+    }
+
+    LOGGER.info("Start data type conversion for LoadTsFileStatement: {}.", loadTsFileStatement);
+
+    for (final File file : loadTsFileStatement.getTsFiles()) {
+      try (final TsFileInsertionEventTableParser parser =
+          new TsFileInsertionEventTableParser(
+              file,
+              new TablePattern(true, null, null),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null)) {
+        for (final TabletInsertionEvent tabletInsertionEvent : parser.toTabletInsertionEvents()) {
+          // NOTE(review): events of any other type are skipped without being inserted, yet the
+          // conversion can still report success -- confirm that no other type can occur here.
+          if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+            continue;
+          }
+          final PipeRawTabletInsertionEvent rawTabletInsertionEvent =
+              (PipeRawTabletInsertionEvent) tabletInsertionEvent;
+
+          final LoadConvertedInsertTabletStatement statement =
+              new LoadConvertedInsertTabletStatement(
+                  PipeTransferTabletRawReq.toTPipeTransferRawReq(
+                          rawTabletInsertionEvent.convertToTablet(),
+                          rawTabletInsertionEvent.isAligned())
+                      .constructStatement());
+
+          final TSStatus result = executeWithRetry(statement, databaseName);
+          if (!isAccepted(result)) {
+            // Report why the conversion is abandoned instead of failing silently.
+            LOGGER.warn(
+                "Failed to convert data type for LoadTsFileStatement: {}, status code is {}, message is {}.",
+                loadTsFileStatement,
+                result.getCode(),
+                result.getMessage());
+            return Optional.empty();
+          }
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
+        return Optional.empty();
+      }
+    }
+
+    if (loadTsFileStatement.isDeleteAfterLoad()) {
+      // Best-effort cleanup: deletion failures are deliberately ignored.
+      loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+    }
+
+    LOGGER.info(
+        "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement);
+
+    return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+  /**
+   * Executes the statement and re-executes it up to {@link #MAX_RETRY_TIMES} times, with a
+   * linearly growing pause, while the mapped status is {@code
+   * LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION}. Any exception (including interruption, after which the
+   * interrupt flag is restored) is mapped to a status by the exception visitor.
+   */
+  private TSStatus executeWithRetry(
+      final LoadConvertedInsertTabletStatement statement, final String databaseName) {
+    try {
+      TSStatus result = executeOnce(statement, databaseName);
+      for (int i = 0;
+          i < MAX_RETRY_TIMES
+              && result.getCode()
+                  == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
+          i++) {
+        Thread.sleep(RETRY_BACKOFF_BASE_MS * (i + 1));
+        result = executeOnce(statement, databaseName);
+      }
+      return result;
+    } catch (final Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      return statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
+    }
+  }
+
+  /** Executes the statement once and maps the raw status through the status visitor. */
+  private TSStatus executeOnce(
+      final LoadConvertedInsertTabletStatement statement, final String databaseName) {
+    return statement.accept(
+        LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+        statementExecutor.execute(statement, databaseName));
+  }
+
+  /** Success, redirection recommendation and idempotent conflict all count as accepted. */
+  private static boolean isAccepted(final TSStatus status) {
+    final int code = status.getCode();
+    return code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        || code == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
new file mode 100644
index 00000000000..5793b96502a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Optional;
+
+/**
+ * Tree-model fallback for LOAD TSFILE: instead of loading the TsFiles directly, every file is
+ * parsed into tablets and each tablet is executed as a {@link LoadConvertedInsertTabletStatement}
+ * through the supplied {@link StatementExecutor}.
+ *
+ * <p>The visitor returns a success status only if every tablet of every file was accepted;
+ * otherwise it returns {@link Optional#empty()}.
+ */
+public class LoadTreeStatementDataTypeConvertExecutionVisitor
+    extends StatementVisitor<Optional<TSStatus>, Void> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LoadTreeStatementDataTypeConvertExecutionVisitor.class);
+
+  /** Maximum number of re-executions when the write is temporarily unavailable. */
+  private static final int MAX_RETRY_TIMES = 5;
+
+  /** Base back-off in milliseconds; the i-th retry (0-based) waits {@code base * (i + 1)}. */
+  private static final long RETRY_BACKOFF_BASE_MS = 100L;
+
+  @FunctionalInterface
+  public interface StatementExecutor {
+    TSStatus execute(final Statement statement);
+  }
+
+  private final StatementExecutor statementExecutor;
+
+  public LoadTreeStatementDataTypeConvertExecutionVisitor(
+      final StatementExecutor statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  /** Statements other than load statements are not handled and yield an empty result. */
+  @Override
+  public Optional<TSStatus> visitNode(final StatementNode statementNode, final Void v) {
+    return Optional.empty();
+  }
+
+  /**
+   * Converts and inserts the content of all TsFiles of the given statement.
+   *
+   * @param loadTsFileStatement the load statement whose TsFiles are converted
+   * @param v unused
+   * @return a success status when all tablets were accepted, {@link Optional#empty()} when a
+   *     tablet was not accepted or parsing/execution failed
+   */
+  @Override
+  public Optional<TSStatus> visitLoadFile(
+      final LoadTsFileStatement loadTsFileStatement, final Void v) {
+
+    LOGGER.info("Start data type conversion for LoadTsFileStatement: {}", loadTsFileStatement);
+
+    for (final File file : loadTsFileStatement.getTsFiles()) {
+      try (final TsFileInsertionEventScanParser parser =
+          new TsFileInsertionEventScanParser(
+              file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
+        for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) {
+          final LoadConvertedInsertTabletStatement statement =
+              new LoadConvertedInsertTabletStatement(
+                  PipeTransferTabletRawReq.toTPipeTransferRawReq(
+                          tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
+                      .constructStatement());
+
+          final TSStatus result = executeWithRetry(statement);
+          if (!isAccepted(result)) {
+            // Report why the conversion is abandoned instead of failing silently.
+            LOGGER.warn(
+                "Failed to convert data type for LoadTsFileStatement: {}, status code is {}, message is {}.",
+                loadTsFileStatement,
+                result.getCode(),
+                result.getMessage());
+            return Optional.empty();
+          }
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
+        return Optional.empty();
+      }
+    }
+
+    if (loadTsFileStatement.isDeleteAfterLoad()) {
+      // Best-effort cleanup: deletion failures are deliberately ignored.
+      loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+    }
+
+    LOGGER.info(
+        "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement);
+
+    return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+  /**
+   * Executes the statement and re-executes it up to {@link #MAX_RETRY_TIMES} times, with a
+   * linearly growing pause, while the mapped status is {@code
+   * LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION}. Any exception (including interruption, after which the
+   * interrupt flag is restored) is mapped to a status by the exception visitor.
+   */
+  private TSStatus executeWithRetry(final LoadConvertedInsertTabletStatement statement) {
+    try {
+      TSStatus result = executeOnce(statement);
+      for (int i = 0;
+          i < MAX_RETRY_TIMES
+              && result.getCode()
+                  == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
+          i++) {
+        Thread.sleep(RETRY_BACKOFF_BASE_MS * (i + 1));
+        result = executeOnce(statement);
+      }
+      return result;
+    } catch (final Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      return statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
+    }
+  }
+
+  /** Executes the statement once and maps the raw status through the status visitor. */
+  private TSStatus executeOnce(final LoadConvertedInsertTabletStatement statement) {
+    return statement.accept(
+        LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+        statementExecutor.execute(statement));
+  }
+
+  /** Success, redirection recommendation and idempotent conflict all count as accepted. */
+  private static boolean isAccepted(final TSStatus status) {
+    final int code = status.getCode();
+    return code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        || code == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
new file mode 100644
index 00000000000..7ad7a5af544
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoadTsFileDataTypeConverter { // Hands a load statement to the table-model or tree-model conversion visitor and returns the resulting TSStatus.
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LoadTsFileDataTypeConverter.class);
+
+  private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
+
+  private final SqlParser relationalSqlParser = new SqlParser(); // passed to Coordinator.executeForTableModel by the table-model executor below
+  private final LoadTableStatementDataTypeConvertExecutionVisitor
+      tableStatementDataTypeConvertExecutionVisitor =
+          new LoadTableStatementDataTypeConvertExecutionVisitor(
+              ((statement, databaseName) -> // executor callback: runs one statement for the given database and yields only its status
+                  Coordinator.getInstance()
+                      .executeForTableModel(
+                          statement,
+                          relationalSqlParser,
+                          SESSION_MANAGER.getCurrSession(),
+                          SESSION_MANAGER.requestQueryId(),
+                          SESSION_MANAGER.getSessionInfoOfPipeReceiver( // session info is derived from the current session plus the target database name
+                              SESSION_MANAGER.getCurrSession(), databaseName),
+                          "",
+                          LocalExecutionPlanner.getInstance().metadata,
+                          
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
+                      .status)); // only the status field of the execution result is handed back to the visitor
+  private final LoadTreeStatementDataTypeConvertExecutionVisitor
+      treeStatementDataTypeConvertExecutionVisitor =
+          new LoadTreeStatementDataTypeConvertExecutionVisitor(
+              statement -> // executor callback: runs one tree-model statement and yields only its status
+                  Coordinator.getInstance()
+                      .executeForTreeModel(
+                          statement,
+                          SESSION_MANAGER.requestQueryId(),
+                          
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+                          "",
+                          ClusterPartitionFetcher.getInstance(),
+                          ClusterSchemaFetcher.getInstance(),
+                          
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+                          false)
+                      .status); // only the status field of the execution result is handed back to the visitor
+
+  public static final LoadConvertedInsertTabletStatementTSStatusVisitor 
STATEMENT_STATUS_VISITOR =
+      new LoadConvertedInsertTabletStatementTSStatusVisitor(); // shared instance; the execution visitors apply it to a statement together with the TSStatus from executing it
+  public static final LoadConvertedInsertTabletStatementExceptionVisitor
+      STATEMENT_EXCEPTION_VISITOR = new 
LoadConvertedInsertTabletStatementExceptionVisitor(); // shared instance; the execution visitors apply it to a statement together with the exception caught while executing it
+
+  public TSStatus convertForTableModel(LoadTsFile loadTsFileTableStatement) { // returns the visitor's TSStatus, null if the visitor yields an empty Optional, or a LOAD_FILE_ERROR status if it throws
+    try {
+      return loadTsFileTableStatement
+          .accept(
+              tableStatementDataTypeConvertExecutionVisitor, 
loadTsFileTableStatement.getDatabase()) // the statement's database is passed as the visitor context
+          .orElse(null); // an empty Optional from the visitor becomes a null return, so callers must handle null
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Failed to convert data types for table model statement {}.",
+          loadTsFileTableStatement,
+          e);
+      return new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()); // the exception is logged above and reported as a status instead of being rethrown
+    }
+  }
+
+  public TSStatus convertForTreeModel(LoadTsFileStatement 
loadTsFileTreeStatement) { // returns the visitor's TSStatus, null if the visitor yields an empty Optional, or a LOAD_FILE_ERROR status if it throws
+    try {
+      return loadTsFileTreeStatement
+          .accept(treeStatementDataTypeConvertExecutionVisitor, null) // no visitor context is supplied for the tree model
+          .orElse(null); // an empty Optional from the visitor becomes a null return, so callers must handle null
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Failed to convert data types for tree model statement {}.", 
loadTsFileTreeStatement, e);
+      return new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()); // the exception is logged above and reported as a status instead of being rethrown
+    }
+  }
+}

Reply via email to