This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dd4d2be1723 Load: Support auto data type conversion when data type
mismatch detected during analysis stage (#14529)
dd4d2be1723 is described below
commit dd4d2be1723f83dc362849e88bfb02aff1b35613
Author: Itami Sho <[email protected]>
AuthorDate: Tue Dec 31 09:22:49 2024 +0800
Load: Support auto data type conversion when data type mismatch detected
during analysis stage (#14529)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 73 ++++++++++-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +
.../db/exception/VerifyMetadataException.java | 12 +-
...va => VerifyMetadataTypeMismatchException.java} | 13 +-
.../protocol/legacy/loader/TsFileLoader.java | 1 +
.../protocol/thrift/IoTDBDataNodeReceiver.java | 1 +
.../plan/analyze/load/LoadTsFileAnalyzer.java | 76 ++++++++---
.../analyze/load/LoadTsFileTableSchemaCache.java | 3 +-
.../load/LoadTsFileToTreeModelAnalyzer.java | 7 +-
.../load/TreeSchemaAutoCreatorAndVerifier.java | 32 +++--
.../plan/relational/sql/ast/LoadTsFile.java | 16 ++-
.../plan/statement/crud/LoadTsFileStatement.java | 70 ++++++----
.../load/active/ActiveLoadTsFileLoader.java | 1 +
.../load/config/LoadTsFileConfigurator.java | 29 ++++-
.../LoadConvertedInsertTabletStatement.java | 52 ++++++++
...ertedInsertTabletStatementExceptionVisitor.java | 51 ++++++++
...vertedInsertTabletStatementTSStatusVisitor.java | 65 ++++++++++
...leStatementDataTypeConvertExecutionVisitor.java | 143 +++++++++++++++++++++
...eeStatementDataTypeConvertExecutionVisitor.java | 130 +++++++++++++++++++
.../converter/LoadTsFileDataTypeConverter.java | 107 +++++++++++++++
20 files changed, 805 insertions(+), 80 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 0b9d310c767..cf4d22c6aca 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.jdbc.IoTDBSQLException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
@@ -48,6 +49,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -56,6 +58,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.apache.iotdb.db.it.utils.TestUtils.assertNonQueryTestFail;
import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
@@ -328,7 +331,7 @@ public class IoTDBLoadTsFileIT {
}
@Test
- public void testLoadWithAutoRegister() throws Exception {
+ public void testLoadWithAutoCreate() throws Exception {
final long writtenPoint1;
// device 0, device 1, sg 0
try (final TsFileGenerator generator =
@@ -898,6 +901,74 @@ public class IoTDBLoadTsFileIT {
}
}
+ @Test
+ public void testLoadWithConvertOnTypeMismatch() throws Exception {
+
+ List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas =
+ generateMeasurementSchemasForDataTypeConvertion();
+
+ final File file = new File(tmpDir, "1-0-0-0.tsfile");
+
+ long writtenPoint = 0;
+ List<MeasurementSchema> schemaList1 =
+ measurementSchemas.stream().map(pair ->
pair.left).collect(Collectors.toList());
+ List<IMeasurementSchema> schemaList2 =
+ measurementSchemas.stream().map(pair ->
pair.right).collect(Collectors.toList());
+
+ try (final TsFileGenerator generator = new TsFileGenerator(file)) {
+ generator.registerTimeseries(SchemaConfig.DEVICE_0, schemaList2);
+
+ generator.generateData(SchemaConfig.DEVICE_0, 100, PARTITION_INTERVAL /
10_000, false);
+
+ writtenPoint = generator.getTotalNumber();
+ }
+
+ try (final Connection connection = EnvFactory.getEnv().getConnection();
+ final Statement statement = connection.createStatement()) {
+
+ for (MeasurementSchema schema : schemaList1) {
+ statement.execute(convert2SQL(SchemaConfig.DEVICE_0, schema));
+ }
+
+ statement.execute(String.format("load \"%s\" ", file.getAbsolutePath()));
+
+ try (final ResultSet resultSet =
+ statement.executeQuery("select count(*) from root.** group by
level=1,2")) {
+ if (resultSet.next()) {
+ final long sgCount = resultSet.getLong("count(root.sg.test_0.*.*)");
+ Assert.assertEquals(writtenPoint, sgCount);
+ } else {
+ Assert.fail("This ResultSet is empty.");
+ }
+ }
+ }
+ }
+
+ private List<Pair<MeasurementSchema, MeasurementSchema>>
+ generateMeasurementSchemasForDataTypeConvertion() {
+ TSDataType[] dataTypes = {
+ TSDataType.STRING,
+ TSDataType.TEXT,
+ TSDataType.BLOB,
+ TSDataType.TIMESTAMP,
+ TSDataType.BOOLEAN,
+ TSDataType.DATE,
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT32,
+ TSDataType.INT64
+ };
+ List<Pair<MeasurementSchema, MeasurementSchema>> pairs = new ArrayList<>();
+
+ for (TSDataType type : dataTypes) {
+ for (TSDataType dataType : dataTypes) {
+ String id = String.format("%s2%s", type.name(), dataType.name());
+ pairs.add(new Pair<>(new MeasurementSchema(id, type), new
MeasurementSchema(id, dataType)));
+ }
+ }
+ return pairs;
+ }
+
private static class SchemaConfig {
private static final String STORAGE_GROUP_0 = "root.sg.test_0";
private static final String STORAGE_GROUP_1 = "root.sg.test_1";
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 84145d01d79..b1863c3d491 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -197,6 +197,9 @@ public enum TSStatusCode {
PIPE_ERROR(1107),
PIPESERVER_ERROR(1108),
VERIFY_METADATA_ERROR(1109),
+ LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION(1110),
+ LOAD_IDEMPOTENT_CONFLICT_EXCEPTION(1111),
+ LOAD_USER_CONFLICT_EXCEPTION(1112),
// UDF
UDF_LOAD_CLASS_ERROR(1200),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
index 1603c9872f4..78c6d8d4d5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
@@ -23,16 +23,12 @@ import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.rpc.TSStatusCode;
public class VerifyMetadataException extends IoTDBException {
- public VerifyMetadataException(
- String path, String compareInfo, String tsFileInfo, String tsFilePath,
String IoTDBInfo) {
- super(
- String.format(
- "%s %s mismatch, %s in tsfile %s, but %s in IoTDB.",
- path, compareInfo, tsFileInfo, tsFilePath, IoTDBInfo),
- TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
- }
public VerifyMetadataException(String message) {
super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
}
+
+ public VerifyMetadataException(String message, int errorCode) {
+ super(message, errorCode);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java
similarity index 64%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java
index 1603c9872f4..209fb83bcdb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/VerifyMetadataTypeMismatchException.java
@@ -19,20 +19,11 @@
package org.apache.iotdb.db.exception;
-import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.rpc.TSStatusCode;
-public class VerifyMetadataException extends IoTDBException {
- public VerifyMetadataException(
- String path, String compareInfo, String tsFileInfo, String tsFilePath,
String IoTDBInfo) {
- super(
- String.format(
- "%s %s mismatch, %s in tsfile %s, but %s in IoTDB.",
- path, compareInfo, tsFileInfo, tsFilePath, IoTDBInfo),
- TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
- }
+public class VerifyMetadataTypeMismatchException extends
VerifyMetadataException {
- public VerifyMetadataException(String message) {
+ public VerifyMetadataTypeMismatchException(String message) {
super(message, TSStatusCode.VERIFY_METADATA_ERROR.getStatusCode());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
index c78ca0544b4..e181ec1d592 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
@@ -56,6 +56,7 @@ public class TsFileLoader implements ILoader {
try {
LoadTsFileStatement statement = new
LoadTsFileStatement(tsFile.getAbsolutePath());
statement.setDeleteAfterLoad(true);
+ statement.setConvertOnTypeMismatch(true);
statement.setDatabaseLevel(parseSgLevel());
statement.setVerifySchema(true);
statement.setAutoCreateDatabase(false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 8785231480f..a87e229a03a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -569,6 +569,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
throws FileNotFoundException {
final LoadTsFileStatement statement = new
LoadTsFileStatement(fileAbsolutePath);
statement.setDeleteAfterLoad(true);
+ statement.setConvertOnTypeMismatch(true);
statement.setVerifySchema(true);
statement.setAutoCreateDatabase(false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 9cf39b42325..71e985be113 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.db.queryengine.plan.analyze.load;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.exception.VerifyMetadataException;
+import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadReadOnlyException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -33,6 +35,7 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -54,8 +57,7 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileAnalyzer.class);
// These are only used when constructed from tree model SQL
- private final LoadTsFileStatement loadTsFileStatement;
-
+ private final LoadTsFileStatement loadTsFileTreeStatement;
// These are only used when constructed from table model SQL
private final LoadTsFile loadTsFileTableStatement;
@@ -67,6 +69,8 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
protected final boolean isDeleteAfterLoad;
+ protected final boolean isConvertOnTypeMismatch;
+
protected final boolean isAutoCreateDatabase;
protected final int databaseLevel;
@@ -78,15 +82,19 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
final IPartitionFetcher partitionFetcher =
ClusterPartitionFetcher.getInstance();
final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
+ protected final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter;
+
LoadTsFileAnalyzer(LoadTsFileStatement loadTsFileStatement, MPPQueryContext
context) {
- this.loadTsFileStatement = loadTsFileStatement;
+ this.loadTsFileTreeStatement = loadTsFileStatement;
this.tsFiles = loadTsFileStatement.getTsFiles();
this.statementString = loadTsFileStatement.toString();
this.isVerifySchema = loadTsFileStatement.isVerifySchema();
this.isDeleteAfterLoad = loadTsFileStatement.isDeleteAfterLoad();
+ this.isConvertOnTypeMismatch =
loadTsFileStatement.isConvertOnTypeMismatch();
this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
this.databaseLevel = loadTsFileStatement.getDatabaseLevel();
this.database = loadTsFileStatement.getDatabase();
+ this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter();
this.loadTsFileTableStatement = null;
this.isTableModelStatement = false;
@@ -99,11 +107,13 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
this.statementString = loadTsFileTableStatement.toString();
this.isVerifySchema = true;
this.isDeleteAfterLoad = loadTsFileTableStatement.isDeleteAfterLoad();
+ this.isConvertOnTypeMismatch =
loadTsFileTableStatement.isConvertOnTypeMismatch();
this.isAutoCreateDatabase =
loadTsFileTableStatement.isAutoCreateDatabase();
this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel();
this.database = loadTsFileTableStatement.getDatabase();
+ this.loadTsFileDataTypeConverter = new LoadTsFileDataTypeConverter();
- this.loadTsFileStatement = null;
+ this.loadTsFileTreeStatement = null;
this.isTableModelStatement = true;
this.context = context;
}
@@ -137,6 +147,11 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
} catch (AuthException e) {
setFailAnalysisForAuthException(analysis, e);
return false;
+ } catch (VerifyMetadataTypeMismatchException e) {
+ executeDataTypeConversionOnTypeMismatch(analysis, e);
+ // just return false to STOP the analysis process,
+ // the real result on the conversion will be set in the analysis.
+ return false;
} catch (BufferUnderflowException e) {
LOGGER.warn(
"The file {} is not a valid tsfile. Please check the input file.",
tsFile.getPath(), e);
@@ -161,6 +176,39 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
protected abstract void analyzeSingleTsFile(final File tsFile)
throws IOException, AuthException, VerifyMetadataException;
+ protected void executeDataTypeConversionOnTypeMismatch(
+ final IAnalysis analysis, final VerifyMetadataTypeMismatchException e) {
+ final TSStatus status =
+ isConvertOnTypeMismatch
+ ? (isTableModelStatement
+ ?
loadTsFileDataTypeConverter.convertForTableModel(loadTsFileTableStatement)
+ :
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileTreeStatement))
+ : null;
+
+ if (status == null) {
+ analysis.setFailStatus(
+ new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
+ } else if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+ && status.getCode() !=
TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) {
+ analysis.setFailStatus(status);
+ }
+ analysis.setFinishQueryAfterAnalyze(true);
+ setRealStatement(analysis);
+ }
+
+ protected void setRealStatement(IAnalysis analysis) {
+ if (isTableModelStatement) {
+ // Do nothing by now.
+ } else {
+ analysis.setRealStatement(loadTsFileTreeStatement);
+ }
+ }
+
+ protected String getStatementString() {
+ return statementString;
+ }
+
protected TsFileResource constructTsFileResource(
final TsFileSequenceReader reader, final File tsFile) throws IOException
{
final TsFileResource tsFileResource = new TsFileResource(tsFile);
@@ -174,23 +222,11 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
return tsFileResource;
}
- protected String getStatementString() {
- return statementString;
- }
-
- protected void setRealStatement(IAnalysis analysis) {
- if (isTableModelStatement) {
- // Do nothing by now.
- } else {
- analysis.setRealStatement(loadTsFileStatement);
- }
- }
-
protected void addTsFileResource(TsFileResource tsFileResource) {
if (isTableModelStatement) {
loadTsFileTableStatement.addTsFileResource(tsFileResource);
} else {
- loadTsFileStatement.addTsFileResource(tsFileResource);
+ loadTsFileTreeStatement.addTsFileResource(tsFileResource);
}
}
@@ -198,7 +234,7 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
if (isTableModelStatement) {
loadTsFileTableStatement.addWritePointCount(writePointCount);
} else {
- loadTsFileStatement.addWritePointCount(writePointCount);
+ loadTsFileTreeStatement.addWritePointCount(writePointCount);
}
}
@@ -206,6 +242,10 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
return isVerifySchema;
}
+ protected boolean isConvertOnTypeMismatch() {
+ return isConvertOnTypeMismatch;
+ }
+
protected boolean isAutoCreateDatabase() {
return isAutoCreateDatabase;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
index dac5f81e66b..a3edfabadb8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.VerifyMetadataException;
+import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -290,7 +291,7 @@ public class LoadTsFileTableSchemaCache {
final ColumnSchema realColumn =
realSchema.getColumn(fileColumn.getName(),
fileColumn.getColumnCategory());
if (!fileColumn.getType().equals(realColumn.getType())) {
- throw new VerifyMetadataException(
+ throw new VerifyMetadataTypeMismatchException(
String.format(
"Data type mismatch for column %s in table %s, type in
TsFile: %s, type in IoTDB: %s",
realColumn.getName(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java
index fe91d00f8e1..a34483af9d8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -85,6 +86,9 @@ public class LoadTsFileToTreeModelAnalyzer extends
LoadTsFileAnalyzer {
} catch (AuthException e) {
setFailAnalysisForAuthException(analysis, e);
return analysis;
+ } catch (VerifyMetadataTypeMismatchException e) {
+ executeDataTypeConversionOnTypeMismatch(analysis, e);
+ return analysis;
} catch (Exception e) {
final String exceptionMessage =
String.format(
@@ -105,7 +109,8 @@ public class LoadTsFileToTreeModelAnalyzer extends
LoadTsFileAnalyzer {
}
@Override
- protected void analyzeSingleTsFile(final File tsFile) throws IOException,
AuthException {
+ protected void analyzeSingleTsFile(final File tsFile)
+ throws IOException, AuthException, VerifyMetadataTypeMismatchException {
try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
// can be reused when constructing tsfile resource
final TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataIterator =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
index 67379c724ef..ac389b2a6db 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.VerifyMetadataException;
+import org.apache.iotdb.db.exception.VerifyMetadataTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadFileException;
import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -80,6 +81,7 @@ import java.util.Set;
import java.util.stream.Collectors;
public class TreeSchemaAutoCreatorAndVerifier {
+
private static final Logger LOGGER =
LoggerFactory.getLogger(TreeSchemaAutoCreatorAndVerifier.class);
@@ -103,7 +105,7 @@ public class TreeSchemaAutoCreatorAndVerifier {
public void autoCreateAndVerify(
TsFileSequenceReader reader,
Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadataList)
- throws IOException, AuthException {
+ throws IOException, AuthException, VerifyMetadataTypeMismatchException {
for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
device2TimeseriesMetadataList.entrySet()) {
final IDeviceID device = entry.getKey();
@@ -204,13 +206,14 @@ public class TreeSchemaAutoCreatorAndVerifier {
schemaCache.clearDeviceIsAlignedCacheIfNecessary();
}
- public void flush() throws AuthException {
+ public void flush() throws AuthException,
VerifyMetadataTypeMismatchException {
doAutoCreateAndVerify();
schemaCache.clearTimeSeries();
}
- private void doAutoCreateAndVerify() throws SemanticException, AuthException
{
+ private void doAutoCreateAndVerify()
+ throws SemanticException, AuthException,
VerifyMetadataTypeMismatchException {
if (schemaCache.getDevice2TimeSeries().isEmpty()) {
return;
}
@@ -233,15 +236,26 @@ public class TreeSchemaAutoCreatorAndVerifier {
}
} catch (AuthException e) {
throw e;
+ } catch (VerifyMetadataTypeMismatchException e) {
+ if (loadTsFileAnalyzer.isConvertOnTypeMismatch()) {
+ // throw exception to convert data type in the upper layer
(LoadTsFileAnalyzer)
+ throw e;
+ } else {
+ handleException(e, loadTsFileAnalyzer.getStatementString());
+ }
} catch (Exception e) {
- LOGGER.warn("Auto create or verify schema error.", e);
- throw new SemanticException(
- String.format(
- "Auto create or verify schema error when executing statement %s.
Detail: %s.",
- loadTsFileAnalyzer.getStatementString(), e.getMessage()));
+ handleException(e, loadTsFileAnalyzer.getStatementString());
}
}
+ private void handleException(Exception e, String statementString) throws
SemanticException {
+ LOGGER.warn("Auto create or verify schema error.", e);
+ throw new SemanticException(
+ String.format(
+ "Auto create or verify schema error when executing statement %s.
Detail: %s.",
+ statementString, e.getMessage()));
+ }
+
private void makeSureNoDuplicatedMeasurementsInDevices() throws
VerifyMetadataException {
for (final Map.Entry<IDeviceID, Set<MeasurementSchema>> entry :
schemaCache.getDevice2TimeSeries().entrySet()) {
@@ -445,7 +459,7 @@ public class TreeSchemaAutoCreatorAndVerifier {
// check datatype
if (!tsFileSchema.getType().equals(iotdbSchema.getType())) {
- throw new VerifyMetadataException(
+ throw new VerifyMetadataTypeMismatchException(
String.format(
"Measurement %s%s%s datatype not match, TsFile: %s, IoTDB:
%s",
device,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
index 71ca9359e6f..e61d9353852 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
@@ -41,8 +41,9 @@ public class LoadTsFile extends Statement {
private final File file;
private int databaseLevel; // For loading to tree-model only
private String database; // For loading to table-model only
- private boolean deleteAfterLoad;
- private boolean autoCreateDatabase;
+ private boolean deleteAfterLoad = false;
+ private boolean convertOnTypeMismatch = true;
+ private boolean autoCreateDatabase = true;
private String model = LoadTsFileConfigurator.MODEL_TABLE_VALUE;
private final Map<String, String> loadAttributes;
@@ -58,6 +59,7 @@ public class LoadTsFile extends Statement {
this.file = new File(filePath);
this.databaseLevel =
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
this.deleteAfterLoad = false;
+ this.convertOnTypeMismatch = true;
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
this.resources = new ArrayList<>();
this.writePointCountList = new ArrayList<>();
@@ -85,12 +87,16 @@ public class LoadTsFile extends Statement {
this.autoCreateDatabase = autoCreateDatabase;
}
+ public boolean isAutoCreateDatabase() {
+ return autoCreateDatabase;
+ }
+
public boolean isDeleteAfterLoad() {
return deleteAfterLoad;
}
- public boolean isAutoCreateDatabase() {
- return autoCreateDatabase;
+ public boolean isConvertOnTypeMismatch() {
+ return convertOnTypeMismatch;
}
public int getDatabaseLevel() {
@@ -133,6 +139,8 @@ public class LoadTsFile extends Statement {
this.databaseLevel =
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes);
this.deleteAfterLoad =
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
+ this.convertOnTypeMismatch =
+
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
this.model =
LoadTsFileConfigurator.parseOrGetDefaultModel(
loadAttributes, LoadTsFileConfigurator.MODEL_TABLE_VALUE);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 9e1484f638f..520700d3848 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -43,19 +43,23 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_NAME_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.MODEL_KEY;
+import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_DELETE_VALUE;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY;
+import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_NONE_VALUE;
public class LoadTsFileStatement extends Statement {
private final File file;
private int databaseLevel; // For loading to tree-model only
private String database; // For loading to table-model only
- private boolean verifySchema;
- private boolean deleteAfterLoad;
- private boolean autoCreateDatabase;
+ private boolean verifySchema = true;
+ private boolean deleteAfterLoad = false;
+ private boolean convertOnTypeMismatch = true;
+ private boolean autoCreateDatabase = true;
private String model = LoadTsFileConfigurator.MODEL_TREE_VALUE;
private Map<String, String> loadAttributes;
@@ -69,6 +73,7 @@ public class LoadTsFileStatement extends Statement {
this.databaseLevel =
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
this.verifySchema = true;
this.deleteAfterLoad = false;
+ this.convertOnTypeMismatch = true;
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
this.resources = new ArrayList<>();
this.writePointCountList = new ArrayList<>();
@@ -99,6 +104,7 @@ public class LoadTsFileStatement extends Statement {
this.databaseLevel =
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
this.verifySchema = true;
this.deleteAfterLoad = false;
+ this.convertOnTypeMismatch = true;
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
this.tsFiles = new ArrayList<>();
this.resources = new ArrayList<>();
@@ -136,48 +142,56 @@ public class LoadTsFileStatement extends Statement {
});
}
- public void setDeleteAfterLoad(boolean deleteAfterLoad) {
- this.deleteAfterLoad = deleteAfterLoad;
- }
-
public void setDatabaseLevel(int databaseLevel) {
this.databaseLevel = databaseLevel;
}
+ public int getDatabaseLevel() {
+ return databaseLevel;
+ }
+
public void setDatabase(String database) {
this.database = database;
}
- public void setModel(String model) {
- this.model = model;
+ public String getDatabase() {
+ return database;
}
public void setVerifySchema(boolean verifySchema) {
this.verifySchema = verifySchema;
}
- public void setAutoCreateDatabase(boolean autoCreateDatabase) {
- this.autoCreateDatabase = autoCreateDatabase;
- }
-
public boolean isVerifySchema() {
return verifySchema;
}
+ public void setDeleteAfterLoad(boolean deleteAfterLoad) {
+ this.deleteAfterLoad = deleteAfterLoad;
+ }
+
public boolean isDeleteAfterLoad() {
return deleteAfterLoad;
}
- public boolean isAutoCreateDatabase() {
- return autoCreateDatabase;
+ public void setConvertOnTypeMismatch(boolean convertOnTypeMismatch) {
+ this.convertOnTypeMismatch = convertOnTypeMismatch;
}
- public int getDatabaseLevel() {
- return databaseLevel;
+ public boolean isConvertOnTypeMismatch() {
+ return convertOnTypeMismatch;
}
- public String getDatabase() {
- return database;
+ public void setAutoCreateDatabase(boolean autoCreateDatabase) {
+ this.autoCreateDatabase = autoCreateDatabase;
+ }
+
+ public boolean isAutoCreateDatabase() {
+ return autoCreateDatabase;
+ }
+
+ public void setModel(String model) {
+ this.model = model;
}
public String getModel() {
@@ -213,6 +227,8 @@ public class LoadTsFileStatement extends Statement {
this.databaseLevel =
LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes);
this.deleteAfterLoad =
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
+ this.convertOnTypeMismatch =
+
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
this.model =
LoadTsFileConfigurator.parseOrGetDefaultModel(
loadAttributes, LoadTsFileConfigurator.MODEL_TREE_VALUE);
@@ -234,14 +250,18 @@ public class LoadTsFileStatement extends Statement {
public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement
toRelationalStatement(
MPPQueryContext context) {
loadAttributes = new HashMap<>();
+
loadAttributes.put(DATABASE_LEVEL_KEY, String.valueOf(databaseLevel));
if (database != null) {
loadAttributes.put(DATABASE_NAME_KEY, database);
}
- loadAttributes.put(ON_SUCCESS_KEY, String.valueOf(deleteAfterLoad));
+ loadAttributes.put(
+ ON_SUCCESS_KEY, deleteAfterLoad ? ON_SUCCESS_DELETE_VALUE :
ON_SUCCESS_NONE_VALUE);
+ loadAttributes.put(CONVERT_ON_TYPE_MISMATCH_KEY,
String.valueOf(convertOnTypeMismatch));
if (model != null) {
loadAttributes.put(MODEL_KEY, model);
}
+
return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes);
}
@@ -255,13 +275,15 @@ public class LoadTsFileStatement extends Statement {
return "LoadTsFileStatement{"
+ "file="
+ file
- + ", deleteAfterLoad="
+ + ", delete-after-load="
+ deleteAfterLoad
- + ", databaseLevel="
+ + ", database-level="
+ databaseLevel
- + ", verifySchema="
+ + ", verify-schema="
+ verifySchema
- + ", tsFiles Size="
+ + ", convert-on-type-mismatch="
+ + convertOnTypeMismatch
+ + ", tsFiles size="
+ tsFiles.size()
+ '}';
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 605218ae0a6..188ae30436d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -192,6 +192,7 @@ public class ActiveLoadTsFileLoader {
private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws
FileNotFoundException {
final LoadTsFileStatement statement = new
LoadTsFileStatement(filePair.getLeft());
statement.setDeleteAfterLoad(true);
+ statement.setConvertOnTypeMismatch(true);
statement.setVerifySchema(true);
statement.setAutoCreateDatabase(false);
return executeStatement(filePair.getRight() ? new
PipeEnrichedStatement(statement) : statement);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index 929d106f244..70d7401c560 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -49,6 +49,9 @@ public class LoadTsFileConfigurator {
break;
case DATABASE_NAME_KEY:
break;
+ case CONVERT_ON_TYPE_MISMATCH_KEY:
+ validateConvertOnTypeMismatchParam(value);
+ break;
default:
throw new SemanticException("Invalid parameter '" + key + "' for LOAD
TSFILE command.");
}
@@ -90,8 +93,8 @@ public class LoadTsFileConfigurator {
}
public static final String ON_SUCCESS_KEY = "on-success";
- private static final String ON_SUCCESS_DELETE_VALUE = "delete";
- private static final String ON_SUCCESS_NONE_VALUE = "none";
+ public static final String ON_SUCCESS_DELETE_VALUE = "delete";
+ public static final String ON_SUCCESS_NONE_VALUE = "none";
private static final Set<String> ON_SUCCESS_VALUE_SET =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(ON_SUCCESS_DELETE_VALUE,
ON_SUCCESS_NONE_VALUE)));
@@ -107,7 +110,27 @@ public class LoadTsFileConfigurator {
public static boolean parseOrGetDefaultOnSuccess(final Map<String, String>
loadAttributes) {
final String value = loadAttributes.get(ON_SUCCESS_KEY);
- return StringUtils.isEmpty(value) || ON_SUCCESS_DELETE_VALUE.equals(value);
+ return !StringUtils.isEmpty(value) &&
ON_SUCCESS_DELETE_VALUE.equalsIgnoreCase(value);
+ }
+
+ public static final String CONVERT_ON_TYPE_MISMATCH_KEY =
"convert-on-type-mismatch";
+ private static final boolean CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE = true;
+
+  /**
+   * Checks that the given value of the {@code convert-on-type-mismatch} attribute is a boolean
+   * literal, i.e. {@code true} or {@code false} in any letter case.
+   *
+   * @param convertOnTypeMismatch the raw attribute value
+   * @throws SemanticException if the value is neither {@code true} nor {@code false}
+   */
+  public static void validateConvertOnTypeMismatchParam(final String convertOnTypeMismatch) {
+    final boolean isBooleanLiteral =
+        "true".equalsIgnoreCase(convertOnTypeMismatch)
+            || "false".equalsIgnoreCase(convertOnTypeMismatch);
+    if (isBooleanLiteral) {
+      return;
+    }
+
+    final String errorMessage =
+        String.format(
+            "Given %s value '%s' is not supported, please input a valid boolean value.",
+            CONVERT_ON_TYPE_MISMATCH_KEY, convertOnTypeMismatch);
+    throw new SemanticException(errorMessage);
+  }
+
+  /**
+   * Reads the {@code convert-on-type-mismatch} attribute.
+   *
+   * @param loadAttributes the attributes of the load command
+   * @return the attribute parsed with {@link Boolean#parseBoolean(String)}, or the default value
+   *     when the attribute is not present in the map
+   */
+  public static boolean parseOrGetDefaultConvertOnTypeMismatch(
+      final Map<String, String> loadAttributes) {
+    final String rawValue =
+        loadAttributes.getOrDefault(
+            CONVERT_ON_TYPE_MISMATCH_KEY, String.valueOf(CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE));
+    return Boolean.parseBoolean(rawValue);
+  }
public static final String MODEL_KEY = "model";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
new file mode 100644
index 00000000000..1a814442402
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatement.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.db.pipe.receiver.transform.converter.ArrayConverter;
+import
org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Insert-tablet statement used by the LOAD TSFILE data type conversion path.
+ *
+ * <p>Its {@link #checkAndCastDataType(int, TSDataType)} override converts the affected column in
+ * place and always reports success.
+ */
+public class LoadConvertedInsertTabletStatement extends PipeConvertedInsertTabletStatement {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LoadConvertedInsertTabletStatement.class);
+
+  public LoadConvertedInsertTabletStatement(final InsertTabletStatement insertTabletStatement) {
+    super(insertTabletStatement);
+  }
+
+  /**
+   * Converts the values of one column to the requested data type and records the new type.
+   *
+   * @param columnIndex index of the column in {@code columns} / {@code dataTypes}
+   * @param dataType the type the column is converted to
+   * @return always {@code true}
+   */
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+    final TSDataType sourceType = dataTypes[columnIndex];
+
+    LOGGER.info(
+        "Load: Inserting tablet to {}.{}. Casting type from {} to {}.",
+        devicePath,
+        measurements[columnIndex],
+        sourceType,
+        dataType);
+
+    final Object convertedColumn =
+        ArrayConverter.convert(sourceType, dataType, columns[columnIndex]);
+    columns[columnIndex] = convertedColumn;
+    dataTypes[columnIndex] = dataType;
+    return true;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
new file mode 100644
index 00000000000..f292ee55930
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementExceptionVisitor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+/**
+ * Translates an exception thrown while executing a statement on the load conversion path into a
+ * {@link TSStatus} whose message is the exception message.
+ */
+public class LoadConvertedInsertTabletStatementExceptionVisitor
+    extends StatementVisitor<TSStatus, Exception> {
+
+  /** Fallback for every statement kind: reports {@code LOAD_FILE_ERROR}. */
+  @Override
+  public TSStatus visitNode(final StatementNode node, final Exception context) {
+    return statusOf(TSStatusCode.LOAD_FILE_ERROR, context);
+  }
+
+  /**
+   * Maps out-of-memory failures to {@code LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION} and semantic
+   * failures to {@code LOAD_USER_CONFLICT_EXCEPTION}; everything else is delegated to {@code
+   * visitStatement}.
+   */
+  @Override
+  public TSStatus visitLoadFile(
+      final LoadTsFileStatement loadTsFileStatement, final Exception context) {
+    if (context instanceof LoadRuntimeOutOfMemoryException) {
+      return statusOf(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION, context);
+    }
+    if (context instanceof SemanticException) {
+      return statusOf(TSStatusCode.LOAD_USER_CONFLICT_EXCEPTION, context);
+    }
+    return visitStatement(loadTsFileStatement, context);
+  }
+
+  // Builds a status with the given code carrying the message of the given exception.
+  private static TSStatus statusOf(final TSStatusCode code, final Exception cause) {
+    return new TSStatus(code.getStatusCode()).setMessage(cause.getMessage());
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java
new file mode 100644
index 00000000000..6e9601f8d95
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+/**
+ * Rewrites the {@link TSStatus} returned by executing a converted insert-tablet statement into
+ * the status codes used by the load conversion path.
+ *
+ * <p>Statements other than insert-tablet statements keep their status unchanged.
+ */
+public class LoadConvertedInsertTabletStatementTSStatusVisitor
+    extends StatementVisitor<TSStatus, TSStatus> {
+
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  /** Fallback for every statement kind: returns the incoming status as is. */
+  @Override
+  public TSStatus visitNode(final StatementNode node, final TSStatus context) {
+    return context;
+  }
+
+  @Override
+  public TSStatus visitInsertTablet(
+      final InsertTabletStatement insertTabletStatement, final TSStatus context) {
+    return visitInsertBase(insertTabletStatement, context);
+  }
+
+  /**
+   * Maps the status of an insertion:
+   *
+   * <ul>
+   *   <li>{@code SYSTEM_READ_ONLY} / {@code WRITE_PROCESS_REJECT} become {@code
+   *       LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION};
+   *   <li>{@code OUT_OF_TTL} becomes {@code LOAD_IDEMPOTENT_CONFLICT_EXCEPTION};
+   *   <li>{@code METADATA_ERROR} caused by a data type mismatch becomes {@code
+   *       LOAD_IDEMPOTENT_CONFLICT_EXCEPTION} when partial insert is enabled;
+   *   <li>anything else is delegated to {@code visitStatement}.
+   * </ul>
+   */
+  private TSStatus visitInsertBase(
+      final InsertBaseStatement insertBaseStatement, final TSStatus context) {
+    final int code = context.getCode();
+
+    if (code == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
+        || code == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+      return new TSStatus(TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
+
+    if (code == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
+      return new TSStatus(TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
+
+    if (code == TSStatusCode.METADATA_ERROR.getStatusCode()
+        && isDataTypeMismatch(context.getMessage())
+        && config.isEnablePartialInsert()) {
+      return new TSStatus(TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
+
+    return visitStatement(insertBaseStatement, context);
+  }
+
+  // The status message is optional, so a status without message must not be dereferenced; it
+  // is simply treated as "not a data type mismatch".
+  private static boolean isDataTypeMismatch(final String message) {
+    return message != null && message.contains(DataTypeMismatchException.REGISTERED_TYPE_STRING);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
new file mode 100644
index 00000000000..233f60124ac
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Table-model data type conversion for LOAD TSFILE.
+ *
+ * <p>Every TsFile of the visited {@link LoadTsFile} is parsed into tablets; each tablet is wrapped
+ * in a {@link LoadConvertedInsertTabletStatement} and executed through the supplied {@link
+ * StatementExecutor}. A present SUCCESS status is returned only when every tablet of every file
+ * was accepted; in all other cases {@link Optional#empty()} is returned.
+ */
+public class LoadTableStatementDataTypeConvertExecutionVisitor
+    extends AstVisitor<Optional<TSStatus>, String> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LoadTableStatementDataTypeConvertExecutionVisitor.class);
+
+  // Number of additional attempts made while the write process is temporarily unavailable.
+  private static final int MAX_RETRY_TIMES = 5;
+  // The wait before the n-th retry is n * RETRY_BASE_INTERVAL_MS.
+  private static final long RETRY_BASE_INTERVAL_MS = 100L;
+
+  @FunctionalInterface
+  public interface StatementExecutor {
+    // databaseName can NOT be null
+    TSStatus execute(final Statement statement, final String databaseName);
+  }
+
+  private final StatementExecutor statementExecutor;
+
+  public LoadTableStatementDataTypeConvertExecutionVisitor(StatementExecutor statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  /**
+   * Inserts the content of all TsFiles of the statement tablet by tablet.
+   *
+   * @param loadTsFileStatement the load statement whose TsFiles are converted
+   * @param databaseName the target database; conversion is skipped when it is {@code null}
+   * @return a SUCCESS status when all tablets were accepted, otherwise {@link Optional#empty()}
+   */
+  @Override
+  public Optional<TSStatus> visitLoadTsFile(
+      final LoadTsFile loadTsFileStatement, final String databaseName) {
+    if (Objects.isNull(databaseName)) {
+      LOGGER.warn(
+          "Database name is unexpectedly null for LoadTsFileStatement: {}. Skip data type conversion.",
+          loadTsFileStatement);
+      return Optional.empty();
+    }
+
+    LOGGER.info("Start data type conversion for LoadTsFileStatement: {}.", loadTsFileStatement);
+
+    for (final File file : loadTsFileStatement.getTsFiles()) {
+      try (final TsFileInsertionEventTableParser parser =
+          new TsFileInsertionEventTableParser(
+              file,
+              new TablePattern(true, null, null),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null)) {
+        for (final TabletInsertionEvent tabletInsertionEvent : parser.toTabletInsertionEvents()) {
+          if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
+            continue;
+          }
+          final PipeRawTabletInsertionEvent rawTabletInsertionEvent =
+              (PipeRawTabletInsertionEvent) tabletInsertionEvent;
+
+          final LoadConvertedInsertTabletStatement statement =
+              new LoadConvertedInsertTabletStatement(
+                  PipeTransferTabletRawReq.toTPipeTransferRawReq(
+                          rawTabletInsertionEvent.convertToTablet(),
+                          rawTabletInsertionEvent.isAligned())
+                      .constructStatement());
+
+          final TSStatus result = executeWithRetry(statement, databaseName);
+          if (!isAccepted(result)) {
+            // Make the reason of the aborted conversion visible instead of failing silently.
+            LOGGER.warn(
+                "Failed to insert converted tablet of TsFile {} for LoadTsFileStatement: {}, status: {}.",
+                file,
+                loadTsFileStatement,
+                result);
+            return Optional.empty();
+          }
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
+        return Optional.empty();
+      }
+    }
+
+    if (loadTsFileStatement.isDeleteAfterLoad()) {
+      loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+    }
+
+    LOGGER.info(
+        "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement);
+
+    return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+  // Executes the statement and retries up to MAX_RETRY_TIMES times, with a linearly growing
+  // wait, while the mapped status is LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION. Any exception is
+  // turned into a status by the exception visitor; an interrupt is restored on the thread.
+  private TSStatus executeWithRetry(
+      final LoadConvertedInsertTabletStatement statement, final String databaseName) {
+    try {
+      TSStatus result = executeOnce(statement, databaseName);
+      for (int i = 0;
+          i < MAX_RETRY_TIMES
+              && result.getCode()
+                  == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
+          i++) {
+        Thread.sleep(RETRY_BASE_INTERVAL_MS * (i + 1));
+        result = executeOnce(statement, databaseName);
+      }
+      return result;
+    } catch (final Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      return statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
+    }
+  }
+
+  // Runs the statement once and maps the raw execution status through the status visitor.
+  private TSStatus executeOnce(
+      final LoadConvertedInsertTabletStatement statement, final String databaseName) {
+    return statement.accept(
+        LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+        statementExecutor.execute(statement, databaseName));
+  }
+
+  // A tablet counts as loaded on success, on a redirection recommendation, or on an idempotent
+  // conflict.
+  private static boolean isAccepted(final TSStatus result) {
+    final int code = result.getCode();
+    return code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        || code == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
new file mode 100644
index 00000000000..5793b96502a
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Optional;
+
+/**
+ * Tree-model data type conversion for LOAD TSFILE.
+ *
+ * <p>Every TsFile of the visited {@link LoadTsFileStatement} is scanned into tablets; each tablet
+ * is wrapped in a {@link LoadConvertedInsertTabletStatement} and executed through the supplied
+ * {@link StatementExecutor}. A present SUCCESS status is returned only when every tablet of every
+ * file was accepted; in all other cases {@link Optional#empty()} is returned.
+ */
+public class LoadTreeStatementDataTypeConvertExecutionVisitor
+    extends StatementVisitor<Optional<TSStatus>, Void> {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LoadTreeStatementDataTypeConvertExecutionVisitor.class);
+
+  // Number of additional attempts made while the write process is temporarily unavailable.
+  private static final int MAX_RETRY_TIMES = 5;
+  // The wait before the n-th retry is n * RETRY_BASE_INTERVAL_MS.
+  private static final long RETRY_BASE_INTERVAL_MS = 100L;
+
+  private final StatementExecutor statementExecutor;
+
+  @FunctionalInterface
+  public interface StatementExecutor {
+    TSStatus execute(final Statement statement);
+  }
+
+  public LoadTreeStatementDataTypeConvertExecutionVisitor(
+      final StatementExecutor statementExecutor) {
+    this.statementExecutor = statementExecutor;
+  }
+
+  /** Statements other than load statements are not handled: returns {@link Optional#empty()}. */
+  @Override
+  public Optional<TSStatus> visitNode(final StatementNode statementNode, final Void v) {
+    return Optional.empty();
+  }
+
+  /**
+   * Inserts the content of all TsFiles of the statement tablet by tablet.
+   *
+   * @param loadTsFileStatement the load statement whose TsFiles are converted
+   * @param v unused
+   * @return a SUCCESS status when all tablets were accepted, otherwise {@link Optional#empty()}
+   */
+  @Override
+  public Optional<TSStatus> visitLoadFile(
+      final LoadTsFileStatement loadTsFileStatement, final Void v) {
+
+    LOGGER.info("Start data type conversion for LoadTsFileStatement: {}", loadTsFileStatement);
+
+    for (final File file : loadTsFileStatement.getTsFiles()) {
+      try (final TsFileInsertionEventScanParser parser =
+          new TsFileInsertionEventScanParser(
+              file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
+        for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) {
+          final LoadConvertedInsertTabletStatement statement =
+              new LoadConvertedInsertTabletStatement(
+                  PipeTransferTabletRawReq.toTPipeTransferRawReq(
+                          tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
+                      .constructStatement());
+
+          final TSStatus result = executeWithRetry(statement);
+          if (!isAccepted(result)) {
+            // Make the reason of the aborted conversion visible instead of failing silently.
+            LOGGER.warn(
+                "Failed to insert converted tablet of TsFile {} for LoadTsFileStatement: {}, status: {}.",
+                file,
+                loadTsFileStatement,
+                result);
+            return Optional.empty();
+          }
+        }
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
+        return Optional.empty();
+      }
+    }
+
+    if (loadTsFileStatement.isDeleteAfterLoad()) {
+      loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+    }
+
+    LOGGER.info(
+        "Data type conversion for LoadTsFileStatement {} is successful.", loadTsFileStatement);
+
+    return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+  // Executes the statement and retries up to MAX_RETRY_TIMES times, with a linearly growing
+  // wait, while the mapped status is LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION. Any exception is
+  // turned into a status by the exception visitor; an interrupt is restored on the thread.
+  private TSStatus executeWithRetry(final LoadConvertedInsertTabletStatement statement) {
+    try {
+      TSStatus result = executeOnce(statement);
+      for (int i = 0;
+          i < MAX_RETRY_TIMES
+              && result.getCode()
+                  == TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
+          i++) {
+        Thread.sleep(RETRY_BASE_INTERVAL_MS * (i + 1));
+        result = executeOnce(statement);
+      }
+      return result;
+    } catch (final Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      return statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
+    }
+  }
+
+  // Runs the statement once and maps the raw execution status through the status visitor.
+  private TSStatus executeOnce(final LoadConvertedInsertTabletStatement statement) {
+    return statement.accept(
+        LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
+        statementExecutor.execute(statement));
+  }
+
+  // A tablet counts as loaded on success, on a redirection recommendation, or on an idempotent
+  // conflict.
+  private static boolean isAccepted(final TSStatus result) {
+    final int code = result.getCode();
+    return code == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || code == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
+        || code == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
new file mode 100644
index 00000000000..7ad7a5af544
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
+import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point of the LOAD TSFILE data type conversion path.
+ *
+ * <p>Holds one execution visitor per data model and wires each of them to the {@link
+ * Coordinator}. Both {@code convertFor...} methods return {@code null} when the visitor yields
+ * no status, and a {@code LOAD_FILE_ERROR} status when the visitor throws.
+ */
+public class LoadTsFileDataTypeConverter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileDataTypeConverter.class);
+
+  private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  /** Maps the status of an executed converted statement to a load status. */
+  public static final LoadConvertedInsertTabletStatementTSStatusVisitor STATEMENT_STATUS_VISITOR =
+      new LoadConvertedInsertTabletStatementTSStatusVisitor();
+
+  /** Maps an exception thrown while executing a converted statement to a load status. */
+  public static final LoadConvertedInsertTabletStatementExceptionVisitor
+      STATEMENT_EXCEPTION_VISITOR = new LoadConvertedInsertTabletStatementExceptionVisitor();
+
+  private final SqlParser relationalSqlParser = new SqlParser();
+
+  // Executes converted statements of the table model against the given database.
+  private final LoadTableStatementDataTypeConvertExecutionVisitor
+      tableStatementDataTypeConvertExecutionVisitor =
+          new LoadTableStatementDataTypeConvertExecutionVisitor(
+              (convertedStatement, targetDatabase) ->
+                  Coordinator.getInstance()
+                      .executeForTableModel(
+                          convertedStatement,
+                          relationalSqlParser,
+                          SESSION_MANAGER.getCurrSession(),
+                          SESSION_MANAGER.requestQueryId(),
+                          SESSION_MANAGER.getSessionInfoOfPipeReceiver(
+                              SESSION_MANAGER.getCurrSession(), targetDatabase),
+                          "",
+                          LocalExecutionPlanner.getInstance().metadata,
+                          IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
+                      .status);
+
+  // Executes converted statements of the tree model.
+  private final LoadTreeStatementDataTypeConvertExecutionVisitor
+      treeStatementDataTypeConvertExecutionVisitor =
+          new LoadTreeStatementDataTypeConvertExecutionVisitor(
+              convertedStatement ->
+                  Coordinator.getInstance()
+                      .executeForTreeModel(
+                          convertedStatement,
+                          SESSION_MANAGER.requestQueryId(),
+                          SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
+                          "",
+                          ClusterPartitionFetcher.getInstance(),
+                          ClusterSchemaFetcher.getInstance(),
+                          IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+                          false)
+                      .status);
+
+  /**
+   * Runs the table-model conversion for the given statement, using the database of the statement.
+   *
+   * @return the status produced by the visitor, {@code null} when the visitor produced none, or a
+   *     {@code LOAD_FILE_ERROR} status carrying the exception message when the visitor threw
+   */
+  public TSStatus convertForTableModel(LoadTsFile loadTsFileTableStatement) {
+    final TSStatus conversionStatus;
+    try {
+      conversionStatus =
+          loadTsFileTableStatement
+              .accept(
+                  tableStatementDataTypeConvertExecutionVisitor,
+                  loadTsFileTableStatement.getDatabase())
+              .orElse(null);
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Failed to convert data types for table model statement {}.",
+          loadTsFileTableStatement,
+          e);
+      return new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+    return conversionStatus;
+  }
+
+  /**
+   * Runs the tree-model conversion for the given statement.
+   *
+   * @return the status produced by the visitor, {@code null} when the visitor produced none, or a
+   *     {@code LOAD_FILE_ERROR} status carrying the exception message when the visitor threw
+   */
+  public TSStatus convertForTreeModel(LoadTsFileStatement loadTsFileTreeStatement) {
+    final TSStatus conversionStatus;
+    try {
+      conversionStatus =
+          loadTsFileTreeStatement
+              .accept(treeStatementDataTypeConvertExecutionVisitor, null)
+              .orElse(null);
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Failed to convert data types for tree model statement {}.", loadTsFileTreeStatement, e);
+      return new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage());
+    }
+    return conversionStatus;
+  }
+}