This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 8bef94666f7 Load: Support auto data type conversion when data type 
mismatch detected during analysis stage (#14529) (#14619)
8bef94666f7 is described below

commit 8bef94666f704baa74df862f123028bc1abc5be9
Author: Itami Sho <[email protected]>
AuthorDate: Fri Jan 3 18:04:04 2025 +0800

    Load: Support auto data type conversion when data type mismatch detected 
during analysis stage (#14529) (#14619)
---
 .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java  |  71 +++++++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +
 .../db/exception/VerifyMetadataException.java      |  12 +-
 ...va => VerifyMetadataTypeMismatchException.java} |  13 +--
 .../protocol/legacy/loader/TsFileLoader.java       |   1 +
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |   1 +
 .../plan/analyze/LoadTsFileAnalyzer.java           |  50 ++++++--
 .../plan/statement/crud/LoadTsFileStatement.java   |  75 +++++++-----
 .../load/active/ActiveLoadTsFileLoader.java        |   1 +
 .../load/config/LoadTsFileConfigurator.java        |  33 +++++-
 .../LoadConvertedInsertTabletStatement.java        |  52 +++++++++
 ...ertedInsertTabletStatementExceptionVisitor.java |  51 ++++++++
 ...vertedInsertTabletStatementTSStatusVisitor.java |  65 +++++++++++
 ...eeStatementDataTypeConvertExecutionVisitor.java | 130 +++++++++++++++++++++
 .../converter/LoadTsFileDataTypeConverter.java     |  72 ++++++++++++
 15 files changed, 571 insertions(+), 59 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 11892f32921..5b4ae76332f 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.jdbc.IoTDBSQLException;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
 import org.junit.Assert;
@@ -47,6 +48,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -55,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.iotdb.db.it.utils.TestUtils.assertNonQueryTestFail;
 import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
@@ -897,6 +900,74 @@ public class IoTDBLoadTsFileIT {
     }
   }
 
+  @Test
+  public void testLoadWithConvertOnTypeMismatch() throws Exception {
+
+    List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas =
+        generateMeasurementSchemasForDataTypeConvertion();
+
+    final File file = new File(tmpDir, "1-0-0-0.tsfile");
+
+    long writtenPoint = 0;
+    List<MeasurementSchema> schemaList1 =
+        measurementSchemas.stream().map(pair -> 
pair.left).collect(Collectors.toList());
+    List<MeasurementSchema> schemaList2 =
+        measurementSchemas.stream().map(pair -> 
pair.right).collect(Collectors.toList());
+
+    try (final TsFileGenerator generator = new TsFileGenerator(file)) {
+      generator.registerTimeseries(SchemaConfig.DEVICE_0, schemaList2);
+
+      generator.generateData(SchemaConfig.DEVICE_0, 100, PARTITION_INTERVAL / 
10_000, false);
+
+      writtenPoint = generator.getTotalNumber();
+    }
+
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+
+      for (MeasurementSchema schema : schemaList1) {
+        statement.execute(convert2SQL(SchemaConfig.DEVICE_0, schema));
+      }
+
+      statement.execute(String.format("load \"%s\" ", file.getAbsolutePath()));
+
+      try (final ResultSet resultSet =
+          statement.executeQuery("select count(*) from root.** group by 
level=1,2")) {
+        if (resultSet.next()) {
+          final long sgCount = resultSet.getLong("count(root.sg.test_0.*.*)");
+          Assert.assertEquals(writtenPoint, sgCount);
+        } else {
+          Assert.fail("This ResultSet is empty.");
+        }
+      }
+    }
+  }
+
+  private List<Pair<MeasurementSchema, MeasurementSchema>>
+      generateMeasurementSchemasForDataTypeConvertion() {
+    TSDataType[] dataTypes = {
+      TSDataType.STRING,
+      TSDataType.TEXT,
+      TSDataType.BLOB,
+      TSDataType.TIMESTAMP,
+      TSDataType.BOOLEAN,
+      TSDataType.DATE,
+      TSDataType.DOUBLE,
+      TSDataType.FLOAT,
+      TSDataType.INT32,
+      TSDataType.INT64
+    };
+    List<Pair<MeasurementSchema, MeasurementSchema>> pairs = new ArrayList<>();
+
+    for (TSDataType type : dataTypes) {
+      for (TSDataType dataType : dataTypes) {
+        String id = String.format("%s2%s", type.name(), dataType.name());
+        pairs.add(new Pair<>(new MeasurementSchema(id, type), new 
MeasurementSchema(id, dataType)));
+      }
+    }
+    return pairs;
+  }
+
   private static class SchemaConfig {
     private static final String STORAGE_GROUP_0 = "root.sg.test_0";
     private static final String STORAGE_GROUP_1 = "root.sg.test_1";
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index e69b980d388..512f9dde4e4 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -173,6 +173,9 @@ public enum TSStatusCode {
   PIPE_ERROR(1107),
   PIPESERVER_ERROR(1108),
   VERIFY_METADATA_ERROR(1109),
+  LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION(1110),
+  LOAD_IDEMPOTENT_CONFLICT_EXCEPTION(1111),
+  LOAD_USER_CONFLICT_EXCEPTION(1112),
 
   // UDF
   UDF_LOAD_CLASS_ERROR(1200),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
index 1603c9872f4..78c6d8d4d5c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
@@ -23,16 +23,12 @@ import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 public class VerifyMetadataException extends IoTDBException {
-  public VerifyMetadataException(
-      String path, String compareInfo, String tsFileInfo, String tsFilePath, 
String IoTDBInfo) {
-    super(
-        String.format(
-            "%s %s mismatch, %s in tsfile %s, but %s in IoTDB.",
-            path, compareInfo, tsFileInfo, tsFilePath, IoTDBInfo),
-        TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
-  }
 
   public VerifyMetadataException(String message) {
     super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
   }
+
+  public VerifyMetadataException(String message, int errorCode) {
+    super(message, errorCode);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java
similarity index 64%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java
index 1603c9872f4..209fb83bcdb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java
@@ -19,20 +19,11 @@
 
 package org.apache.iotdb.db.exception;
 
-import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-public class VerifyMetadataException extends IoTDBException {
-  public VerifyMetadataException(
-      String path, String compareInfo, String tsFileInfo, String tsFilePath, 
String IoTDBInfo) {
-    super(
-        String.format(
-            "%s %s mismatch, %s in tsfile %s, but %s in IoTDB.",
-            path, compareInfo, tsFileInfo, tsFilePath, IoTDBInfo),
-        TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
-  }
+public class VerifyMetadataTypeMismatchException extends 
VerifyMetadataException {
 
-  public VerifyMetadataException(String message) {
+  public VerifyMetadataTypeMismatchException(String message) {
     super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
index c78ca0544b4..e181ec1d592 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
@@ -56,6 +56,7 @@ public class TsFileLoader implements ILoader {
     try {
       LoadTsFileStatement statement = new 
LoadTsFileStatement(tsFile.getAbsolutePath());
       statement.setDeleteAfterLoad(true);
+      statement.setConvertOnTypeMismatch(true);
       statement.setDatabaseLevel(parseSgLevel());
       statement.setVerifySchema(true);
       statement.setAutoCreateDatabase(false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 79a98ab399c..c82c969b8e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -479,6 +479,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     final LoadTsFileStatement statement = new 
LoadTsFileStatement(fileAbsolutePath);
 
     statement.setDeleteAfterLoad(true);
+    statement.setConvertOnTypeMismatch(true);
     statement.setVerifySchema(true);
     statement.setAutoCreateDatabase(false);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 4578dca8a76..c2f31b9e35a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.VerifyMetadataException;
+import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException;
 import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
 import org.apache.iotdb.db.exception.load.LoadFileException;
 import org.apache.iotdb.db.exception.load.LoadReadOnlyException;
@@ -63,6 +64,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import 
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
 import 
org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock;
 import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
 import org.apache.iotdb.db.utils.ModificationUtils;
@@ -129,6 +131,8 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
 
   private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier;
 
+  private final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter;
+
   LoadTsFileAnalyzer(
       LoadTsFileStatement loadTsFileStatement,
       MPPQueryContext context,
@@ -141,6 +145,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
     this.schemaFetcher = schemaFetcher;
 
     this.schemaAutoCreatorAndVerifier = new SchemaAutoCreatorAndVerifier();
+    this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter();
   }
 
   public Analysis analyzeFileByFile(final boolean isDeleteAfterLoad) {
@@ -179,6 +184,11 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
         }
       } catch (AuthException e) {
         return createFailAnalysisForAuthException(e);
+      } catch (VerifyMetadataTypeMismatchException e) {
+        executeDataTypeConversionOnTypeMismatch(analysis, e);
+        // return the analysis here to STOP the analysis process,
+        // the real result of the conversion has been set in the analysis.
+        return analysis;
       } catch (Exception e) {
         final String exceptionMessage =
             String.format(
@@ -195,6 +205,11 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       schemaAutoCreatorAndVerifier.flush();
     } catch (AuthException e) {
       return createFailAnalysisForAuthException(e);
+    } catch (VerifyMetadataTypeMismatchException e) {
+      executeDataTypeConversionOnTypeMismatch(analysis, e);
+      // return the analysis here to STOP the analysis process,
+      // the real result of the conversion has been set in the analysis.
+      return analysis;
     } catch (Exception e) {
       final String exceptionMessage =
           String.format(
@@ -214,13 +229,33 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
     return analysis;
   }
 
+  private void executeDataTypeConversionOnTypeMismatch(
+      final Analysis analysis, final VerifyMetadataTypeMismatchException e) {
+    final TSStatus status =
+        loadTsFileStatement.isConvertOnTypeMismatch()
+            ? 
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileStatement)
+            : null;
+
+    if (status == null) {
+      analysis.setFailStatus(
+          new 
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
+    } else if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && status.getCode() != 
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        && status.getCode() != 
TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
+      analysis.setFailStatus(status);
+    }
+
+    analysis.setFinishQueryAfterAnalyze(true);
+    analysis.setStatement(loadTsFileStatement);
+  }
+
   @Override
   public void close() {
     schemaAutoCreatorAndVerifier.close();
   }
 
   private void analyzeSingleTsFile(final File tsFile, final boolean 
isDeleteAfterLoad)
-      throws IOException, AuthException {
+      throws IOException, AuthException, VerifyMetadataTypeMismatchException {
     try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
       // can be reused when constructing tsfile resource
       final TsFileSequenceReaderTimeseriesMetadataIterator 
timeseriesMetadataIterator =
@@ -310,7 +345,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
     public void autoCreateAndVerify(
         TsFileSequenceReader reader,
         Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadataList)
-        throws IOException, AuthException {
+        throws IOException, AuthException, VerifyMetadataTypeMismatchException 
{
       for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
           device2TimeseriesMetadataList.entrySet()) {
         final IDeviceID device = entry.getKey();
@@ -412,13 +447,14 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       schemaCache.clearDeviceIsAlignedCacheIfNecessary();
     }
 
-    public void flush() throws AuthException {
+    public void flush() throws AuthException, 
VerifyMetadataTypeMismatchException {
       doAutoCreateAndVerify();
 
       schemaCache.clearTimeSeries();
     }
 
-    private void doAutoCreateAndVerify() throws SemanticException, 
AuthException {
+    private void doAutoCreateAndVerify()
+        throws SemanticException, AuthException, 
VerifyMetadataTypeMismatchException {
       if (schemaCache.getDevice2TimeSeries().isEmpty()) {
         return;
       }
@@ -439,7 +475,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
         if (loadTsFileStatement.isVerifySchema()) {
           verifySchema(schemaTree);
         }
-      } catch (AuthException e) {
+      } catch (AuthException | VerifyMetadataTypeMismatchException e) {
         throw e;
       } catch (Exception e) {
         LOGGER.warn("Auto create or verify schema error.", e);
@@ -600,7 +636,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
     }
 
     private void verifySchema(ISchemaTree schemaTree)
-        throws VerifyMetadataException, IllegalPathException {
+        throws VerifyMetadataException, IllegalPathException, 
VerifyMetadataTypeMismatchException {
       for (final Map.Entry<IDeviceID, Set<MeasurementSchema>> entry :
           schemaCache.getDevice2TimeSeries().entrySet()) {
         final IDeviceID device = entry.getKey();
@@ -648,7 +684,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
 
           // check datatype
           if (!tsFileSchema.getType().equals(iotdbSchema.getType())) {
-            throw new VerifyMetadataException(
+            throw new VerifyMetadataTypeMismatchException(
                 String.format(
                     "Measurement %s%s%s datatype not match, TsFile: %s, IoTDB: 
%s",
                     device,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 1fb3e5f1b92..889ca2c202c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -43,9 +43,10 @@ public class LoadTsFileStatement extends Statement {
 
   private final File file;
   private int databaseLevel;
-  private boolean verifySchema;
-  private boolean deleteAfterLoad;
-  private boolean autoCreateDatabase;
+  private boolean verifySchema = true;
+  private boolean deleteAfterLoad = false;
+  private boolean convertOnTypeMismatch = true;
+  private boolean autoCreateDatabase = true;
 
   private Map<String, String> loadAttributes;
 
@@ -58,16 +59,17 @@ public class LoadTsFileStatement extends Statement {
     this.databaseLevel = 
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
     this.verifySchema = true;
     this.deleteAfterLoad = false;
+    this.convertOnTypeMismatch = true;
     this.autoCreateDatabase = 
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
-    this.tsFiles = new ArrayList<>();
     this.resources = new ArrayList<>();
     this.writePointCountList = new ArrayList<>();
     this.statementType = StatementType.MULTI_BATCH_INSERT;
 
-    processTsFile(filePath);
+    this.tsFiles = processTsFile(file);
   }
 
-  private void processTsFile(final String filePath) throws 
FileNotFoundException {
+  public static List<File> processTsFile(final File file) throws 
FileNotFoundException {
+    final List<File> tsFiles = new ArrayList<>();
     if (file.isFile()) {
       tsFiles.add(file);
     } else {
@@ -75,11 +77,12 @@ public class LoadTsFileStatement extends Statement {
         throw new FileNotFoundException(
             String.format(
                 "Can not find %s on this machine, notice that load can only 
handle files on this machine.",
-                filePath));
+                file.getPath()));
       }
-      findAllTsFile(file);
+      tsFiles.addAll(findAllTsFile(file));
     }
     sortTsFiles(tsFiles);
+    return tsFiles;
   }
 
   protected LoadTsFileStatement() {
@@ -87,6 +90,7 @@ public class LoadTsFileStatement extends Statement {
     this.databaseLevel = 
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
     this.verifySchema = true;
     this.deleteAfterLoad = false;
+    this.convertOnTypeMismatch = true;
     this.autoCreateDatabase = 
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
     this.tsFiles = new ArrayList<>();
     this.resources = new ArrayList<>();
@@ -94,21 +98,24 @@ public class LoadTsFileStatement extends Statement {
     this.statementType = StatementType.MULTI_BATCH_INSERT;
   }
 
-  private void findAllTsFile(File file) {
+  private static List<File> findAllTsFile(File file) {
     final File[] files = file.listFiles();
     if (files == null) {
-      return;
+      return Collections.emptyList();
     }
+
+    final List<File> tsFiles = new ArrayList<>();
     for (File nowFile : files) {
       if (nowFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
         tsFiles.add(nowFile);
       } else if (nowFile.isDirectory()) {
-        findAllTsFile(nowFile);
+        tsFiles.addAll(findAllTsFile(nowFile));
       }
     }
+    return tsFiles;
   }
 
-  private void sortTsFiles(List<File> files) {
+  private static void sortTsFiles(List<File> files) {
     files.sort(
         (o1, o2) -> {
           String file1Name = o1.getName();
@@ -121,36 +128,44 @@ public class LoadTsFileStatement extends Statement {
         });
   }
 
-  public void setDeleteAfterLoad(boolean deleteAfterLoad) {
-    this.deleteAfterLoad = deleteAfterLoad;
-  }
-
   public void setDatabaseLevel(int databaseLevel) {
     this.databaseLevel = databaseLevel;
   }
 
-  public void setVerifySchema(boolean verifySchema) {
-    this.verifySchema = verifySchema;
+  public int getDatabaseLevel() {
+    return databaseLevel;
   }
 
-  public void setAutoCreateDatabase(boolean autoCreateDatabase) {
-    this.autoCreateDatabase = autoCreateDatabase;
+  public void setVerifySchema(boolean verifySchema) {
+    this.verifySchema = verifySchema;
   }
 
   public boolean isVerifySchema() {
     return verifySchema;
   }
 
+  public void setDeleteAfterLoad(boolean deleteAfterLoad) {
+    this.deleteAfterLoad = deleteAfterLoad;
+  }
+
   public boolean isDeleteAfterLoad() {
     return deleteAfterLoad;
   }
 
-  public boolean isAutoCreateDatabase() {
-    return autoCreateDatabase;
+  public void setConvertOnTypeMismatch(boolean convertOnTypeMismatch) {
+    this.convertOnTypeMismatch = convertOnTypeMismatch;
   }
 
-  public int getDatabaseLevel() {
-    return databaseLevel;
+  public boolean isConvertOnTypeMismatch() {
+    return convertOnTypeMismatch;
+  }
+
+  public void setAutoCreateDatabase(boolean autoCreateDatabase) {
+    this.autoCreateDatabase = autoCreateDatabase;
+  }
+
+  public boolean isAutoCreateDatabase() {
+    return autoCreateDatabase;
   }
 
   public List<File> getTsFiles() {
@@ -181,6 +196,8 @@ public class LoadTsFileStatement extends Statement {
   private void initAttributes() {
     this.databaseLevel = 
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
     this.deleteAfterLoad = 
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
+    this.convertOnTypeMismatch =
+        
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
   }
 
   @Override
@@ -204,13 +221,15 @@ public class LoadTsFileStatement extends Statement {
     return "LoadTsFileStatement{"
         + "file="
         + file
-        + ", deleteAfterLoad="
+        + ", delete-after-load="
         + deleteAfterLoad
-        + ", databaseLevel="
+        + ", database-level="
         + databaseLevel
-        + ", verifySchema="
+        + ", verify-schema="
         + verifySchema
-        + ", tsFiles Size="
+        + ", convert-on-type-mismatch="
+        + convertOnTypeMismatch
+        + ", tsFiles size="
         + tsFiles.size()
         + '}';
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 605218ae0a6..188ae30436d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -192,6 +192,7 @@ public class ActiveLoadTsFileLoader {
   private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws 
FileNotFoundException {
     final LoadTsFileStatement statement = new 
LoadTsFileStatement(filePair.getLeft());
     statement.setDeleteAfterLoad(true);
+    statement.setConvertOnTypeMismatch(true);
     statement.setVerifySchema(true);
     statement.setAutoCreateDatabase(false);
     return executeStatement(filePair.getRight() ? new 
PipeEnrichedStatement(statement) : statement);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index d09540133ab..d5672028369 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -40,12 +40,15 @@ public class LoadTsFileConfigurator {
       case ON_SUCCESS_KEY:
         validateOnSuccessParam(value);
         break;
+      case CONVERT_ON_TYPE_MISMATCH_KEY:
+        validateConvertOnTypeMismatchParam(value);
+        break;
       default:
         throw new SemanticException("Invalid parameter '" + key + "' for LOAD 
TSFILE command.");
     }
   }
 
-  private static final String DATABASE_LEVEL_KEY = "database-level";
+  public static final String DATABASE_LEVEL_KEY = "database-level";
   private static final int DATABASE_LEVEL_DEFAULT_VALUE =
       IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
   private static final int DATABASE_LEVEL_MIN_VALUE = 1;
@@ -73,9 +76,9 @@ public class LoadTsFileConfigurator {
             DATABASE_LEVEL_KEY, String.valueOf(DATABASE_LEVEL_DEFAULT_VALUE)));
   }
 
-  private static final String ON_SUCCESS_KEY = "on-success";
-  private static final String ON_SUCCESS_DELETE_VALUE = "delete";
-  private static final String ON_SUCCESS_NONE_VALUE = "none";
+  public static final String ON_SUCCESS_KEY = "on-success";
+  public static final String ON_SUCCESS_DELETE_VALUE = "delete";
+  public static final String ON_SUCCESS_NONE_VALUE = "none";
   private static final Set<String> ON_SUCCESS_VALUE_SET =
       Collections.unmodifiableSet(
           new HashSet<>(Arrays.asList(ON_SUCCESS_DELETE_VALUE, 
ON_SUCCESS_NONE_VALUE)));
@@ -91,7 +94,27 @@ public class LoadTsFileConfigurator {
 
   public static boolean parseOrGetDefaultOnSuccess(final Map<String, String> 
loadAttributes) {
     final String value = loadAttributes.get(ON_SUCCESS_KEY);
-    return StringUtils.isEmpty(value) || ON_SUCCESS_DELETE_VALUE.equals(value);
+    return !StringUtils.isEmpty(value) && 
ON_SUCCESS_DELETE_VALUE.equalsIgnoreCase(value);
+  }
+
+  public static final String CONVERT_ON_TYPE_MISMATCH_KEY = 
"convert-on-type-mismatch";
+  private static final boolean CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE = true;
+
+  public static void validateConvertOnTypeMismatchParam(final String 
convertOnTypeMismatch) {
+    if (!"true".equalsIgnoreCase(convertOnTypeMismatch)
+        && !"false".equalsIgnoreCase(convertOnTypeMismatch)) {
+      throw new SemanticException(
+          String.format(
+              "Given %s value '%s' is not supported, please input a valid 
boolean value.",
+              CONVERT_ON_TYPE_MISMATCH_KEY, convertOnTypeMismatch));
+    }
+  }
+
+  public static boolean parseOrGetDefaultConvertOnTypeMismatch(
+      final Map<String, String> loadAttributes) {
+    return Boolean.parseBoolean(
+        loadAttributes.getOrDefault(
+            CONVERT_ON_TYPE_MISMATCH_KEY, 
String.valueOf(CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE)));
   }
 
   private LoadTsFileConfigurator() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
new file mode 100644
index 00000000000..1a814442402
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.db.pipe.receiver.transform.converter.ArrayConverter;
+import 
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoadConvertedInsertTabletStatement extends 
PipeConvertedInsertTabletStatement {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LoadConvertedInsertTabletStatement.class);
+
+  public LoadConvertedInsertTabletStatement(final InsertTabletStatement 
insertTabletStatement) {
+    super(insertTabletStatement);
+  }
+
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) 
{
+    LOGGER.info(
+        "Load: Inserting tablet to {}.{}. Casting type from {} to {}.",
+        devicePath,
+        measurements[columnIndex],
+        dataTypes[columnIndex],
+        dataType);
+    columns[columnIndex] =
+        ArrayConverter.convert(dataTypes[columnIndex], dataType, 
columns[columnIndex]);
+    dataTypes[columnIndex] = dataType;
+    return true;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
new file mode 100644
index 00000000000..f292ee55930
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+/**
+ * Maps an {@link Exception} raised while executing a statement to a load-related
+ * {@link TSStatus}. The exception is passed in as the visitor context and its message is copied
+ * into the resulting status.
+ */
+public class LoadConvertedInsertTabletStatementExceptionVisitor
+    extends StatementVisitor<TSStatus, Exception> {
+
+  /** Fallback mapping: any exception becomes {@code LOAD_FILE_ERROR}. */
+  @Override
+  public TSStatus visitNode(final StatementNode node, final Exception context) {
+    return toStatus(TSStatusCode.LOAD_FILE_ERROR, context);
+  }
+
+  /**
+   * Classifies exceptions for a load statement: out-of-memory is reported as temporarily
+   * unavailable, a semantic error as a user conflict; everything else is handed to
+   * {@code visitStatement}.
+   */
+  @Override
+  public TSStatus visitLoadFile(
+      final LoadTsFileStatement loadTsFileStatement, final Exception context) {
+    if (context instanceof LoadRuntimeOutOfMemoryException) {
+      return toStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION, context);
+    }
+    if (context instanceof SemanticException) {
+      return toStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION, context);
+    }
+    return visitStatement(loadTsFileStatement, context);
+  }
+
+  /** Builds a status with the given code and the message of {@code cause}. */
+  private static TSStatus toStatus(final TSStatusCode code, final Exception cause) {
+    return new TSStatus(code.getStatusCode()).setMessage(cause.getMessage());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java
new file mode 100644
index 00000000000..6e9601f8d95
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+/**
+ * Translates the {@link TSStatus} returned by executing a converted insert statement into a
+ * load-specific status. The raw execution status is passed in as the visitor context.
+ */
+public class LoadConvertedInsertTabletStatementTSStatusVisitor
+    extends StatementVisitor<TSStatus, TSStatus> {
+
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** Fallback mapping: the incoming status is returned unchanged. */
+  @Override
+  public TSStatus visitNode(final StatementNode node, final TSStatus context) {
+    return context;
+  }
+
+  @Override
+  public TSStatus visitInsertTablet(
+      final InsertTabletStatement insertTabletStatement, final TSStatus context) {
+    return visitInsertBase(insertTabletStatement, context);
+  }
+
+  /**
+   * Maps the execution status of an insert statement:
+   *
+   * <ul>
+   *   <li>{@code SYSTEM_READ_ONLY} / {@code WRITE_PROCESS_REJECT} become
+   *       {@code LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION};
+   *   <li>{@code OUT_OF_TTL} becomes {@code LOAD_IDEMPOTENT_CONFLICT_EXCEPTION};
+   *   <li>{@code METADATA_ERROR} whose message contains the data type mismatch marker, while
+   *       partial insert is enabled, becomes {@code LOAD_IDEMPOTENT_CONFLICT_EXCEPTION};
+   *   <li>anything else is handed to {@code visitStatement}.
+   * </ul>
+   *
+   * @param insertBaseStatement the statement that was executed
+   * @param context the status returned by the execution
+   * @return the translated status
+   */
+  private TSStatus visitInsertBase(
+      final InsertBaseStatement insertBaseStatement, final TSStatus context) {
+    final int code = context.getCode();
+
+    if (code == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
+        || code == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+      return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
+    if (code == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
+      return new TSStatus(TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
+    if (code == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && isDataTypeMismatch(context)
+        && config.isEnablePartialInsert()) {
+      return new TSStatus(TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
+    return visitStatement(insertBaseStatement, context);
+  }
+
+  /**
+   * Tells whether the status message carries the data type mismatch marker. A status without a
+   * message is treated as "not a mismatch" instead of raising a NullPointerException.
+   */
+  private static boolean isDataTypeMismatch(final TSStatus status) {
+    final String message = status.getMessage();
+    return message != null
+        && message.contains(DataTypeMismatchException.REGISTERED_TYPE_STRING);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
new file mode 100644
index 00000000000..e6c35d384df
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Optional;
+
+/**
+ * Loads the TsFiles of a {@link LoadTsFileStatement} by reading them as tablets and executing
+ * one {@link LoadConvertedInsertTabletStatement} per tablet through the supplied
+ * {@link StatementExecutor}.
+ *
+ * <p>The visitor result is {@code Optional.of(success status)} when every tablet was executed
+ * with an acceptable status, and {@code Optional.empty()} otherwise.
+ */
+public class LoadTreeStatementDataTypeConvertExecutionVisitor
+    extends StatementVisitor<Optional<TSStatus>, Void> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LoadTreeStatementDataTypeConvertExecutionVisitor.class);
+
+  /** Maximum number of re-executions while the status is "temporarily unavailable". */
+  private static final int MAX_RETRY_TIMES = 5;
+
+  /** Back-off base in milliseconds; the n-th retry (1-based) sleeps n times this value. */
+  private static final long RETRY_BACKOFF_BASE_MS = 100L;
+
+  private final StatementExecutor statementExecutor;
+
+  /** Executes a statement and returns the resulting status. */
+  @FunctionalInterface
+  public interface StatementExecutor {
+    TSStatus execute(final Statement statement);
+  }
+
+  /**
+   * @param statementExecutor callback used to execute every generated insert statement
+   */
+  public LoadTreeStatementDataTypeConvertExecutionVisitor(
+      final StatementExecutor statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  /** Statements other than load statements are not handled: the result is empty. */
+  @Override
+  public Optional<TSStatus> visitNode(final StatementNode statementNode, final Void v) {
+    return Optional.empty();
+  }
+
+  /**
+   * Executes every tablet of every TsFile of the statement as an insert statement.
+   *
+   * <p>Processing stops at the first tablet whose final status is not acceptable, or at the
+   * first exception; this method performs no rollback of tablets that were already executed.
+   * The source files are deleted only after all files were processed and only when the
+   * statement requests deletion after load.
+   *
+   * @param loadTsFileStatement the load statement whose files are converted
+   * @param v unused
+   * @return a success status when all tablets were accepted, otherwise an empty optional
+   */
+  @Override
+  public Optional<TSStatus> visitLoadFile(
+      final LoadTsFileStatement loadTsFileStatement, final Void v) {
+
+    LOGGER.info("Start data type conversion for LoadTsFileStatement: {}", loadTsFileStatement);
+
+    for (final File file : loadTsFileStatement.getTsFiles()) {
+      try (final TsFileInsertionScanDataContainer container =
+          new TsFileInsertionScanDataContainer(
+              file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
+        for (final Pair<Tablet, Boolean> tabletWithIsAligned :
+            container.toTabletWithIsAligneds()) {
+          final LoadConvertedInsertTabletStatement statement =
+              new LoadConvertedInsertTabletStatement(
+                  PipeTransferTabletRawReq.toTPipeTransferRawReq(
+                          tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
+                      .constructStatement());
+
+          final TSStatus result = executeWithRetry(statement);
+          if (!isAcceptable(result)) {
+            // Record why the conversion is abandoned; callers only see an empty result.
+            LOGGER.warn(
+                "Failed to convert data type for LoadTsFileStatement: {}. File: {}, status: {}.",
+                loadTsFileStatement,
+                file,
+                result);
+            return Optional.empty();
+          }
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
+        return Optional.empty();
+      }
+    }
+
+    if (loadTsFileStatement.isDeleteAfterLoad()) {
+      loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+    }
+
+    LOGGER.info(
+        "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement);
+
+    return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+  /**
+   * Executes the statement and re-executes it, with a linearly growing sleep, while the
+   * translated status is {@code LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION}, at most
+   * {@link #MAX_RETRY_TIMES} times. Any exception is translated into a status by the exception
+   * visitor; an interrupt additionally restores the thread's interrupt flag.
+   */
+  private TSStatus executeWithRetry(final LoadConvertedInsertTabletStatement statement) {
+    try {
+      TSStatus result = executeOnce(statement);
+      for (int retry = 0;
+          retry < MAX_RETRY_TIMES
+              && result.getCode()
+                  == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
+          retry++) {
+        Thread.sleep(RETRY_BACKOFF_BASE_MS * (retry + 1));
+        result = executeOnce(statement);
+      }
+      return result;
+    } catch (final Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      return statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
+    }
+  }
+
+  /** Executes the statement once and translates the raw status with the status visitor. */
+  private TSStatus executeOnce(final LoadConvertedInsertTabletStatement statement) {
+    return statement.accept(
+        LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+        statementExecutor.execute(statement));
+  }
+
+  /** A tablet counts as loaded on success, redirection recommendation or idempotent conflict. */
+  private static boolean isAcceptable(final TSStatus result) {
+    final int code = result.getCode();
+    return code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        || code == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
new file mode 100644
index 00000000000..b3824140483
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point of the load data type conversion: runs a {@link LoadTsFileStatement} through the
+ * tree-model conversion visitor, executing the generated statements via the
+ * {@link Coordinator}.
+ */
+public class LoadTsFileDataTypeConverter {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LoadTsFileDataTypeConverter.class);
+
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  /** Shared visitor translating execution statuses into load statuses. */
+  public static final LoadConvertedInsertTabletStatementTSStatusVisitor
+      STATEMENT_STATUS_VISITOR = new LoadConvertedInsertTabletStatementTSStatusVisitor();
+
+  /** Shared visitor translating execution exceptions into load statuses. */
+  public static final LoadConvertedInsertTabletStatementExceptionVisitor
+      STATEMENT_EXCEPTION_VISITOR = new LoadConvertedInsertTabletStatementExceptionVisitor();
+
+  // Executes each generated statement with the current session and the configured query
+  // timeout, and yields the status of the execution result.
+  private final LoadTreeStatementDataTypeConvertExecutionVisitor
+      treeStatementDataTypeConvertExecutionVisitor =
+          new LoadTreeStatementDataTypeConvertExecutionVisitor(
+              statement -> {
+                final Coordinator coordinator = Coordinator.getInstance();
+                final long queryId = SESSION_MANAGER.requestQueryId();
+                return coordinator.executeForTreeModel(
+                        statement,
+                        queryId,
+                        SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+                        "",
+                        ClusterPartitionFetcher.getInstance(),
+                        ClusterSchemaFetcher.getInstance(),
+                        IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+                        false)
+                    .status;
+              });
+
+  /**
+   * Runs the conversion visitor on the given load statement.
+   *
+   * @param loadTsFileTreeStatement the load statement to convert
+   * @return the status produced by the visitor, {@code null} when the visitor yields an empty
+   *     result, or a {@code LOAD_FILE_ERROR} status carrying the exception message when the
+   *     visitor throws
+   */
+  public TSStatus convertForTreeModel(LoadTsFileStatement loadTsFileTreeStatement) {
+    try {
+      return loadTsFileTreeStatement
+          .accept(treeStatementDataTypeConvertExecutionVisitor, null)
+          .orElse(null);
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Failed to convert data types for tree model statement {}.",
+          loadTsFileTreeStatement,
+          e);
+      final int errorCode = TSStatusCode.LOAD_FILE_ERROR.getStatusCode();
+      return new TSStatus(errorCode).setMessage(e.getMessage());
+    }
+  }
+}


Reply via email to