This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 8bef94666f7 Load: Support auto data type conversion when data type
mismatch detected during analysis stage (#14529) (#14619)
8bef94666f7 is described below
commit 8bef94666f704baa74df862f123028bc1abc5be9
Author: Itami Sho <[email protected]>
AuthorDate: Fri Jan 3 18:04:04 2025 +0800
Load: Support auto data type conversion when data type mismatch detected
during analysis stage (#14529) (#14619)
---
.../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 71 +++++++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +
.../db/exception/VerifyMetadataException.java | 12 +-
...va => VerifyMetadataTypeMismatchException.java} | 13 +--
.../protocol/legacy/loader/TsFileLoader.java | 1 +
.../protocol/thrift/IoTDBDataNodeReceiver.java | 1 +
.../plan/analyze/LoadTsFileAnalyzer.java | 50 ++++++--
.../plan/statement/crud/LoadTsFileStatement.java | 75 +++++++-----
.../load/active/ActiveLoadTsFileLoader.java | 1 +
.../load/config/LoadTsFileConfigurator.java | 33 +++++-
.../LoadConvertedInsertTabletStatement.java | 52 +++++++++
...ertedInsertTabletStatementExceptionVisitor.java | 51 ++++++++
...vertedInsertTabletStatementTSStatusVisitor.java | 65 +++++++++++
...eeStatementDataTypeConvertExecutionVisitor.java | 130 +++++++++++++++++++++
.../converter/LoadTsFileDataTypeConverter.java | 72 ++++++++++++
15 files changed, 571 insertions(+), 59 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 11892f32921..5b4ae76332f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.jdbc.IoTDBSQLException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
@@ -47,6 +48,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -55,6 +57,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.apache.iotdb.db.it.utils.TestUtils.assertNonQueryTestFail;
import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
@@ -897,6 +900,74 @@ public class IoTDBLoadTsFileIT {
}
}
+ @Test
+ public void testLoadWithConvertOnTypeMismatch() throws Exception {
+
+ List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas =
+ generateMeasurementSchemasForDataTypeConvertion();
+
+ final File file = new File(tmpDir, "1-0-0-0.tsfile");
+
+ long writtenPoint = 0;
+ List<MeasurementSchema> schemaList1 =
+ measurementSchemas.stream().map(pair ->
pair.left).collect(Collectors.toList());
+ List<MeasurementSchema> schemaList2 =
+ measurementSchemas.stream().map(pair ->
pair.right).collect(Collectors.toList());
+
+ try (final TsFileGenerator generator = new TsFileGenerator(file)) {
+ generator.registerTimeseries(SchemaConfig.DEVICE_0, schemaList2);
+
+ generator.generateData(SchemaConfig.DEVICE_0, 100, PARTITION_INTERVAL /
10_000, false);
+
+ writtenPoint = generator.getTotalNumber();
+ }
+
+ try (final Connection connection = EnvFactory.getEnv().getConnection();
+ final Statement statement = connection.createStatement()) {
+
+ for (MeasurementSchema schema : schemaList1) {
+ statement.execute(convert2SQL(SchemaConfig.DEVICE_0, schema));
+ }
+
+ statement.execute(String.format("load \"%s\" ", file.getAbsolutePath()));
+
+ try (final ResultSet resultSet =
+ statement.executeQuery("select count(*) from root.** group by
level=1,2")) {
+ if (resultSet.next()) {
+ final long sgCount = resultSet.getLong("count(root.sg.test_0.*.*)");
+ Assert.assertEquals(writtenPoint, sgCount);
+ } else {
+ Assert.fail("This ResultSet is empty.");
+ }
+ }
+ }
+ }
+
+ private List<Pair<MeasurementSchema, MeasurementSchema>>
+ generateMeasurementSchemasForDataTypeConvertion() {
+ TSDataType[] dataTypes = {
+ TSDataType.STRING,
+ TSDataType.TEXT,
+ TSDataType.BLOB,
+ TSDataType.TIMESTAMP,
+ TSDataType.BOOLEAN,
+ TSDataType.DATE,
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT32,
+ TSDataType.INT64
+ };
+ List<Pair<MeasurementSchema, MeasurementSchema>> pairs = new ArrayList<>();
+
+ for (TSDataType type : dataTypes) {
+ for (TSDataType dataType : dataTypes) {
+ String id = String.format("%s2%s", type.name(), dataType.name());
+ pairs.add(new Pair<>(new MeasurementSchema(id, type), new
MeasurementSchema(id, dataType)));
+ }
+ }
+ return pairs;
+ }
+
private static class SchemaConfig {
private static final String STORAGE_GROUP_0 = "root.sg.test_0";
private static final String STORAGE_GROUP_1 = "root.sg.test_1";
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index e69b980d388..512f9dde4e4 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -173,6 +173,9 @@ public enum TSStatusCode {
PIPE_ERROR(1107),
PIPESERVER_ERROR(1108),
VERIFY_METADATA_ERROR(1109),
+ LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION(1110),
+ LOAD_IDEMPOTENT_CONFLICT_EXCEPTION(1111),
+ LOAD_USER_CONFLICT_EXCEPTION(1112),
// UDF
UDF_LOAD_CLASS_ERROR(1200),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
index 1603c9872f4..78c6d8d4d5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
@@ -23,16 +23,12 @@ import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.rpc.TSStatusCode;
public class VerifyMetadataException extends IoTDBException {
- public VerifyMetadataException(
- String path, String compareInfo, String tsFileInfo, String tsFilePath,
String IoTDBInfo) {
- super(
- String.format(
- "%s %s mismatch, %s in tsfile %s, but %s in IoTDB.",
- path, compareInfo, tsFileInfo, tsFilePath, IoTDBInfo),
- TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
- }
public VerifyMetadataException(String message) {
super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
}
+
+ public VerifyMetadataException(String message, int errorCode) {
+ super(message, errorCode);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java
similarity index 64%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java
index 1603c9872f4..209fb83bcdb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java
@@ -19,20 +19,11 @@
package org.apache.iotdb.db.exception;
-import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.rpc.TSStatusCode;
-public class VerifyMetadataException extends IoTDBException {
- public VerifyMetadataException(
- String path, String compareInfo, String tsFileInfo, String tsFilePath,
String IoTDBInfo) {
- super(
- String.format(
- "%s %s mismatch, %s in tsfile %s, but %s in IoTDB.",
- path, compareInfo, tsFileInfo, tsFilePath, IoTDBInfo),
- TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
- }
+public class VerifyMetadataTypeMismatchException extends
VerifyMetadataException {
- public VerifyMetadataException(String message) {
+ public VerifyMetadataTypeMismatchException(String message) {
super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
index c78ca0544b4..e181ec1d592 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
@@ -56,6 +56,7 @@ public class TsFileLoader implements ILoader {
try {
LoadTsFileStatement statement = new
LoadTsFileStatement(tsFile.getAbsolutePath());
statement.setDeleteAfterLoad(true);
+ statement.setConvertOnTypeMismatch(true);
statement.setDatabaseLevel(parseSgLevel());
statement.setVerifySchema(true);
statement.setAutoCreateDatabase(false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 79a98ab399c..c82c969b8e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -479,6 +479,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
final LoadTsFileStatement statement = new
LoadTsFileStatement(fileAbsolutePath);
statement.setDeleteAfterLoad(true);
+ statement.setConvertOnTypeMismatch(true);
statement.setVerifySchema(true);
statement.setAutoCreateDatabase(false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 4578dca8a76..c2f31b9e35a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.VerifyMetadataException;
+import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
import org.apache.iotdb.db.exception.load.LoadFileException;
import org.apache.iotdb.db.exception.load.LoadReadOnlyException;
@@ -63,6 +64,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
import
org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
import org.apache.iotdb.db.utils.ModificationUtils;
@@ -129,6 +131,8 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier;
+ private final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter;
+
LoadTsFileAnalyzer(
LoadTsFileStatement loadTsFileStatement,
MPPQueryContext context,
@@ -141,6 +145,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
this.schemaFetcher = schemaFetcher;
this.schemaAutoCreatorAndVerifier = new SchemaAutoCreatorAndVerifier();
+ this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter();
}
public Analysis analyzeFileByFile(final boolean isDeleteAfterLoad) {
@@ -179,6 +184,11 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
} catch (AuthException e) {
return createFailAnalysisForAuthException(e);
+ } catch (VerifyMetadataTypeMismatchException e) {
+ executeDataTypeConversionOnTypeMismatch(analysis, e);
+ // return the analysis right away to STOP the analysis process;
+ // the real result of the conversion has already been set in the analysis.
+ return analysis;
} catch (Exception e) {
final String exceptionMessage =
String.format(
@@ -195,6 +205,11 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
schemaAutoCreatorAndVerifier.flush();
} catch (AuthException e) {
return createFailAnalysisForAuthException(e);
+ } catch (VerifyMetadataTypeMismatchException e) {
+ executeDataTypeConversionOnTypeMismatch(analysis, e);
+ // return the analysis right away to STOP the analysis process;
+ // the real result of the conversion has already been set in the analysis.
+ return analysis;
} catch (Exception e) {
final String exceptionMessage =
String.format(
@@ -214,13 +229,33 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
return analysis;
}
+ private void executeDataTypeConversionOnTypeMismatch(
+ final Analysis analysis, final VerifyMetadataTypeMismatchException e) {
+ final TSStatus status =
+ loadTsFileStatement.isConvertOnTypeMismatch()
+ ?
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileStatement)
+ : null;
+
+ if (status == null) {
+ analysis.setFailStatus(
+ new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
+ } else if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+ && status.getCode() !=
TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
+ analysis.setFailStatus(status);
+ }
+
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setStatement(loadTsFileStatement);
+ }
+
@Override
public void close() {
schemaAutoCreatorAndVerifier.close();
}
private void analyzeSingleTsFile(final File tsFile, final boolean
isDeleteAfterLoad)
- throws IOException, AuthException {
+ throws IOException, AuthException, VerifyMetadataTypeMismatchException {
try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
// can be reused when constructing tsfile resource
final TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataIterator =
@@ -310,7 +345,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
public void autoCreateAndVerify(
TsFileSequenceReader reader,
Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadataList)
- throws IOException, AuthException {
+ throws IOException, AuthException, VerifyMetadataTypeMismatchException
{
for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
device2TimeseriesMetadataList.entrySet()) {
final IDeviceID device = entry.getKey();
@@ -412,13 +447,14 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
schemaCache.clearDeviceIsAlignedCacheIfNecessary();
}
- public void flush() throws AuthException {
+ public void flush() throws AuthException,
VerifyMetadataTypeMismatchException {
doAutoCreateAndVerify();
schemaCache.clearTimeSeries();
}
- private void doAutoCreateAndVerify() throws SemanticException,
AuthException {
+ private void doAutoCreateAndVerify()
+ throws SemanticException, AuthException,
VerifyMetadataTypeMismatchException {
if (schemaCache.getDevice2TimeSeries().isEmpty()) {
return;
}
@@ -439,7 +475,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
if (loadTsFileStatement.isVerifySchema()) {
verifySchema(schemaTree);
}
- } catch (AuthException e) {
+ } catch (AuthException | VerifyMetadataTypeMismatchException e) {
throw e;
} catch (Exception e) {
LOGGER.warn("Auto create or verify schema error.", e);
@@ -600,7 +636,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
private void verifySchema(ISchemaTree schemaTree)
- throws VerifyMetadataException, IllegalPathException {
+ throws VerifyMetadataException, IllegalPathException,
VerifyMetadataTypeMismatchException {
for (final Map.Entry<IDeviceID, Set<MeasurementSchema>> entry :
schemaCache.getDevice2TimeSeries().entrySet()) {
final IDeviceID device = entry.getKey();
@@ -648,7 +684,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
// check datatype
if (!tsFileSchema.getType().equals(iotdbSchema.getType())) {
- throw new VerifyMetadataException(
+ throw new VerifyMetadataTypeMismatchException(
String.format(
"Measurement %s%s%s datatype not match, TsFile: %s, IoTDB:
%s",
device,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 1fb3e5f1b92..889ca2c202c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -43,9 +43,10 @@ public class LoadTsFileStatement extends Statement {
private final File file;
private int databaseLevel;
- private boolean verifySchema;
- private boolean deleteAfterLoad;
- private boolean autoCreateDatabase;
+ private boolean verifySchema = true;
+ private boolean deleteAfterLoad = false;
+ private boolean convertOnTypeMismatch = true;
+ private boolean autoCreateDatabase = true;
private Map<String, String> loadAttributes;
@@ -58,16 +59,17 @@ public class LoadTsFileStatement extends Statement {
this.databaseLevel =
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
this.verifySchema = true;
this.deleteAfterLoad = false;
+ this.convertOnTypeMismatch = true;
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
- this.tsFiles = new ArrayList<>();
this.resources = new ArrayList<>();
this.writePointCountList = new ArrayList<>();
this.statementType = StatementType.MULTI_BATCH_INSERT;
- processTsFile(filePath);
+ this.tsFiles = processTsFile(file);
}
- private void processTsFile(final String filePath) throws
FileNotFoundException {
+ public static List<File> processTsFile(final File file) throws
FileNotFoundException {
+ final List<File> tsFiles = new ArrayList<>();
if (file.isFile()) {
tsFiles.add(file);
} else {
@@ -75,11 +77,12 @@ public class LoadTsFileStatement extends Statement {
throw new FileNotFoundException(
String.format(
"Can not find %s on this machine, notice that load can only
handle files on this machine.",
- filePath));
+ file.getPath()));
}
- findAllTsFile(file);
+ tsFiles.addAll(findAllTsFile(file));
}
sortTsFiles(tsFiles);
+ return tsFiles;
}
protected LoadTsFileStatement() {
@@ -87,6 +90,7 @@ public class LoadTsFileStatement extends Statement {
this.databaseLevel =
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
this.verifySchema = true;
this.deleteAfterLoad = false;
+ this.convertOnTypeMismatch = true;
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
this.tsFiles = new ArrayList<>();
this.resources = new ArrayList<>();
@@ -94,21 +98,24 @@ public class LoadTsFileStatement extends Statement {
this.statementType = StatementType.MULTI_BATCH_INSERT;
}
- private void findAllTsFile(File file) {
+ private static List<File> findAllTsFile(File file) {
final File[] files = file.listFiles();
if (files == null) {
- return;
+ return Collections.emptyList();
}
+
+ final List<File> tsFiles = new ArrayList<>();
for (File nowFile : files) {
if (nowFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
tsFiles.add(nowFile);
} else if (nowFile.isDirectory()) {
- findAllTsFile(nowFile);
+ tsFiles.addAll(findAllTsFile(nowFile));
}
}
+ return tsFiles;
}
- private void sortTsFiles(List<File> files) {
+ private static void sortTsFiles(List<File> files) {
files.sort(
(o1, o2) -> {
String file1Name = o1.getName();
@@ -121,36 +128,44 @@ public class LoadTsFileStatement extends Statement {
});
}
- public void setDeleteAfterLoad(boolean deleteAfterLoad) {
- this.deleteAfterLoad = deleteAfterLoad;
- }
-
public void setDatabaseLevel(int databaseLevel) {
this.databaseLevel = databaseLevel;
}
- public void setVerifySchema(boolean verifySchema) {
- this.verifySchema = verifySchema;
+ public int getDatabaseLevel() {
+ return databaseLevel;
}
- public void setAutoCreateDatabase(boolean autoCreateDatabase) {
- this.autoCreateDatabase = autoCreateDatabase;
+ public void setVerifySchema(boolean verifySchema) {
+ this.verifySchema = verifySchema;
}
public boolean isVerifySchema() {
return verifySchema;
}
+ public void setDeleteAfterLoad(boolean deleteAfterLoad) {
+ this.deleteAfterLoad = deleteAfterLoad;
+ }
+
public boolean isDeleteAfterLoad() {
return deleteAfterLoad;
}
- public boolean isAutoCreateDatabase() {
- return autoCreateDatabase;
+ public void setConvertOnTypeMismatch(boolean convertOnTypeMismatch) {
+ this.convertOnTypeMismatch = convertOnTypeMismatch;
}
- public int getDatabaseLevel() {
- return databaseLevel;
+ public boolean isConvertOnTypeMismatch() {
+ return convertOnTypeMismatch;
+ }
+
+ public void setAutoCreateDatabase(boolean autoCreateDatabase) {
+ this.autoCreateDatabase = autoCreateDatabase;
+ }
+
+ public boolean isAutoCreateDatabase() {
+ return autoCreateDatabase;
}
public List<File> getTsFiles() {
@@ -181,6 +196,8 @@ public class LoadTsFileStatement extends Statement {
private void initAttributes() {
this.databaseLevel =
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
this.deleteAfterLoad =
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
+ this.convertOnTypeMismatch =
+
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
}
@Override
@@ -204,13 +221,15 @@ public class LoadTsFileStatement extends Statement {
return "LoadTsFileStatement{"
+ "file="
+ file
- + ", deleteAfterLoad="
+ + ", delete-after-load="
+ deleteAfterLoad
- + ", databaseLevel="
+ + ", database-level="
+ databaseLevel
- + ", verifySchema="
+ + ", verify-schema="
+ verifySchema
- + ", tsFiles Size="
+ + ", convert-on-type-mismatch="
+ + convertOnTypeMismatch
+ + ", tsFiles size="
+ tsFiles.size()
+ '}';
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 605218ae0a6..188ae30436d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -192,6 +192,7 @@ public class ActiveLoadTsFileLoader {
private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws
FileNotFoundException {
final LoadTsFileStatement statement = new
LoadTsFileStatement(filePair.getLeft());
statement.setDeleteAfterLoad(true);
+ statement.setConvertOnTypeMismatch(true);
statement.setVerifySchema(true);
statement.setAutoCreateDatabase(false);
return executeStatement(filePair.getRight() ? new
PipeEnrichedStatement(statement) : statement);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index d09540133ab..d5672028369 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -40,12 +40,15 @@ public class LoadTsFileConfigurator {
case ON_SUCCESS_KEY:
validateOnSuccessParam(value);
break;
+ case CONVERT_ON_TYPE_MISMATCH_KEY:
+ validateConvertOnTypeMismatchParam(value);
+ break;
default:
throw new SemanticException("Invalid parameter '" + key + "' for LOAD
TSFILE command.");
}
}
- private static final String DATABASE_LEVEL_KEY = "database-level";
+ public static final String DATABASE_LEVEL_KEY = "database-level";
private static final int DATABASE_LEVEL_DEFAULT_VALUE =
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
private static final int DATABASE_LEVEL_MIN_VALUE = 1;
@@ -73,9 +76,9 @@ public class LoadTsFileConfigurator {
DATABASE_LEVEL_KEY, String.valueOf(DATABASE_LEVEL_DEFAULT_VALUE)));
}
- private static final String ON_SUCCESS_KEY = "on-success";
- private static final String ON_SUCCESS_DELETE_VALUE = "delete";
- private static final String ON_SUCCESS_NONE_VALUE = "none";
+ public static final String ON_SUCCESS_KEY = "on-success";
+ public static final String ON_SUCCESS_DELETE_VALUE = "delete";
+ public static final String ON_SUCCESS_NONE_VALUE = "none";
private static final Set<String> ON_SUCCESS_VALUE_SET =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(ON_SUCCESS_DELETE_VALUE,
ON_SUCCESS_NONE_VALUE)));
@@ -91,7 +94,27 @@ public class LoadTsFileConfigurator {
public static boolean parseOrGetDefaultOnSuccess(final Map<String, String>
loadAttributes) {
final String value = loadAttributes.get(ON_SUCCESS_KEY);
- return StringUtils.isEmpty(value) || ON_SUCCESS_DELETE_VALUE.equals(value);
+ return !StringUtils.isEmpty(value) &&
ON_SUCCESS_DELETE_VALUE.equalsIgnoreCase(value);
+ }
+
+ public static final String CONVERT_ON_TYPE_MISMATCH_KEY =
"convert-on-type-mismatch";
+ private static final boolean CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE = true;
+
+ public static void validateConvertOnTypeMismatchParam(final String
convertOnTypeMismatch) {
+ if (!"true".equalsIgnoreCase(convertOnTypeMismatch)
+ && !"false".equalsIgnoreCase(convertOnTypeMismatch)) {
+ throw new SemanticException(
+ String.format(
+ "Given %s value '%s' is not supported, please input a valid
boolean value.",
+ CONVERT_ON_TYPE_MISMATCH_KEY, convertOnTypeMismatch));
+ }
+ }
+
+ public static boolean parseOrGetDefaultConvertOnTypeMismatch(
+ final Map<String, String> loadAttributes) {
+ return Boolean.parseBoolean(
+ loadAttributes.getOrDefault(
+ CONVERT_ON_TYPE_MISMATCH_KEY,
String.valueOf(CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE)));
}
private LoadTsFileConfigurator() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
new file mode 100644
index 00000000000..1a814442402
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.db.pipe.receiver.transform.converter.ArrayConverter;
+import
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoadConvertedInsertTabletStatement extends
PipeConvertedInsertTabletStatement {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(LoadConvertedInsertTabletStatement.class);
+
+ public LoadConvertedInsertTabletStatement(final InsertTabletStatement
insertTabletStatement) {
+ super(insertTabletStatement);
+ }
+
+ @Override
+ protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType)
{
+ LOGGER.info(
+ "Load: Inserting tablet to {}.{}. Casting type from {} to {}.",
+ devicePath,
+ measurements[columnIndex],
+ dataTypes[columnIndex],
+ dataType);
+ columns[columnIndex] =
+ ArrayConverter.convert(dataTypes[columnIndex], dataType,
columns[columnIndex]);
+ dataTypes[columnIndex] = dataType;
+ return true;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
new file mode 100644
index 00000000000..f292ee55930
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class LoadConvertedInsertTabletStatementExceptionVisitor
+ extends StatementVisitor<TSStatus, Exception> {
+
+ @Override
+ public TSStatus visitNode(final StatementNode node, final Exception context)
{
+ return new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode())
+ .setMessage(context.getMessage());
+ }
+
+ @Override
+ public TSStatus visitLoadFile(
+ final LoadTsFileStatement loadTsFileStatement, final Exception context) {
+ if (context instanceof LoadRuntimeOutOfMemoryException) {
+ return new
TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ } else if (context instanceof SemanticException) {
+ return new
TSStatus(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(context.getMessage());
+ }
+ return visitStatement(loadTsFileStatement, context);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java
new file mode 100644
index 00000000000..6e9601f8d95
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+/**
+ * Rewrites the {@link TSStatus} produced by executing an insert statement into a load-specific
+ * status code.
+ *
+ * <p>The visitor's context argument is the status returned by the execution. Statuses that match
+ * none of the rules below are passed on to {@code visitStatement}.
+ */
+public class LoadConvertedInsertTabletStatementTSStatusVisitor
+    extends StatementVisitor<TSStatus, TSStatus> {
+
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** Default mapping: the execution status is returned unchanged. */
+  @Override
+  public TSStatus visitNode(final StatementNode node, final TSStatus context) {
+    return context;
+  }
+
+  /** Applies the insert-specific status rewriting to an insert-tablet statement. */
+  @Override
+  public TSStatus visitInsertTablet(
+      final InsertTabletStatement insertTabletStatement, final TSStatus context) {
+    return visitInsertBase(insertTabletStatement, context);
+  }
+
+  /**
+   * Rewrites the status of an insert statement:
+   *
+   * <ul>
+   *   <li>{@code SYSTEM_READ_ONLY} or {@code WRITE_PROCESS_REJECT} becomes {@code
+   *       LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION};
+   *   <li>{@code OUT_OF_TTL} becomes {@code LOAD_IDEMPOTENT_CONFLICT_EXCEPTION};
+   *   <li>{@code METADATA_ERROR} whose message contains {@link
+   *       DataTypeMismatchException#REGISTERED_TYPE_STRING}, while partial insert is enabled,
+   *       becomes {@code LOAD_IDEMPOTENT_CONFLICT_EXCEPTION}.
+   * </ul>
+   *
+   * @param insertBaseStatement the executed statement
+   * @param context the status returned by the execution
+   * @return the rewritten status, or the result of {@code visitStatement} if no rule matches
+   */
+  private TSStatus visitInsertBase(
+      final InsertBaseStatement insertBaseStatement, final TSStatus context) {
+    final int code = context.getCode();
+    final String message = context.getMessage();
+
+    if (code == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
+        || code == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+      return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+          .setMessage(message);
+    } else if (code == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
+      return new TSStatus(TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(message);
+    } else if (code == TSStatusCode.METADATA_ERROR.getStatusCode()
+        // A status may carry no message; without this guard contains() would throw an NPE.
+        && message != null
+        && message.contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
+        && config.isEnablePartialInsert()) {
+      return new TSStatus(TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(message);
+    }
+    return visitStatement(insertBaseStatement, context);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
new file mode 100644
index 00000000000..e6c35d384df
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Optional;
+
+/**
+ * Executes a {@link LoadTsFileStatement} by reading each of its TsFiles as tablets and running
+ * every tablet as a {@link LoadConvertedInsertTabletStatement} through the supplied {@link
+ * StatementExecutor}.
+ *
+ * <p>The visitor returns a success status only when every tablet of every file was accepted;
+ * otherwise it returns {@link Optional#empty()}. Tablets executed before a failure are not rolled
+ * back by this class.
+ */
+public class LoadTreeStatementDataTypeConvertExecutionVisitor
+    extends StatementVisitor<Optional<TSStatus>, Void> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LoadTreeStatementDataTypeConvertExecutionVisitor.class);
+
+  /** Number of additional attempts made while the write is temporarily unavailable. */
+  private static final int MAX_RETRY_TIMES = 5;
+
+  /** Base back-off in milliseconds; the i-th retry (1-based) waits {@code i} times this value. */
+  private static final long RETRY_BASE_INTERVAL_MS = 100L;
+
+  private final StatementExecutor statementExecutor;
+
+  /** Callback that executes a statement and reports the resulting status. */
+  @FunctionalInterface
+  public interface StatementExecutor {
+    TSStatus execute(final Statement statement);
+  }
+
+  /**
+   * @param statementExecutor callback used to execute every generated insert statement
+   */
+  public LoadTreeStatementDataTypeConvertExecutionVisitor(
+      final StatementExecutor statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  /** Statements other than load statements are not handled: always empty. */
+  @Override
+  public Optional<TSStatus> visitNode(final StatementNode statementNode, final Void v) {
+    return Optional.empty();
+  }
+
+  /**
+   * Reads all TsFiles of the statement tablet by tablet and executes each tablet as an insert.
+   *
+   * <p>If {@code isDeleteAfterLoad()} is set, the TsFiles are deleted once all of them have been
+   * processed successfully.
+   *
+   * @param loadTsFileStatement the load statement to execute through inserts
+   * @param v unused
+   * @return a {@code SUCCESS_STATUS} status if every tablet was accepted, otherwise empty
+   */
+  @Override
+  public Optional<TSStatus> visitLoadFile(
+      final LoadTsFileStatement loadTsFileStatement, final Void v) {
+
+    LOGGER.info("Start data type conversion for LoadTsFileStatement: {}", loadTsFileStatement);
+
+    for (final File file : loadTsFileStatement.getTsFiles()) {
+      try (final TsFileInsertionScanDataContainer container =
+          new TsFileInsertionScanDataContainer(
+              file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
+        for (final Pair<Tablet, Boolean> tabletWithIsAligned :
+            container.toTabletWithIsAligneds()) {
+          final LoadConvertedInsertTabletStatement statement =
+              new LoadConvertedInsertTabletStatement(
+                  PipeTransferTabletRawReq.toTPipeTransferRawReq(
+                          tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
+                      .constructStatement());
+
+          final TSStatus result = executeWithRetry(statement);
+          if (!isAccepted(result)) {
+            // Record the rejecting status; otherwise the reason for giving up is lost.
+            LOGGER.warn(
+                "Failed to convert data type for LoadTsFileStatement: {}, status: {}.",
+                loadTsFileStatement,
+                result);
+            return Optional.empty();
+          }
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
+        return Optional.empty();
+      }
+    }
+
+    if (loadTsFileStatement.isDeleteAfterLoad()) {
+      loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+    }
+
+    LOGGER.info(
+        "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement);
+
+    return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+  /**
+   * Executes the statement, retrying up to {@link #MAX_RETRY_TIMES} times with a linearly growing
+   * pause while the mapped status is {@code LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION}. Any exception
+   * is translated into a status by the exception visitor; an interrupt additionally restores the
+   * thread's interrupt flag.
+   */
+  private TSStatus executeWithRetry(final LoadConvertedInsertTabletStatement statement) {
+    try {
+      TSStatus result = executeOnce(statement);
+      for (int i = 0; i < MAX_RETRY_TIMES && isTemporarilyUnavailable(result); i++) {
+        Thread.sleep(RETRY_BASE_INTERVAL_MS * (i + 1));
+        result = executeOnce(statement);
+      }
+      return result;
+    } catch (final Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      return statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
+    }
+  }
+
+  /** Runs the statement once and maps the raw execution status through the status visitor. */
+  private TSStatus executeOnce(final LoadConvertedInsertTabletStatement statement) {
+    return statement.accept(
+        LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+        statementExecutor.execute(statement));
+  }
+
+  /** Whether the status signals a temporary condition that is worth retrying. */
+  private static boolean isTemporarilyUnavailable(final TSStatus status) {
+    return status.getCode() == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
+  }
+
+  /** Whether the status lets the load continue: success, redirection or idempotent conflict. */
+  private static boolean isAccepted(final TSStatus status) {
+    final int code = status.getCode();
+    return code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        || code == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
new file mode 100644
index 00000000000..b3824140483
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
+import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoadTsFileDataTypeConverter {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileDataTypeConverter.class);
+
+ private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
+
+ private final LoadTreeStatementDataTypeConvertExecutionVisitor
+ treeStatementDataTypeConvertExecutionVisitor =
+ new LoadTreeStatementDataTypeConvertExecutionVisitor(
+ statement ->
+ Coordinator.getInstance()
+ .executeForTreeModel(
+ statement,
+ SESSION_MANAGER.requestQueryId(),
+
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+ "",
+ ClusterPartitionFetcher.getInstance(),
+ ClusterSchemaFetcher.getInstance(),
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+ false)
+ .status);
+
+ public static final LoadConvertedInsertTabletStatementTSStatusVisitor
STATEMENT_STATUS_VISITOR =
+ new LoadConvertedInsertTabletStatementTSStatusVisitor();
+ public static final LoadConvertedInsertTabletStatementExceptionVisitor
+ STATEMENT_EXCEPTION_VISITOR = new
LoadConvertedInsertTabletStatementExceptionVisitor();
+
+ public TSStatus convertForTreeModel(LoadTsFileStatement
loadTsFileTreeStatement) {
+ try {
+ return loadTsFileTreeStatement
+ .accept(treeStatementDataTypeConvertExecutionVisitor, null)
+ .orElse(null);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to convert data types for tree model statement {}.",
loadTsFileTreeStatement, e);
+ return new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage());
+ }
+ }
+}