This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d072d6f066d Load: Support converting mini TsFile into Tablets & Adjust
the way to extract metrics & Fix file not delete when analysis cast happens &
Pipe IT: Ignore IoTDBPipeProcessorIT.testTumblingTimeSamplingProcessor (#14784)
d072d6f066d is described below
commit d072d6f066df4be3f0d40bab76ad0b753557bbbb
Author: Itami Sho <[email protected]>
AuthorDate: Thu Mar 20 13:17:15 2025 +0800
Load: Support converting mini TsFile into Tablets & Adjust the way to
extract metrics & Fix file not delete when analysis cast happens & Pipe IT:
Ignore IoTDBPipeProcessorIT.testTumblingTimeSamplingProcessor (#14784)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../treemodel/auto/basic/IoTDBPipeProcessorIT.java | 2 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +
.../queryengine/plan/analyze/AnalyzeVisitor.java | 6 -
.../plan/analyze/load/LoadTsFileAnalyzer.java | 213 ++++++++++++++++-----
.../relational/analyzer/StatementAnalyzer.java | 6 -
.../plan/relational/sql/ast/LoadTsFile.java | 81 +++++++-
.../plan/scheduler/load/LoadTsFileScheduler.java | 11 +-
.../plan/statement/crud/LoadTsFileStatement.java | 80 +++++++-
.../load/config/LoadTsFileConfigurator.java | 14 ++
...leStatementDataTypeConvertExecutionVisitor.java | 14 +-
...eeStatementDataTypeConvertExecutionVisitor.java | 14 +-
.../load/metrics/LoadTsFileCostMetricsSet.java | 31 ++-
13 files changed, 395 insertions(+), 93 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
index 0face7e1dd1..53b5f27b0c4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -76,6 +77,7 @@ public class IoTDBPipeProcessorIT extends
AbstractPipeDualTreeModelAutoIT {
receiverEnv.initClusterEnvironment();
}
+ @Ignore
@Test
public void testTumblingTimeSamplingProcessor() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 41b3c6a0fa3..f4c62d33585 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1096,6 +1096,8 @@ public class IoTDBConfig {
private double loadWriteThroughputBytesPerSecond = -1; // Bytes/s
+ private long loadTabletConversionThresholdBytes = -1;
+
private boolean loadActiveListeningEnable = true;
private String[] loadActiveListeningDirs =
@@ -3823,6 +3825,14 @@ public class IoTDBConfig {
this.loadWriteThroughputBytesPerSecond = loadWriteThroughputBytesPerSecond;
}
+ public long getLoadTabletConversionThresholdBytes() {
+ return loadTabletConversionThresholdBytes;
+ }
+
+ public void setLoadTabletConversionThresholdBytes(long
loadTabletConversionThresholdBytes) {
+ this.loadTabletConversionThresholdBytes =
loadTabletConversionThresholdBytes;
+ }
+
public int getLoadActiveListeningMaxThreadNum() {
return loadActiveListeningMaxThreadNum;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 96c4bacb44b..98b80de001d 100755
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2197,6 +2197,12 @@ public class IoTDBDescriptor {
"load_write_throughput_bytes_per_second",
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
+ conf.setLoadTabletConversionThresholdBytes(
+ Long.parseLong(
+ properties.getProperty(
+ "load_tablet_conversion_threshold_bytes",
+
String.valueOf(conf.getLoadTabletConversionThresholdBytes()))));
+
conf.setLoadActiveListeningEnable(
Boolean.parseBoolean(
properties.getProperty(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index e0fa5c1c004..3cea277668b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -151,7 +151,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
import org.apache.iotdb.db.schemaengine.template.Template;
-import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -213,7 +212,6 @@ import static
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushD
import static
org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor.parseNodeString;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DataNodeLocationSupplierFactory.getReadableDataNodeLocations;
import static
org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.GetSourcePathsVisitor.getSourcePaths;
-import static
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME_HEADER;
import static
org.apache.iotdb.db.utils.constant.SqlConstant.TREE_MODEL_DATABASE_PREFIX;
@@ -2983,7 +2981,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement,
MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
- final long startTime = System.nanoTime();
try (final LoadTsFileAnalyzer loadTsFileAnalyzer =
new LoadTsFileAnalyzer(
loadTsFileStatement, loadTsFileStatement.isGeneratedByPipe(),
context)) {
@@ -2999,9 +2996,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR,
exceptionMessage));
return analysis;
- } finally {
- LoadTsFileCostMetricsSet.getInstance()
- .recordPhaseTimeCost(ANALYSIS, System.nanoTime() - startTime);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 832d4dbe369..54bed1a4b9e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -50,6 +50,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -68,6 +69,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.util.ArrayList;
@@ -80,11 +82,15 @@ import java.util.Optional;
import static
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED;
import static
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.validateDatabaseName;
+import static
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
public class LoadTsFileAnalyzer implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileAnalyzer.class);
+ private static final LoadTsFileCostMetricsSet LOAD_TSFILE_COST_METRICS_SET =
+ LoadTsFileCostMetricsSet.getInstance();
+
final IPartitionFetcher partitionFetcher =
ClusterPartitionFetcher.getInstance();
final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
private final Metadata metadata =
LocalExecutionPlanner.getInstance().metadata;
@@ -103,6 +109,8 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private final boolean isGeneratedByPipe;
private final List<File> tsFiles;
+ private final List<Boolean> isMiniTsFile;
+ private boolean isMiniTsFileConverted = false;
private final List<Boolean> isTableModelTsFile;
private int isTableModelTsFileReliableIndex = -1;
@@ -110,9 +118,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private final int databaseLevel;
private String databaseForTableData;
private final boolean isVerifySchema;
+ private final boolean isAutoCreateDatabase;
private final boolean isDeleteAfterLoad;
private final boolean isConvertOnTypeMismatch;
- private final boolean isAutoCreateDatabase;
+ private final long tabletConversionThresholdBytes;
// Schema creators for tree and table
private TreeSchemaAutoCreatorAndVerifier treeSchemaAutoCreatorAndVerifier;
@@ -129,14 +138,16 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
this.isGeneratedByPipe = isGeneratedByPipe;
this.tsFiles = loadTsFileStatement.getTsFiles();
+ this.isMiniTsFile = new
ArrayList<>(Collections.nCopies(this.tsFiles.size(), false));
this.isTableModelTsFile = new
ArrayList<>(Collections.nCopies(this.tsFiles.size(), false));
this.databaseLevel = loadTsFileStatement.getDatabaseLevel();
this.databaseForTableData = loadTsFileStatement.getDatabase();
this.isVerifySchema = loadTsFileStatement.isVerifySchema();
+ this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
this.isDeleteAfterLoad = loadTsFileStatement.isDeleteAfterLoad();
this.isConvertOnTypeMismatch =
loadTsFileStatement.isConvertOnTypeMismatch();
- this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
+ this.tabletConversionThresholdBytes =
loadTsFileStatement.getTabletConversionThresholdBytes();
}
public LoadTsFileAnalyzer(
@@ -150,57 +161,64 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
this.isGeneratedByPipe = isGeneratedByPipe;
this.tsFiles = loadTsFileTableStatement.getTsFiles();
+ this.isMiniTsFile = new
ArrayList<>(Collections.nCopies(this.tsFiles.size(), false));
this.isTableModelTsFile = new
ArrayList<>(Collections.nCopies(this.tsFiles.size(), false));
this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel();
this.databaseForTableData = loadTsFileTableStatement.getDatabase();
this.isVerifySchema = loadTsFileTableStatement.isVerifySchema();
+ this.isAutoCreateDatabase =
loadTsFileTableStatement.isAutoCreateDatabase();
this.isDeleteAfterLoad = loadTsFileTableStatement.isDeleteAfterLoad();
this.isConvertOnTypeMismatch =
loadTsFileTableStatement.isConvertOnTypeMismatch();
- this.isAutoCreateDatabase =
loadTsFileTableStatement.isAutoCreateDatabase();
+ this.tabletConversionThresholdBytes =
+ loadTsFileTableStatement.getTabletConversionThresholdBytes();
}
protected String getStatementString() {
return statementString;
}
- protected boolean isVerifySchema() {
- return isVerifySchema;
+ protected int getDatabaseLevel() {
+ return databaseLevel;
}
- protected boolean isConvertOnTypeMismatch() {
- return isConvertOnTypeMismatch;
+ protected boolean isVerifySchema() {
+ return isVerifySchema;
}
protected boolean isAutoCreateDatabase() {
return isAutoCreateDatabase;
}
- protected int getDatabaseLevel() {
- return databaseLevel;
+ protected boolean isConvertOnTypeMismatch() {
+ return isConvertOnTypeMismatch;
}
public IAnalysis analyzeFileByFile(IAnalysis analysis) {
- checkBeforeAnalyzeFileByFile(analysis);
- if (analysis.isFinishQueryAfterAnalyze()) {
- return analysis;
- }
-
- if (!doAnalyzeFileByFile(analysis)) {
- // return false means the analysis is failed because of exception
+ if (!checkBeforeAnalyzeFileByFile(analysis)) {
return analysis;
}
try {
- // flush remaining metadata of tree-model, currently no need for
table-model
- if (treeSchemaAutoCreatorAndVerifier != null) {
- treeSchemaAutoCreatorAndVerifier.flush();
+ if (!doAnalyzeFileByFile(analysis)) {
+ return analysis;
+ }
+
+ final long startTime = System.nanoTime();
+ try {
+ // flush remaining metadata of tree-model, currently no need for
table-model
+ if (treeSchemaAutoCreatorAndVerifier != null) {
+ treeSchemaAutoCreatorAndVerifier.flush();
+ }
+ } finally {
+ LoadTsFileCostMetricsSet.getInstance()
+ .recordPhaseTimeCost(ANALYSIS, System.nanoTime() - startTime);
}
} catch (AuthException e) {
setFailAnalysisForAuthException(analysis, e);
return analysis;
} catch (LoadAnalyzeException e) {
- executeTabletConversion(analysis, e);
+ executeTabletConversionOnException(analysis, e);
return analysis;
} catch (Exception e) {
final String exceptionMessage =
@@ -216,20 +234,27 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
- // data partition will be queried in the scheduler
- setRealStatement(analysis);
setTsFileModelInfoToStatement();
+ if (reconstructStatementIfMiniFileConverted()) {
+ // All mini tsfiles are converted to tablets, so the analysis is
finished.
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ return analysis;
+ }
+
+ // Still some tsfiles are not converted to tablets, so the load process
should continue.
+ setRealStatement(analysis);
return analysis;
}
- private void checkBeforeAnalyzeFileByFile(IAnalysis analysis) {
+ private boolean checkBeforeAnalyzeFileByFile(IAnalysis analysis) {
if (TSFileDescriptor.getInstance().getConfig().getEncryptFlag()) {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
RpcUtils.getStatus(
TSStatusCode.LOAD_FILE_ERROR,
"TSFile encryption is enabled, and the Load TSFile function is
disabled"));
- return;
+ return false;
}
// check if the system is read only
@@ -237,7 +262,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY,
LoadReadOnlyException.MESSAGE));
+ return false;
}
+
+ return true;
}
private boolean doAnalyzeFileByFile(IAnalysis analysis) {
@@ -257,6 +285,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
continue;
}
+ final long startTime = System.nanoTime();
try {
analyzeSingleTsFile(tsFile, i);
if (LOGGER.isInfoEnabled()) {
@@ -268,7 +297,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
setFailAnalysisForAuthException(analysis, e);
return false;
} catch (LoadAnalyzeException e) {
- executeTabletConversion(analysis, e);
+ executeTabletConversionOnException(analysis, e);
// just return false to STOP the analysis process,
// the real result on the conversion will be set in the analysis.
return false;
@@ -288,15 +317,17 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR,
exceptionMessage));
return false;
+ } finally {
+ LoadTsFileCostMetricsSet.getInstance()
+ .recordPhaseTimeCost(ANALYSIS, System.nanoTime() - startTime);
}
}
+
return true;
}
- private void analyzeSingleTsFile(final File tsFile, int i)
- throws IOException, AuthException, LoadAnalyzeException {
+ private void analyzeSingleTsFile(final File tsFile, int i) throws Exception {
try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
-
// check whether the tsfile is tree-model or not
final Map<String, TableSchema> tableSchemaMap =
reader.getTableSchemaMap();
final boolean isTableModelFile = Objects.nonNull(tableSchemaMap) &&
!tableSchemaMap.isEmpty();
@@ -327,6 +358,12 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
this.isTableModelTsFile.set(i, isTableModelFile);
this.isTableModelTsFileReliableIndex = i;
+ if (0 <= tabletConversionThresholdBytes
+ && tsFile.length() <= tabletConversionThresholdBytes
+ && handleSingleMiniFile(i)) {
+ return;
+ }
+
if (isTableModelFile) {
doAnalyzeSingleTableFile(tsFile, reader, timeseriesMetadataIterator,
tableSchemaMap);
} else {
@@ -340,6 +377,50 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
}
+ private boolean handleSingleMiniFile(final int i) throws
FileNotFoundException {
+ final long startTime = System.nanoTime();
+ try {
+ final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
+ new LoadTsFileDataTypeConverter(isGeneratedByPipe);
+
+ final TSStatus status =
+ isTableModelTsFile.get(i)
+ ? loadTsFileDataTypeConverter
+ .convertForTableModel(
+ new LoadTsFile(null, tsFiles.get(i).getPath(),
Collections.emptyMap())
+ .setDatabase(databaseForTableData)
+ .setDeleteAfterLoad(isDeleteAfterLoad)
+ .setConvertOnTypeMismatch(isConvertOnTypeMismatch))
+ .orElse(null)
+ : loadTsFileDataTypeConverter
+ .convertForTreeModel(
+ new LoadTsFileStatement(tsFiles.get(i).getPath())
+ .setDeleteAfterLoad(isDeleteAfterLoad)
+ .setConvertOnTypeMismatch(isConvertOnTypeMismatch))
+ .orElse(null);
+
+ if (status == null || !loadTsFileDataTypeConverter.isSuccessful(status))
{
+ LOGGER.warn(
+ "Load: Failed to convert mini tsfile {} to tablets from statement
{}. Status: {}.",
+ tsFiles.get(i).getPath(),
+ isTableModelStatement ? loadTsFileTableStatement :
loadTsFileTreeStatement,
+ status);
+ return false;
+ }
+
+ // A mark of successful conversion
+ isMiniTsFile.set(i, Boolean.TRUE);
+ isMiniTsFileConverted = true;
+
+ addTsFileResource(null);
+ addWritePointCount(0);
+ return true;
+ } finally {
+ LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
+ LoadTsFileCostMetricsSet.ANALYSIS_CAST_TABLETS, System.nanoTime() -
startTime);
+ }
+ }
+
private void doAnalyzeSingleTreeFile(
final File tsFile,
final TsFileSequenceReader reader,
@@ -527,23 +608,47 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
}
- private void setRealStatement(IAnalysis analysis) {
+ private void setTsFileModelInfoToStatement() {
if (isTableModelStatement) {
- // Do nothing by now.
+ this.loadTsFileTableStatement.setIsTableModel(this.isTableModelTsFile);
} else {
- analysis.setRealStatement(loadTsFileTreeStatement);
+ this.loadTsFileTreeStatement.setIsTableModel(this.isTableModelTsFile);
}
}
- private void setTsFileModelInfoToStatement() {
+ private boolean reconstructStatementIfMiniFileConverted() {
+ if (!isMiniTsFileConverted) {
+ return false;
+ }
+
+ return isTableModelStatement
+ ?
loadTsFileTableStatement.reconstructStatementIfMiniFileConverted(isMiniTsFile)
+ :
loadTsFileTreeStatement.reconstructStatementIfMiniFileConverted(isMiniTsFile);
+ }
+
+ private void setRealStatement(IAnalysis analysis) {
if (isTableModelStatement) {
- this.loadTsFileTableStatement.setIsTableModel(this.isTableModelTsFile);
+ // Do nothing by now.
} else {
- this.loadTsFileTreeStatement.setIsTableModel(this.isTableModelTsFile);
+ analysis.setRealStatement(loadTsFileTreeStatement);
}
}
- private void executeTabletConversion(final IAnalysis analysis, final
LoadAnalyzeException e) {
+ private void setFailAnalysisForAuthException(IAnalysis analysis,
AuthException e) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage()));
+ }
+
+ private void executeTabletConversionOnException(
+ final IAnalysis analysis, final LoadAnalyzeException e) {
+ if (shouldSkipConversion(e)) {
+ analysis.setFailStatus(
+ new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
+ analysis.setFinishQueryAfterAnalyze(true);
+ setRealStatement(analysis);
+ return;
+ }
+
if (isTableModelTsFileReliableIndex < tsFiles.size() - 1) {
try {
getFileModelInfoBeforeTabletConversion();
@@ -564,19 +669,23 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
new LoadTsFileDataTypeConverter(isGeneratedByPipe);
for (int i = 0; i < tsFiles.size(); i++) {
+ final long startTime = System.nanoTime();
try {
final TSStatus status =
- (!(e instanceof LoadAnalyzeTypeMismatchException) ||
isConvertOnTypeMismatch)
- ? (isTableModelTsFile.get(i)
- ? loadTsFileDataTypeConverter
- .convertForTableModel(
- new LoadTsFile(null, tsFiles.get(i).getPath(),
Collections.emptyMap())
- .setDatabase(databaseForTableData))
- .orElse(null)
- : loadTsFileDataTypeConverter
- .convertForTreeModel(new
LoadTsFileStatement(tsFiles.get(i).getPath()))
- .orElse(null))
- : null;
+ isTableModelTsFile.get(i)
+ ? loadTsFileDataTypeConverter
+ .convertForTableModel(
+ new LoadTsFile(null, tsFiles.get(i).getPath(),
Collections.emptyMap())
+ .setDatabase(databaseForTableData)
+ .setDeleteAfterLoad(isDeleteAfterLoad)
+ .setConvertOnTypeMismatch(isConvertOnTypeMismatch))
+ .orElse(null)
+ : loadTsFileDataTypeConverter
+ .convertForTreeModel(
+ new LoadTsFileStatement(tsFiles.get(i).getPath())
+ .setDeleteAfterLoad(isDeleteAfterLoad)
+ .setConvertOnTypeMismatch(isConvertOnTypeMismatch))
+ .orElse(null);
if (status == null) {
LOGGER.warn(
@@ -602,6 +711,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
analysis.setFailStatus(
new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
break;
+ } finally {
+ LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
+ LoadTsFileCostMetricsSet.ANALYSIS_CAST_TABLETS, System.nanoTime()
- startTime);
}
}
@@ -609,6 +721,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
setRealStatement(analysis);
}
+ private boolean shouldSkipConversion(LoadAnalyzeException e) {
+ return (e instanceof LoadAnalyzeTypeMismatchException) &&
!isConvertOnTypeMismatch;
+ }
+
private void getFileModelInfoBeforeTabletConversion() throws IOException {
for (int i = isTableModelTsFileReliableIndex + 1; i < tsFiles.size(); i++)
{
try (final TsFileSequenceReader reader =
@@ -620,11 +736,6 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
}
- private void setFailAnalysisForAuthException(IAnalysis analysis,
AuthException e) {
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage()));
- }
-
@Override
public void close() throws Exception {
if (treeSchemaAutoCreatorAndVerifier != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index 8619f5b89eb..62691ca4069 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -176,7 +176,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.schemaengine.table.InformationSchemaUtils;
-import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.udf.api.relational.TableFunction;
@@ -255,7 +254,6 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.RIGHT;
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.util.AstUtil.preOrder;
import static
org.apache.iotdb.db.queryengine.plan.relational.utils.NodeUtils.getSortItemsFromOrderBy;
-import static
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
public class StatementAnalyzer {
@@ -638,7 +636,6 @@ public class StatementAnalyzer {
protected Scope visitLoadTsFile(final LoadTsFile node, final
Optional<Scope> scope) {
queryContext.setQueryType(QueryType.WRITE);
- final long startTime = System.nanoTime();
try (final LoadTsFileAnalyzer loadTsFileAnalyzer =
new LoadTsFileAnalyzer(node, node.isGeneratedByPipe(),
queryContext)) {
loadTsFileAnalyzer.analyzeFileByFile(analysis);
@@ -649,9 +646,6 @@ public class StatementAnalyzer {
node, e.getMessage() == null ? e.getClass().getName() :
e.getMessage());
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR,
exceptionMessage));
- } finally {
- LoadTsFileCostMetricsSet.getInstance()
- .recordPhaseTimeCost(ANALYSIS, System.nanoTime() - startTime);
}
return createAndAssignScope(node, scope);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
index 7414f1bee2b..f42ac22002e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
@@ -36,43 +36,47 @@ import static
com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
public class LoadTsFile extends Statement {
+
private final String filePath;
- private final File file;
private int databaseLevel; // For loading to tree-model only
private String database; // For loading to table-model only
private boolean deleteAfterLoad = false;
private boolean convertOnTypeMismatch = true;
+ private long tabletConversionThresholdBytes = -1;
private boolean autoCreateDatabase = true;
- private boolean verify;
+ private boolean verify = true;
+
private boolean isGeneratedByPipe = false;
private final Map<String, String> loadAttributes;
- private final List<File> tsFiles;
+ private List<File> tsFiles;
+ private List<TsFileResource> resources;
+ private List<Long> writePointCountList;
private List<Boolean> isTableModel;
- private final List<TsFileResource> resources;
- private final List<Long> writePointCountList;
public LoadTsFile(NodeLocation location, String filePath, Map<String,
String> loadAttributes) {
super(location);
this.filePath = requireNonNull(filePath, "filePath is null");
- this.file = new File(filePath);
this.databaseLevel =
IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
this.deleteAfterLoad = false;
this.convertOnTypeMismatch = true;
- this.verify = true;
+ this.tabletConversionThresholdBytes =
+
IoTDBDescriptor.getInstance().getConfig().getLoadTabletConversionThresholdBytes();
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
- this.resources = new ArrayList<>();
- this.writePointCountList = new ArrayList<>();
+ this.verify = true;
+
this.loadAttributes = loadAttributes == null ? Collections.emptyMap() :
loadAttributes;
initAttributes();
try {
this.tsFiles =
org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement.processTsFile(
- file);
+ new File(filePath));
+ this.resources = new ArrayList<>();
+ this.writePointCountList = new ArrayList<>();
this.isTableModel = new
ArrayList<>(Collections.nCopies(this.tsFiles.size(), true));
} catch (FileNotFoundException e) {
throw new SemanticException(e);
@@ -99,10 +103,24 @@ public class LoadTsFile extends Statement {
return deleteAfterLoad;
}
+ public LoadTsFile setDeleteAfterLoad(boolean deleteAfterLoad) {
+ this.deleteAfterLoad = deleteAfterLoad;
+ return this;
+ }
+
public boolean isConvertOnTypeMismatch() {
return convertOnTypeMismatch;
}
+ public LoadTsFile setConvertOnTypeMismatch(boolean convertOnTypeMismatch) {
+ this.convertOnTypeMismatch = convertOnTypeMismatch;
+ return this;
+ }
+
+ public long getTabletConversionThresholdBytes() {
+ return tabletConversionThresholdBytes;
+ }
+
public boolean isVerifySchema() {
return verify;
}
@@ -162,9 +180,52 @@ public class LoadTsFile extends Statement {
this.deleteAfterLoad =
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
this.convertOnTypeMismatch =
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
+ this.tabletConversionThresholdBytes =
+
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
this.verify =
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
}
+ public boolean reconstructStatementIfMiniFileConverted(final List<Boolean>
isMiniTsFile) {
+ int lastNonMiniTsFileIndex = -1;
+
+ for (int i = 0, n = isMiniTsFile.size(); i < n; i++) {
+ if (isMiniTsFile.get(i)) {
+ continue;
+ }
+
+ ++lastNonMiniTsFileIndex;
+ if (tsFiles != null) {
+ tsFiles.set(lastNonMiniTsFileIndex, tsFiles.get(i));
+ }
+ if (isTableModel != null) {
+ isTableModel.set(lastNonMiniTsFileIndex, isTableModel.get(i));
+ }
+ if (resources != null) {
+ resources.set(lastNonMiniTsFileIndex, resources.get(i));
+ }
+ if (writePointCountList != null) {
+ writePointCountList.set(lastNonMiniTsFileIndex,
writePointCountList.get(i));
+ }
+ }
+
+ tsFiles =
+ tsFiles != null ? tsFiles.subList(0, lastNonMiniTsFileIndex + 1) :
Collections.emptyList();
+ isTableModel =
+ isTableModel != null
+ ? isTableModel.subList(0, lastNonMiniTsFileIndex + 1)
+ : Collections.emptyList();
+ resources =
+ resources != null
+ ? resources.subList(0, lastNonMiniTsFileIndex + 1)
+ : Collections.emptyList();
+ writePointCountList =
+ writePointCountList != null
+ ? writePointCountList.subList(0, lastNonMiniTsFileIndex + 1)
+ : Collections.emptyList();
+
+ return tsFiles == null || tsFiles.isEmpty();
+ }
+
@Override
public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
return visitor.visitLoadTsFile(this, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 4bb85991961..86391555a8b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -300,7 +300,7 @@ public class LoadTsFileScheduler implements IScheduler {
convertFailedTsFilesToTabletsAndRetry();
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
- LoadTsFileCostMetricsSet.CAST_TABLETS, System.nanoTime() -
startTime);
+ LoadTsFileCostMetricsSet.SCHEDULER_CAST_TABLETS,
System.nanoTime() - startTime);
}
}
} finally {
@@ -576,10 +576,15 @@ public class LoadTsFileScheduler implements IScheduler {
? loadTsFileDataTypeConverter
.convertForTableModel(
new LoadTsFile(null, filePath, Collections.emptyMap())
- .setDatabase(failedNode.getDatabase()))
+ .setDatabase(failedNode.getDatabase())
+ .setDeleteAfterLoad(failedNode.isDeleteAfterLoad())
+ .setConvertOnTypeMismatch(true))
.orElse(null)
: loadTsFileDataTypeConverter
- .convertForTreeModel(new LoadTsFileStatement(filePath))
+ .convertForTreeModel(
+ new LoadTsFileStatement(filePath)
+ .setDeleteAfterLoad(failedNode.isDeleteAfterLoad())
+ .setConvertOnTypeMismatch(true))
.orElse(null);
if (loadTsFileDataTypeConverter.isSuccessful(status)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 0c9c0eaaf48..2acd906cfa4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -49,6 +49,7 @@ import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurat
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_DELETE_VALUE;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_NONE_VALUE;
+import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.TABLET_CONVERSION_THRESHOLD_KEY;
public class LoadTsFileStatement extends Statement {
@@ -58,15 +59,16 @@ public class LoadTsFileStatement extends Statement {
private boolean verifySchema = true;
private boolean deleteAfterLoad = false;
private boolean convertOnTypeMismatch = true;
+ private long tabletConversionThresholdBytes = -1;
private boolean autoCreateDatabase = true;
private boolean isGeneratedByPipe = false;
private Map<String, String> loadAttributes;
- private final List<File> tsFiles;
+ private List<File> tsFiles;
private List<Boolean> isTableModel;
- private final List<TsFileResource> resources;
- private final List<Long> writePointCountList;
+ private List<TsFileResource> resources;
+ private List<Long> writePointCountList;
public LoadTsFileStatement(String filePath) throws FileNotFoundException {
this.file = new File(filePath);
@@ -74,13 +76,15 @@ public class LoadTsFileStatement extends Statement {
this.verifySchema = true;
this.deleteAfterLoad = false;
this.convertOnTypeMismatch = true;
+ this.tabletConversionThresholdBytes =
+
IoTDBDescriptor.getInstance().getConfig().getLoadTabletConversionThresholdBytes();
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
- this.resources = new ArrayList<>();
- this.writePointCountList = new ArrayList<>();
- this.statementType = StatementType.MULTI_BATCH_INSERT;
this.tsFiles = processTsFile(file);
+ this.resources = new ArrayList<>();
+ this.writePointCountList = new ArrayList<>();
this.isTableModel = new
ArrayList<>(Collections.nCopies(this.tsFiles.size(), false));
+ this.statementType = StatementType.MULTI_BATCH_INSERT;
}
public static List<File> processTsFile(final File file) throws
FileNotFoundException {
@@ -106,7 +110,10 @@ public class LoadTsFileStatement extends Statement {
this.verifySchema = true;
this.deleteAfterLoad = false;
this.convertOnTypeMismatch = true;
+ this.tabletConversionThresholdBytes =
+
IoTDBDescriptor.getInstance().getConfig().getLoadTabletConversionThresholdBytes();
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+
this.tsFiles = new ArrayList<>();
this.resources = new ArrayList<>();
this.writePointCountList = new ArrayList<>();
@@ -167,22 +174,32 @@ public class LoadTsFileStatement extends Statement {
return verifySchema;
}
- public void setDeleteAfterLoad(boolean deleteAfterLoad) {
+ public LoadTsFileStatement setDeleteAfterLoad(boolean deleteAfterLoad) {
this.deleteAfterLoad = deleteAfterLoad;
+ return this;
}
public boolean isDeleteAfterLoad() {
return deleteAfterLoad;
}
- public void setConvertOnTypeMismatch(boolean convertOnTypeMismatch) {
+ public LoadTsFileStatement setConvertOnTypeMismatch(boolean
convertOnTypeMismatch) {
this.convertOnTypeMismatch = convertOnTypeMismatch;
+ return this;
}
public boolean isConvertOnTypeMismatch() {
return convertOnTypeMismatch;
}
+ public void setTabletConversionThresholdBytes(long
tabletConversionThresholdBytes) {
+ this.tabletConversionThresholdBytes = tabletConversionThresholdBytes;
+ }
+
+ public long getTabletConversionThresholdBytes() {
+ return tabletConversionThresholdBytes;
+ }
+
public void setAutoCreateDatabase(boolean autoCreateDatabase) {
this.autoCreateDatabase = autoCreateDatabase;
}
@@ -238,6 +255,49 @@ public class LoadTsFileStatement extends Statement {
this.deleteAfterLoad =
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
this.convertOnTypeMismatch =
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
+ this.tabletConversionThresholdBytes =
+
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
+ this.verifySchema =
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
+ }
+
+ public boolean reconstructStatementIfMiniFileConverted(final List<Boolean>
isMiniTsFile) {
+ int lastNonMiniTsFileIndex = -1;
+
+ for (int i = 0, n = isMiniTsFile.size(); i < n; i++) {
+ if (isMiniTsFile.get(i)) {
+ continue;
+ }
+ ++lastNonMiniTsFileIndex;
+ if (tsFiles != null) {
+ tsFiles.set(lastNonMiniTsFileIndex, tsFiles.get(i));
+ }
+ if (isTableModel != null) {
+ isTableModel.set(lastNonMiniTsFileIndex, isTableModel.get(i));
+ }
+ if (resources != null) {
+ resources.set(lastNonMiniTsFileIndex, resources.get(i));
+ }
+ if (writePointCountList != null) {
+ writePointCountList.set(lastNonMiniTsFileIndex,
writePointCountList.get(i));
+ }
+ }
+
+ tsFiles =
+ tsFiles != null ? tsFiles.subList(0, lastNonMiniTsFileIndex + 1) :
Collections.emptyList();
+ isTableModel =
+ isTableModel != null
+ ? isTableModel.subList(0, lastNonMiniTsFileIndex + 1)
+ : Collections.emptyList();
+ resources =
+ resources != null
+ ? resources.subList(0, lastNonMiniTsFileIndex + 1)
+ : Collections.emptyList();
+ writePointCountList =
+ writePointCountList != null
+ ? writePointCountList.subList(0, lastNonMiniTsFileIndex + 1)
+ : Collections.emptyList();
+
+ return tsFiles == null || tsFiles.isEmpty();
}
@Override
@@ -264,6 +324,8 @@ public class LoadTsFileStatement extends Statement {
loadAttributes.put(
ON_SUCCESS_KEY, deleteAfterLoad ? ON_SUCCESS_DELETE_VALUE :
ON_SUCCESS_NONE_VALUE);
loadAttributes.put(CONVERT_ON_TYPE_MISMATCH_KEY,
String.valueOf(convertOnTypeMismatch));
+ loadAttributes.put(
+ TABLET_CONVERSION_THRESHOLD_KEY,
String.valueOf(tabletConversionThresholdBytes));
return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes);
}
@@ -286,6 +348,8 @@ public class LoadTsFileStatement extends Statement {
+ verifySchema
+ ", convert-on-type-mismatch="
+ convertOnTypeMismatch
+ + ", tablet-conversion-threshold="
+ + tabletConversionThresholdBytes
+ ", tsFiles size="
+ tsFiles.size()
+ '}';
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index 9a5eef5ba0a..0a21c82be27 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -45,6 +45,7 @@ public class LoadTsFileConfigurator {
validateOnSuccessParam(value);
break;
case DATABASE_NAME_KEY:
+ case TABLET_CONVERSION_THRESHOLD_KEY:
break;
case CONVERT_ON_TYPE_MISMATCH_KEY:
validateConvertOnTypeMismatchParam(value);
@@ -133,6 +134,19 @@ public class LoadTsFileConfigurator {
CONVERT_ON_TYPE_MISMATCH_KEY,
String.valueOf(CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE)));
}
+ public static final String TABLET_CONVERSION_THRESHOLD_KEY =
"tablet-conversion-threshold";
+
+ public static long parseOrGetDefaultTabletConversionThresholdBytes(
+ final Map<String, String> loadAttributes) {
+ return Long.parseLong(
+ loadAttributes.getOrDefault(
+ TABLET_CONVERSION_THRESHOLD_KEY,
+ String.valueOf(
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getLoadTabletConversionThresholdBytes())));
+ }
+
public static final String VERIFY_KEY = "verify";
private static final boolean VERIFY_DEFAULT_VALUE = true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
index 2a8a98c59e2..398cee63627 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
@@ -27,6 +27,9 @@ import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertion
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -139,7 +142,16 @@ public class
LoadTableStatementDataTypeConvertExecutionVisitor
}
if (loadTsFileStatement.isDeleteAfterLoad()) {
- loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+ loadTsFileStatement
+ .getTsFiles()
+ .forEach(
+ tsfile -> {
+ FileUtils.deleteQuietly(tsfile);
+ final String tsFilePath = tsfile.getAbsolutePath();
+ FileUtils.deleteQuietly(new File(tsFilePath +
TsFileResource.RESOURCE_SUFFIX));
+ FileUtils.deleteQuietly(new File(tsFilePath +
ModificationFileV1.FILE_SUFFIX));
+ FileUtils.deleteQuietly(new File(tsFilePath +
ModificationFile.FILE_SUFFIX));
+ });
}
LOGGER.info(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index eb7efba3618..d92dbeab3e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -27,6 +27,9 @@ import
org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.io.FileUtils;
@@ -124,7 +127,16 @@ public class
LoadTreeStatementDataTypeConvertExecutionVisitor
}
if (loadTsFileStatement.isDeleteAfterLoad()) {
- loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+ loadTsFileStatement
+ .getTsFiles()
+ .forEach(
+ tsfile -> {
+ FileUtils.deleteQuietly(tsfile);
+ final String tsFilePath = tsfile.getAbsolutePath();
+ FileUtils.deleteQuietly(new File(tsFilePath +
TsFileResource.RESOURCE_SUFFIX));
+ FileUtils.deleteQuietly(new File(tsFilePath +
ModificationFileV1.FILE_SUFFIX));
+ FileUtils.deleteQuietly(new File(tsFilePath +
ModificationFile.FILE_SUFFIX));
+ });
}
LOGGER.info(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
index 5947f101b06..1ccd00cb9bd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
@@ -40,7 +40,8 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
public static final String FIRST_PHASE = "first_phase";
public static final String SECOND_PHASE = "second_phase";
public static final String LOAD_LOCALLY = "load_locally";
- public static final String CAST_TABLETS = "cast_tablets";
+ public static final String SCHEDULER_CAST_TABLETS = "scheduler_cast_tablets";
+ public static final String ANALYSIS_CAST_TABLETS = "analysis_cast_tablets";
private LoadTsFileCostMetricsSet() {
// empty constructor
@@ -50,7 +51,8 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
private Timer firstPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer secondPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer loadLocallyTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
- private Timer castTabletsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer schedulerCastTabletsTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer analysisCastTabletsTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
@@ -68,8 +70,11 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
case LOAD_LOCALLY:
loadLocallyTimer.updateNanos(costTimeInNanos);
break;
- case CAST_TABLETS:
- castTabletsTimer.updateNanos(costTimeInNanos);
+ case SCHEDULER_CAST_TABLETS:
+ schedulerCastTabletsTimer.updateNanos(costTimeInNanos);
+ break;
+ case ANALYSIS_CAST_TABLETS:
+ analysisCastTabletsTimer.updateNanos(costTimeInNanos);
break;
default:
throw new UnsupportedOperationException("Unsupported stage: " + stage);
@@ -103,12 +108,18 @@ public class LoadTsFileCostMetricsSet implements
IMetricSet {
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
LOAD_LOCALLY);
- castTabletsTimer =
+ schedulerCastTabletsTimer =
+ metricService.getOrCreateTimer(
+ Metric.LOAD_TIME_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ SCHEDULER_CAST_TABLETS);
+ analysisCastTabletsTimer =
metricService.getOrCreateTimer(
Metric.LOAD_TIME_COST.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- CAST_TABLETS);
+ ANALYSIS_CAST_TABLETS);
diskIOCounter =
metricService.getOrCreateCounter(
@@ -120,7 +131,13 @@ public class LoadTsFileCostMetricsSet implements
IMetricSet {
@Override
public void unbindFrom(AbstractMetricService metricService) {
- Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY,
CAST_TABLETS)
+ Arrays.asList(
+ ANALYSIS,
+ FIRST_PHASE,
+ SECOND_PHASE,
+ LOAD_LOCALLY,
+ SCHEDULER_CAST_TABLETS,
+ ANALYSIS_CAST_TABLETS)
.forEach(
stage ->
metricService.remove(