This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new cd0c0a5880c Load: Support converting mini TsFile into Tablets & Adjust
the way to extract metrics & Fix file not delete when analysis cast happens &
Pipe IT: Ignore IoTDBPipeProcessorIT.testTumblingTimeSamplingProcessor (#14784)
(#15199)
cd0c0a5880c is described below
commit cd0c0a5880c6510e6f4c004736944e70307e0b58
Author: Itami Sho <[email protected]>
AuthorDate: Mon Apr 7 20:58:26 2025 +0800
Load: Support converting mini TsFile into Tablets & Adjust the way to
extract metrics & Fix file not delete when analysis cast happens & Pipe IT:
Ignore IoTDBPipeProcessorIT.testTumblingTimeSamplingProcessor (#14784) (#15199)
---
.../pipe/it/autocreate/IoTDBPipeProcessorIT.java | 2 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 21 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 +
.../queryengine/plan/analyze/AnalyzeVisitor.java | 8 +-
.../plan/analyze/LoadTsFileAnalyzer.java | 378 ++++++++++++++-------
.../plan/scheduler/load/LoadTsFileScheduler.java | 7 +-
.../plan/statement/crud/LoadTsFileStatement.java | 65 +++-
.../load/config/LoadTsFileConfigurator.java | 15 +
...eeStatementDataTypeConvertExecutionVisitor.java | 12 +-
.../load/metrics/LoadTsFileCostMetricsSet.java | 31 +-
10 files changed, 408 insertions(+), 142 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
index 27bc5be9659..125fd6972b1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -74,6 +75,7 @@ public class IoTDBPipeProcessorIT extends
AbstractPipeDualAutoIT {
receiverEnv.initClusterEnvironment();
}
+ @Ignore
@Test
public void testTumblingTimeSamplingProcessor() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 34e5450e5f4..c2fbb899d80 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1137,6 +1137,7 @@ public class IoTDBConfig {
/** Load related */
private double maxAllocateMemoryRatioForLoad = 0.8;
+ private int loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount = 4096;
private int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = 4096;
private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
0L; // 0 means that the decision will be adaptive based on the number of
sequences
@@ -1152,6 +1153,8 @@ public class IoTDBConfig {
private double loadWriteThroughputBytesPerSecond = -1; // Bytes/s
+ private long loadTabletConversionThresholdBytes = -1;
+
private boolean loadActiveListeningEnable = true;
private String[] loadActiveListeningDirs =
@@ -3958,6 +3961,16 @@ public class IoTDBConfig {
this.maxAllocateMemoryRatioForLoad = maxAllocateMemoryRatioForLoad;
}
+ public int getLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount() {
+ return loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount;
+ }
+
+ public void setLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount(
+ int loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount) {
+ this.loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount =
+ loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount;
+ }
+
public int getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber() {
return loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber;
}
@@ -4026,6 +4039,14 @@ public class IoTDBConfig {
this.loadWriteThroughputBytesPerSecond = loadWriteThroughputBytesPerSecond;
}
+ public long getLoadTabletConversionThresholdBytes() {
+ return loadTabletConversionThresholdBytes;
+ }
+
+ public void setLoadTabletConversionThresholdBytes(long
loadTabletConversionThresholdBytes) {
+ this.loadTabletConversionThresholdBytes =
loadTabletConversionThresholdBytes;
+ }
+
public int getLoadActiveListeningMaxThreadNum() {
return loadActiveListeningMaxThreadNum;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 1a4d8e885b1..6646044055f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2362,6 +2362,12 @@ public class IoTDBDescriptor {
properties.getProperty(
"max_allocate_memory_ratio_for_load",
String.valueOf(conf.getMaxAllocateMemoryRatioForLoad()))));
+ conf.setLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount(
+ Integer.parseInt(
+ properties.getProperty(
+
"load_tsfile_analyze_schema_batch_read_time_series_metadata_count",
+ String.valueOf(
+
conf.getLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount()))));
conf.setLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber(
Integer.parseInt(
properties.getProperty(
@@ -2398,6 +2404,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"load_write_throughput_bytes_per_second",
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
+ conf.setLoadTabletConversionThresholdBytes(
+ Long.parseLong(
+ properties.getProperty(
+ "load_tablet_conversion_threshold_bytes",
+
String.valueOf(conf.getLoadTabletConversionThresholdBytes()))));
conf.setLoadActiveListeningEnable(
Boolean.parseBoolean(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 8344530728d..afc964ea301 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -157,7 +157,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
import org.apache.iotdb.db.schemaengine.template.Template;
-import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -216,7 +215,6 @@ import static
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushD
import static
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice;
import static
org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor.parseNodeString;
import static
org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.GetSourcePathsVisitor.getSourcePaths;
-import static
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME_HEADER;
/** This visitor is used to analyze each type of Statement and returns the
{@link Analysis}. */
@@ -3039,10 +3037,9 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement,
MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
- final long startTime = System.nanoTime();
try (final LoadTsFileAnalyzer loadTsfileAnalyzer =
new LoadTsFileAnalyzer(loadTsFileStatement, context, partitionFetcher,
schemaFetcher)) {
- return
loadTsfileAnalyzer.analyzeFileByFile(loadTsFileStatement.isDeleteAfterLoad());
+ return loadTsfileAnalyzer.analyzeFileByFile(new Analysis());
} catch (final Exception e) {
final String exceptionMessage =
String.format(
@@ -3054,9 +3051,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR,
exceptionMessage));
return analysis;
- } finally {
- LoadTsFileCostMetricsSet.getInstance()
- .recordPhaseTimeCost(ANALYSIS, System.nanoTime() - startTime);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index c4292a330fd..50dcdfde018 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -67,6 +67,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
import
org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
+import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.db.utils.constant.SqlConstant;
@@ -90,6 +91,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -103,10 +105,15 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
+
public class LoadTsFileAnalyzer implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileAnalyzer.class);
+ private static final LoadTsFileCostMetricsSet LOAD_TSFILE_COST_METRICS_SET =
+ LoadTsFileCostMetricsSet.getInstance();
+
private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();
private static final int BATCH_FLUSH_TIME_SERIES_NUMBER;
@@ -131,6 +138,20 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier;
+ private final boolean isGeneratedByPipe;
+
+ private final List<File> tsFiles;
+ private final List<Boolean> isMiniTsFile;
+ private boolean isMiniTsFileConverted = false;
+
+ // User specified configs
+ private final int databaseLevel;
+ private final boolean isVerifySchema;
+ private final boolean isAutoCreateDatabase;
+ private final boolean isDeleteAfterLoad;
+ private final boolean isConvertOnTypeMismatch;
+ private final long tabletConversionThresholdBytes;
+
LoadTsFileAnalyzer(
LoadTsFileStatement loadTsFileStatement,
MPPQueryContext context,
@@ -143,19 +164,83 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
this.schemaFetcher = schemaFetcher;
this.schemaAutoCreatorAndVerifier = new SchemaAutoCreatorAndVerifier();
+
+ this.isGeneratedByPipe = loadTsFileStatement.isGeneratedByPipe();
+
+ this.tsFiles = loadTsFileStatement.getTsFiles();
+ this.isMiniTsFile = new
ArrayList<>(Collections.nCopies(this.tsFiles.size(), false));
+
+ this.databaseLevel = loadTsFileStatement.getDatabaseLevel();
+ this.isVerifySchema = loadTsFileStatement.isVerifySchema();
+ this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
+ this.isDeleteAfterLoad = loadTsFileStatement.isDeleteAfterLoad();
+ this.isConvertOnTypeMismatch =
loadTsFileStatement.isConvertOnTypeMismatch();
+ this.tabletConversionThresholdBytes =
loadTsFileStatement.getTabletConversionThresholdBytes();
}
- public Analysis analyzeFileByFile(final boolean isDeleteAfterLoad) {
- final Analysis analysis = new Analysis();
+ public Analysis analyzeFileByFile(Analysis analysis) {
+ if (!checkBeforeAnalyzeFileByFile(analysis)) {
+ return analysis;
+ }
+
+ try {
+ if (!doAnalyzeFileByFile(analysis)) {
+ return analysis;
+ }
+ final long startTime = System.nanoTime();
+
+ try {
+ schemaAutoCreatorAndVerifier.flush();
+ } finally {
+ LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
+ LoadTsFileCostMetricsSet.ANALYSIS, System.nanoTime() - startTime);
+ }
+ } catch (AuthException e) {
+ return setFailAnalysisForAuthException(analysis, e);
+ } catch (LoadAnalyzeTypeMismatchException e) {
+ executeTabletConversionOnException(analysis, e);
+ // just return false to STOP the analysis process,
+ // the real result on the conversion will be set in the analysis.
+ return analysis;
+ } catch (Exception e) {
+ final String exceptionMessage =
+ String.format(
+ "Auto create or verify schema error when executing statement %s.
Detail: %s.",
+ loadTsFileStatement,
+ e.getMessage() == null ? e.getClass().getName() :
e.getMessage());
+ LOGGER.warn(exceptionMessage, e);
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR,
exceptionMessage));
+ return analysis;
+ }
+
+ LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
+
+ if (reconstructStatementIfMiniFileConverted()) {
+ // All mini tsfiles are converted to tablets, so the analysis is
finished.
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ return analysis;
+ }
+
+ // data partition will be queried in the scheduler
+ analysis.setStatement(loadTsFileStatement);
+ return analysis;
+ }
+ private boolean checkBeforeAnalyzeFileByFile(Analysis analysis) {
// check if the system is read only
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(
RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY,
LoadReadOnlyException.MESSAGE));
- return analysis;
+ return false;
}
+ return true;
+ }
+
+ private boolean doAnalyzeFileByFile(Analysis analysis) {
// analyze tsfile metadata file by file
for (int i = 0, tsfileNum = loadTsFileStatement.getTsFiles().size(); i <
tsfileNum; i++) {
final File tsFile = loadTsFileStatement.getTsFiles().get(i);
@@ -172,20 +257,22 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
continue;
}
+ final long startTime = System.nanoTime();
try {
- analyzeSingleTsFile(tsFile, isDeleteAfterLoad);
+ analyzeSingleTsFile(tsFile, i);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Load - Analysis Stage: {}/{} tsfiles have been analyzed,
progress: {}%",
i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 /
tsfileNum));
}
} catch (AuthException e) {
- return createFailAnalysisForAuthException(e);
+ setFailAnalysisForAuthException(analysis, e);
+ return false;
} catch (LoadAnalyzeTypeMismatchException e) {
- executeTabletConversion(analysis, e);
+ executeTabletConversionOnException(analysis, e);
// just return false to STOP the analysis process,
// the real result on the conversion will be set in the analysis.
- return analysis;
+ return false;
} catch (Exception e) {
final String exceptionMessage =
String.format(
@@ -194,134 +281,137 @@ public class LoadTsFileAnalyzer implements
AutoCloseable {
LOGGER.warn(exceptionMessage, e);
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR,
exceptionMessage));
- return analysis;
+ return false;
+ } finally {
+ LoadTsFileCostMetricsSet.getInstance()
+ .recordPhaseTimeCost(ANALYSIS, System.nanoTime() - startTime);
}
}
- try {
- schemaAutoCreatorAndVerifier.flush();
- } catch (AuthException e) {
- return createFailAnalysisForAuthException(e);
- } catch (LoadAnalyzeTypeMismatchException e) {
- executeTabletConversion(analysis, e);
- // just return false to STOP the analysis process,
- // the real result on the conversion will be set in the analysis.
- return analysis;
- } catch (Exception e) {
- final String exceptionMessage =
- String.format(
- "Auto create or verify schema error when executing statement %s.
Detail: %s.",
- loadTsFileStatement,
- e.getMessage() == null ? e.getClass().getName() :
e.getMessage());
- LOGGER.warn(exceptionMessage, e);
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR,
exceptionMessage));
- return analysis;
- }
-
- LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
-
- // data partition will be queried in the scheduler
- analysis.setStatement(loadTsFileStatement);
- return analysis;
- }
-
- private void executeTabletConversion(final Analysis analysis, final
LoadAnalyzeException e) {
- final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
- new
LoadTsFileDataTypeConverter(loadTsFileStatement.isGeneratedByPipe());
- final TSStatus status =
- (!(e instanceof LoadAnalyzeTypeMismatchException)
- || loadTsFileStatement.isConvertOnTypeMismatch())
- ?
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileStatement).orElse(null)
- : null;
-
- if (status == null) {
- LOGGER.warn(
- "Load: Failed to convert to tablets from statement {}. Status is
null.",
- loadTsFileStatement);
- analysis.setFailStatus(
- new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
- } else if (!loadTsFileDataTypeConverter.isSuccessful(status)) {
- LOGGER.warn(
- "Load: Failed to convert to tablets from statement {}. Status: {}",
- loadTsFileStatement,
- status);
- analysis.setFailStatus(status);
- }
-
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setStatement(loadTsFileStatement);
- }
-
- @Override
- public void close() {
- schemaAutoCreatorAndVerifier.close();
+ return true;
}
- private void analyzeSingleTsFile(final File tsFile, final boolean
isDeleteAfterLoad)
- throws IOException, AuthException, LoadAnalyzeTypeMismatchException {
+ private void analyzeSingleTsFile(final File tsFile, int index) throws
Exception {
try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
// can be reused when constructing tsfile resource
final TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataIterator =
- new TsFileSequenceReaderTimeseriesMetadataIterator(reader, true, 1);
-
- // construct tsfile resource
- final TsFileResource tsFileResource = new TsFileResource(tsFile);
- if (!tsFileResource.resourceFileExists()) {
- // it will be serialized in LoadSingleTsFileNode
- tsFileResource.updatePlanIndexes(reader.getMinPlanIndex());
- tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex());
- } else {
- tsFileResource.deserialize();
- // Reset tsfileResource's isGeneratedByPipe mark to prevent
deserializing the wrong mark.
- // If this tsfile is loaded by a pipe receiver, the correct mark will
be added in
- // `listenToTsFile`
-
tsFileResource.setGeneratedByPipe(loadTsFileStatement.isGeneratedByPipe());
- }
-
-
schemaAutoCreatorAndVerifier.setCurrentModificationsAndTimeIndex(tsFileResource);
+ new TsFileSequenceReaderTimeseriesMetadataIterator(
+ reader,
+ true,
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+
.getLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount());
// check if the tsfile is empty
if (!timeseriesMetadataIterator.hasNext()) {
throw new LoadEmptyFileException(tsFile.getAbsolutePath());
}
- long writePointCount = 0;
+ if (0 <= tabletConversionThresholdBytes
+ && tsFile.length() <= tabletConversionThresholdBytes
+ && handleSingleMiniFile(index)) {
+ return;
+ }
+
+ doAnalyzeSingleFile(tsFile, reader, timeseriesMetadataIterator);
+ } catch (final LoadEmptyFileException loadEmptyFileException) {
+ LOGGER.warn("Empty file detected, will skip loading this file: {}",
tsFile.getAbsolutePath());
+ if (isDeleteAfterLoad) {
+ FileUtils.deleteQuietly(tsFile);
+ }
+ }
+ }
+
+ private boolean handleSingleMiniFile(final int i) throws
FileNotFoundException {
+ final long startTime = System.nanoTime();
+ try {
+ final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
+ new LoadTsFileDataTypeConverter(isGeneratedByPipe);
+
+ final TSStatus status =
+ loadTsFileDataTypeConverter
+ .convertForTreeModel(
+ new LoadTsFileStatement(tsFiles.get(i).getPath())
+ .setDeleteAfterLoad(isDeleteAfterLoad)
+ .setConvertOnTypeMismatch(isConvertOnTypeMismatch))
+ .orElse(null);
+
+ if (status == null || !loadTsFileDataTypeConverter.isSuccessful(status))
{
+ LOGGER.warn(
+ "Load: Failed to convert mini tsfile {} to tablets from statement
{}. Status: {}.",
+ tsFiles.get(i).getPath(),
+ loadTsFileStatement,
+ status);
+ return false;
+ }
- final boolean isAutoCreateSchemaOrVerifySchemaEnabled =
- IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
- || loadTsFileStatement.isVerifySchema();
- while (timeseriesMetadataIterator.hasNext()) {
- final Map<IDeviceID, List<TimeseriesMetadata>>
device2TimeseriesMetadata =
- timeseriesMetadataIterator.next();
+ // A mark of successful conversion
+ isMiniTsFile.set(i, Boolean.TRUE);
+ isMiniTsFileConverted = true;
- if (isAutoCreateSchemaOrVerifySchemaEnabled) {
- schemaAutoCreatorAndVerifier.autoCreateAndVerify(reader,
device2TimeseriesMetadata);
- }
+ loadTsFileStatement.addTsFileResource(null);
+ loadTsFileStatement.addWritePointCount(0);
+ return true;
+ } finally {
+ LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
+ LoadTsFileCostMetricsSet.ANALYSIS_CAST_TABLETS, System.nanoTime() -
startTime);
+ }
+ }
- if (!tsFileResource.resourceFileExists()) {
- TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata,
tsFileResource);
- }
+ private void doAnalyzeSingleFile(
+ final File tsFile,
+ final TsFileSequenceReader reader,
+ final TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataIterator)
+ throws IOException, LoadAnalyzeException, AuthException {
+ // construct tsfile resource
+ final TsFileResource tsFileResource = constructTsFileResource(reader,
tsFile);
- // TODO: how to get the correct write point count when
- // !isAutoCreateSchemaOrVerifySchemaEnabled
- writePointCount += getWritePointCount(device2TimeseriesMetadata);
- }
+ long writePointCount = 0;
+
+
schemaAutoCreatorAndVerifier.setCurrentModificationsAndTimeIndex(tsFileResource);
+
+ final boolean isAutoCreateSchemaOrVerifySchemaEnabled =
+ IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
|| isVerifySchema;
+
+ while (timeseriesMetadataIterator.hasNext()) {
+ final Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadata
=
+ timeseriesMetadataIterator.next();
if (isAutoCreateSchemaOrVerifySchemaEnabled) {
-
schemaAutoCreatorAndVerifier.flushAndClearDeviceIsAlignedCacheIfNecessary();
+ schemaAutoCreatorAndVerifier.autoCreateAndVerify(reader,
device2TimeseriesMetadata);
+ }
+ if (!tsFileResource.resourceFileExists()) {
+ TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata,
tsFileResource);
}
+ // TODO: how to get the correct write point count when
+ // !isAutoCreateSchemaOrVerifySchemaEnabled
+ writePointCount += getWritePointCount(device2TimeseriesMetadata);
+ }
+ if (isAutoCreateSchemaOrVerifySchemaEnabled) {
+
schemaAutoCreatorAndVerifier.flushAndClearDeviceIsAlignedCacheIfNecessary();
+ }
-
TimestampPrecisionUtils.checkTimestampPrecision(tsFileResource.getFileEndTime());
- tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
TimestampPrecisionUtils.checkTimestampPrecision(tsFileResource.getFileEndTime());
+ tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
- loadTsFileStatement.addTsFileResource(tsFileResource);
- loadTsFileStatement.addWritePointCount(writePointCount);
- } catch (final LoadEmptyFileException loadEmptyFileException) {
- LOGGER.warn("Failed to load empty file: {}", tsFile.getAbsolutePath());
- if (isDeleteAfterLoad) {
- FileUtils.deleteQuietly(tsFile);
- }
+ loadTsFileStatement.addTsFileResource(tsFileResource);
+ loadTsFileStatement.addWritePointCount(writePointCount);
+ }
+
+ private TsFileResource constructTsFileResource(
+ final TsFileSequenceReader reader, final File tsFile) throws IOException
{
+ final TsFileResource tsFileResource = new TsFileResource(tsFile);
+ if (!tsFileResource.resourceFileExists()) {
+ // it will be serialized in LoadSingleTsFileNode
+ tsFileResource.updatePlanIndexes(reader.getMinPlanIndex());
+ tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex());
+ } else {
+ tsFileResource.deserialize();
+ // Reset tsfileResource's isGeneratedByPipe mark to prevent
deserializing the wrong mark.
+ // If this tsfile is loaded by a pipe receiver, the correct mark will be
added in
+ // `listenToTsFile`
+ tsFileResource.setGeneratedByPipe(isGeneratedByPipe);
}
+ return tsFileResource;
}
private long getWritePointCount(
@@ -332,13 +422,65 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
.sum();
}
- private Analysis createFailAnalysisForAuthException(AuthException e) {
- Analysis analysis = new Analysis();
+ private boolean reconstructStatementIfMiniFileConverted() {
+ if (!isMiniTsFileConverted) {
+ return false;
+ }
+
+ return
loadTsFileStatement.reconstructStatementIfMiniFileConverted(isMiniTsFile);
+ }
+
+ private Analysis setFailAnalysisForAuthException(Analysis analysis,
AuthException e) {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage()));
return analysis;
}
+ private Analysis executeTabletConversionOnException(
+ final Analysis analysis, final LoadAnalyzeException e) {
+ if (shouldSkipConversion(e)) {
+ analysis.setFailStatus(
+ new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
+ analysis.setFinishQueryAfterAnalyze(true);
+ return analysis;
+ }
+
+ final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
+ new LoadTsFileDataTypeConverter(isGeneratedByPipe);
+ final TSStatus status =
+ loadTsFileStatement.isConvertOnTypeMismatch()
+ ?
loadTsFileDataTypeConverter.convertForTreeModel(loadTsFileStatement).orElse(null)
+ : null;
+
+ if (status == null) {
+ LOGGER.warn(
+ "Load: Failed to convert to tablets from statement {}. Status is
null.",
+ loadTsFileStatement);
+ analysis.setFailStatus(
+ new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
+ } else if (!loadTsFileDataTypeConverter.isSuccessful(status)) {
+ LOGGER.warn(
+ "Load: Failed to convert to tablets from statement {}. Status: {}",
+ loadTsFileStatement,
+ status);
+ analysis.setFailStatus(status);
+ }
+
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setStatement(loadTsFileStatement);
+ return analysis;
+ }
+
+ private boolean shouldSkipConversion(LoadAnalyzeException e) {
+ return (e instanceof LoadAnalyzeTypeMismatchException)
+ && !loadTsFileStatement.isConvertOnTypeMismatch();
+ }
+
+ @Override
+ public void close() {
+ schemaAutoCreatorAndVerifier.close();
+ }
+
private final class SchemaAutoCreatorAndVerifier {
private final LoadTsFileAnalyzeSchemaCache schemaCache;
@@ -468,11 +610,11 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
try {
- if (loadTsFileStatement.isVerifySchema()) {
+ if (isVerifySchema) {
makeSureNoDuplicatedMeasurementsInDevices();
}
- if (loadTsFileStatement.isAutoCreateDatabase()) {
+ if (isAutoCreateDatabase) {
autoCreateDatabase();
}
@@ -480,7 +622,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
// isAutoCreateSchemaEnabled is false.
final ISchemaTree schemaTree = autoCreateSchema();
- if (loadTsFileStatement.isVerifySchema()) {
+ if (isVerifySchema) {
verifySchema(schemaTree);
}
} catch (AuthException | LoadAnalyzeTypeMismatchException e) {
@@ -512,7 +654,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private void autoCreateDatabase()
throws LoadAnalyzeException, LoadFileException, IllegalPathException,
AuthException {
- final int databasePrefixNodesLength =
loadTsFileStatement.getDatabaseLevel() + 1;
+ final int databasePrefixNodesLength = databaseLevel + 1;
final Set<PartialPath> databasesNeededToBeSet = new HashSet<>();
for (final IDeviceID device :
schemaCache.getDevice2TimeSeries().keySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 6f5a33d08ec..a86f22ac2cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -290,7 +290,7 @@ public class LoadTsFileScheduler implements IScheduler {
convertFailedTsFilesToTabletsAndRetry();
} finally {
LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
- LoadTsFileCostMetricsSet.CAST_TABLETS, System.nanoTime() -
startTime);
+ LoadTsFileCostMetricsSet.SCHEDULER_CAST_TABLETS,
System.nanoTime() - startTime);
}
}
} finally {
@@ -544,7 +544,10 @@ public class LoadTsFileScheduler implements IScheduler {
try {
final TSStatus status =
loadTsFileDataTypeConverter
- .convertForTreeModel(new LoadTsFileStatement(filePath))
+ .convertForTreeModel(
+ new LoadTsFileStatement(filePath)
+ .setDeleteAfterLoad(failedNode.isDeleteAfterLoad())
+ .setConvertOnTypeMismatch(true))
.orElse(null);
if (loadTsFileDataTypeConverter.isSuccessful(status)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index d1a346b14cc..a074b237ddc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -46,14 +46,15 @@ public class LoadTsFileStatement extends Statement {
private boolean verifySchema = true;
private boolean deleteAfterLoad = false;
private boolean convertOnTypeMismatch = true;
+ private long tabletConversionThresholdBytes = -1;
private boolean autoCreateDatabase = true;
private boolean isGeneratedByPipe = false;
private Map<String, String> loadAttributes;
- private final List<File> tsFiles;
- private final List<TsFileResource> resources;
- private final List<Long> writePointCountList;
+ private List<File> tsFiles;
+ private List<TsFileResource> resources;
+ private List<Long> writePointCountList;
public LoadTsFileStatement(String filePath) throws FileNotFoundException {
this.file = new File(filePath);
@@ -61,12 +62,14 @@ public class LoadTsFileStatement extends Statement {
this.verifySchema = true;
this.deleteAfterLoad = false;
this.convertOnTypeMismatch = true;
+ this.tabletConversionThresholdBytes =
+
IoTDBDescriptor.getInstance().getConfig().getLoadTabletConversionThresholdBytes();
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
+
+ this.tsFiles = processTsFile(file);
this.resources = new ArrayList<>();
this.writePointCountList = new ArrayList<>();
this.statementType = StatementType.MULTI_BATCH_INSERT;
-
- this.tsFiles = processTsFile(file);
}
public static List<File> processTsFile(final File file) throws
FileNotFoundException {
@@ -145,22 +148,32 @@ public class LoadTsFileStatement extends Statement {
return verifySchema;
}
- public void setDeleteAfterLoad(boolean deleteAfterLoad) {
+ public LoadTsFileStatement setDeleteAfterLoad(boolean deleteAfterLoad) {
this.deleteAfterLoad = deleteAfterLoad;
+ return this;
}
public boolean isDeleteAfterLoad() {
return deleteAfterLoad;
}
- public void setConvertOnTypeMismatch(boolean convertOnTypeMismatch) {
+ public LoadTsFileStatement setConvertOnTypeMismatch(boolean
convertOnTypeMismatch) {
this.convertOnTypeMismatch = convertOnTypeMismatch;
+ return this;
}
public boolean isConvertOnTypeMismatch() {
return convertOnTypeMismatch;
}
+ public void setTabletConversionThresholdBytes(long
tabletConversionThresholdBytes) {
+ this.tabletConversionThresholdBytes = tabletConversionThresholdBytes;
+ }
+
+ public long getTabletConversionThresholdBytes() {
+ return tabletConversionThresholdBytes;
+ }
+
public void setAutoCreateDatabase(boolean autoCreateDatabase) {
this.autoCreateDatabase = autoCreateDatabase;
}
@@ -207,6 +220,42 @@ public class LoadTsFileStatement extends Statement {
this.deleteAfterLoad =
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
this.convertOnTypeMismatch =
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
+ this.tabletConversionThresholdBytes =
+
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
+ this.verifySchema =
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
+ }
+
+ public boolean reconstructStatementIfMiniFileConverted(final List<Boolean>
isMiniTsFile) {
+ int lastNonMiniTsFileIndex = -1;
+
+ for (int i = 0, n = isMiniTsFile.size(); i < n; i++) {
+ if (isMiniTsFile.get(i)) {
+ continue;
+ }
+ ++lastNonMiniTsFileIndex;
+ if (tsFiles != null) {
+ tsFiles.set(lastNonMiniTsFileIndex, tsFiles.get(i));
+ }
+ if (resources != null) {
+ resources.set(lastNonMiniTsFileIndex, resources.get(i));
+ }
+ if (writePointCountList != null) {
+ writePointCountList.set(lastNonMiniTsFileIndex,
writePointCountList.get(i));
+ }
+ }
+
+ tsFiles =
+ tsFiles != null ? tsFiles.subList(0, lastNonMiniTsFileIndex + 1) :
Collections.emptyList();
+ resources =
+ resources != null
+ ? resources.subList(0, lastNonMiniTsFileIndex + 1)
+ : Collections.emptyList();
+ writePointCountList =
+ writePointCountList != null
+ ? writePointCountList.subList(0, lastNonMiniTsFileIndex + 1)
+ : Collections.emptyList();
+
+ return tsFiles == null || tsFiles.isEmpty();
}
@Override
@@ -238,6 +287,8 @@ public class LoadTsFileStatement extends Statement {
+ verifySchema
+ ", convert-on-type-mismatch="
+ convertOnTypeMismatch
+ + ", tablet-conversion-threshold="
+ + tabletConversionThresholdBytes
+ ", tsFiles size="
+ tsFiles.size()
+ '}';
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index 00dca6b61b3..6b2f53a8584 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -40,6 +40,8 @@ public class LoadTsFileConfigurator {
case ON_SUCCESS_KEY:
validateOnSuccessParam(value);
break;
+ case TABLET_CONVERSION_THRESHOLD_KEY:
+ break;
case CONVERT_ON_TYPE_MISMATCH_KEY:
validateConvertOnTypeMismatchParam(value);
break;
@@ -120,6 +122,19 @@ public class LoadTsFileConfigurator {
CONVERT_ON_TYPE_MISMATCH_KEY,
String.valueOf(CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE)));
}
+ public static final String TABLET_CONVERSION_THRESHOLD_KEY =
"tablet-conversion-threshold";
+
+ public static long parseOrGetDefaultTabletConversionThresholdBytes(
+ final Map<String, String> loadAttributes) {
+ return Long.parseLong(
+ loadAttributes.getOrDefault(
+ TABLET_CONVERSION_THRESHOLD_KEY,
+ String.valueOf(
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getLoadTabletConversionThresholdBytes())));
+ }
+
public static final String VERIFY_KEY = "verify";
private static final boolean VERIFY_DEFAULT_VALUE = true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index d37316a2908..fd30ad074b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -27,6 +27,8 @@ import
org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.io.FileUtils;
@@ -124,7 +126,15 @@ public class
LoadTreeStatementDataTypeConvertExecutionVisitor
}
if (loadTsFileStatement.isDeleteAfterLoad()) {
- loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly);
+ loadTsFileStatement
+ .getTsFiles()
+ .forEach(
+ tsfile -> {
+ FileUtils.deleteQuietly(tsfile);
+ final String tsFilePath = tsfile.getAbsolutePath();
+ FileUtils.deleteQuietly(new File(tsFilePath +
TsFileResource.RESOURCE_SUFFIX));
+ FileUtils.deleteQuietly(new File(tsFilePath +
ModificationFile.FILE_SUFFIX));
+ });
}
LOGGER.info(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
index 5947f101b06..1ccd00cb9bd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
@@ -40,7 +40,8 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
public static final String FIRST_PHASE = "first_phase";
public static final String SECOND_PHASE = "second_phase";
public static final String LOAD_LOCALLY = "load_locally";
- public static final String CAST_TABLETS = "cast_tablets";
+ public static final String SCHEDULER_CAST_TABLETS = "scheduler_cast_tablets";
+ public static final String ANALYSIS_CAST_TABLETS = "analysis_cast_tablets";
private LoadTsFileCostMetricsSet() {
// empty constructor
@@ -50,7 +51,8 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
private Timer firstPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer secondPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer loadLocallyTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
- private Timer castTabletsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer schedulerCastTabletsTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer analysisCastTabletsTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
@@ -68,8 +70,11 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
case LOAD_LOCALLY:
loadLocallyTimer.updateNanos(costTimeInNanos);
break;
- case CAST_TABLETS:
- castTabletsTimer.updateNanos(costTimeInNanos);
+ case SCHEDULER_CAST_TABLETS:
+ schedulerCastTabletsTimer.updateNanos(costTimeInNanos);
+ break;
+ case ANALYSIS_CAST_TABLETS:
+ analysisCastTabletsTimer.updateNanos(costTimeInNanos);
break;
default:
throw new UnsupportedOperationException("Unsupported stage: " + stage);
@@ -103,12 +108,18 @@ public class LoadTsFileCostMetricsSet implements
IMetricSet {
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
LOAD_LOCALLY);
- castTabletsTimer =
+ schedulerCastTabletsTimer =
+ metricService.getOrCreateTimer(
+ Metric.LOAD_TIME_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ SCHEDULER_CAST_TABLETS);
+ analysisCastTabletsTimer =
metricService.getOrCreateTimer(
Metric.LOAD_TIME_COST.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- CAST_TABLETS);
+ ANALYSIS_CAST_TABLETS);
diskIOCounter =
metricService.getOrCreateCounter(
@@ -120,7 +131,13 @@ public class LoadTsFileCostMetricsSet implements
IMetricSet {
@Override
public void unbindFrom(AbstractMetricService metricService) {
- Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY,
CAST_TABLETS)
+ Arrays.asList(
+ ANALYSIS,
+ FIRST_PHASE,
+ SECOND_PHASE,
+ LOAD_LOCALLY,
+ SCHEDULER_CAST_TABLETS,
+ ANALYSIS_CAST_TABLETS)
.forEach(
stage ->
metricService.remove(