This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ccec884516c [IOTDB-6088] Load: Optimize the schema creation process &
Avoid OOM and ratis createMultiTimeseriesPlan bufferSize too large (#10715)
ccec884516c is described below
commit ccec884516ce62debc7a9c9eb970dc21be9e0037
Author: Itami Sho <[email protected]>
AuthorDate: Fri Jul 28 23:37:55 2023 +0800
[IOTDB-6088] Load: Optimize the schema creation process & Avoid OOM and
ratis createMultiTimeseriesPlan bufferSize too large (#10715)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++
.../queryengine/plan/analyze/AnalyzeVisitor.java | 117 +++++++++++++++++----
3 files changed, 112 insertions(+), 21 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3b9c23db234..f05c19bfa48 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -161,7 +161,7 @@ public class IoTDBConfig {
/** The proportion of write memory for loading TsFile */
private double loadTsFileProportion = 0.125;
- private final int maxLoadingDeviceNumber = 10000;
+ private int maxLoadingTimeseriesNumber = 2000;
/**
* If memory cost of data region increased more than proportion of
{@linkplain
@@ -3237,8 +3237,12 @@ public class IoTDBConfig {
return loadTsFileProportion;
}
- public int getMaxLoadingDeviceNumber() {
- return maxLoadingDeviceNumber;
+ public int getMaxLoadingTimeseriesNumber() {
+ return maxLoadingTimeseriesNumber;
+ }
+
+ public void setMaxLoadingTimeseriesNumber(int maxLoadingTimeseriesNumber) {
+ this.maxLoadingTimeseriesNumber = maxLoadingTimeseriesNumber;
}
public static String getEnvironmentVariables() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b472ffe841a..f2b47b9ebb5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -898,6 +898,12 @@ public class IoTDBDescriptor {
conf.setIntoOperationExecutionThreadCount(2);
}
+ conf.setMaxLoadingTimeseriesNumber(
+ Integer.parseInt(
+ properties.getProperty(
+ "max_loading_timeseries_number",
+ String.valueOf(conf.getMaxLoadingTimeseriesNumber()))));
+
conf.setExtPipeDir(properties.getProperty("ext_pipe_dir",
conf.getExtPipeDir()).trim());
// At the same time, set TSFileConfig
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 2ed8bb64c94..448f5b08ff0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2494,9 +2494,11 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
Map<String, Map<MeasurementSchema, File>> device2Schemas = new HashMap<>();
Map<String, Pair<Boolean, File>> device2IsAligned = new HashMap<>();
+ int tsfileNum = loadTsFileStatement.getTsFiles().size();
// analyze tsfile metadata
- for (File tsFile : loadTsFileStatement.getTsFiles()) {
+ for (int i = 0; i < tsfileNum; i++) {
+ File tsFile = loadTsFileStatement.getTsFiles().get(i);
if (tsFile.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(String.format("TsFile %s is empty.", tsFile.getPath()));
@@ -2507,9 +2509,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
tsFile.getPath()));
}
try {
- TsFileResource resource =
- analyzeTsFile(loadTsFileStatement, tsFile, device2Schemas,
device2IsAligned);
- loadTsFileStatement.addTsFileResource(resource);
+ analyzeTsFile(loadTsFileStatement, tsFile, device2Schemas,
device2IsAligned, context);
} catch (IllegalArgumentException e) {
logger.warn(
String.format(
@@ -2522,13 +2522,21 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
throw new SemanticException(
String.format("Parse file %s to resource error",
tsFile.getPath()));
}
- if (device2Schemas.size() > CONFIG.getMaxLoadingDeviceNumber()) {
- autoCreateAndVerifySchema(loadTsFileStatement, device2Schemas,
device2IsAligned, context);
+
+ if (i + 1 == tsfileNum) {
+ break;
}
- }
- autoCreateAndVerifySchema(loadTsFileStatement, device2Schemas,
device2IsAligned, context);
+ double progressPercentage = (i + 1) * 100.00 / tsfileNum;
+ logger.info(
+ "Load - Analysis Stage: {}/{} tsfiles have been analyzed, progress:
{}%",
+ i + 1, tsfileNum, String.format("%.2f", progressPercentage));
+ }
+
+ logger.info(
+ "Load - Analysis Stage:{}/{} tsfiles have been analyzed, progress:
{}%",
+ tsfileNum, tsfileNum, "100.00");
// load function will query data partition in scheduler
Analysis analysis = new Analysis();
analysis.setStatement(loadTsFileStatement);
@@ -2590,23 +2598,38 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
return analysis;
}
- private TsFileResource analyzeTsFile(
+ private void analyzeTsFile(
LoadTsFileStatement statement,
File tsFile,
Map<String, Map<MeasurementSchema, File>> device2Schemas,
- Map<String, Pair<Boolean, File>> device2IsAligned)
+ Map<String, Pair<Boolean, File>> device2IsAligned,
+ MPPQueryContext context)
throws IOException, VerifyMetadataException {
try (TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
- Map<String, List<TimeseriesMetadata>> device2Metadata =
reader.getAllTimeseriesMetadata(true);
+ TsFileResource tsFileResource = new TsFileResource(tsFile);
+ final boolean isAlreadyExistBeforeLoad =
tsFileResource.resourceFileExists();
+ boolean isDeserializeDone = false;
+ Map<String, List<TimeseriesMetadata>> device2Metadata =
reader.getAllTimeseriesMetadata(true);
if (IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
|| statement.isVerifySchema()) {
// construct schema
+ int deviceSize = device2Metadata.size();
+ int deviceCount = 0;
+ int timeseriesCount = 0;
+
for (Map.Entry<String, List<TimeseriesMetadata>> entry :
device2Metadata.entrySet()) {
String device = entry.getKey();
+ deviceCount++;
List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
boolean isAligned = false;
- for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList)
{
+
+ int timeseriesMetadataListSize = timeseriesMetadataList.size();
+
+ for (int timeseriesIndex = 0;
+ timeseriesIndex < timeseriesMetadataListSize;
+ timeseriesIndex++) {
+ TimeseriesMetadata timeseriesMetadata =
timeseriesMetadataList.get(timeseriesIndex);
TSDataType dataType = timeseriesMetadata.getTsDataType();
if (!dataType.equals(TSDataType.VECTOR)) {
Pair<CompressionType, TSEncoding> pair =
@@ -2620,15 +2643,63 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
device2Schemas
.computeIfAbsent(device, o -> new HashMap<>())
.put(measurementSchema, tsFile);
+ timeseriesCount++;
} else {
isAligned = true;
}
+
+ // if the number of timeseries exceeds the threshold or loop to
the last timeseries of
+ // the last device, we should create and verify schema , and clean
the device2Schemas
+ // and device2IsAligned map.
+ if (timeseriesCount > CONFIG.getMaxLoadingTimeseriesNumber()
+ || (deviceCount == deviceSize
+ && timeseriesIndex == timeseriesMetadataListSize - 1)) {
+
+ // check if the device has the same aligned definition in all
tsfiles
+ if (isDeviceAligned(device2IsAligned, device, tsFile,
isAligned)) {
+ // case 1: if the tsfile has tsfile resource before loading,
we should deserialize
+ // it only once.
+ if (isAlreadyExistBeforeLoad) {
+ if (!isDeserializeDone) {
+ tsFileResource.deserialize();
+ statement.addTsFileResource(tsFileResource);
+ isDeserializeDone = true;
+ }
+
+ } else if (!tsFileResource.resourceFileExists()) {
+ // case 2: if the tsfile has no tsfile resource before
loading, we should
+ // construct it.
+ tsFileResource = constructTsFileResource(tsFile,
device2Metadata, reader);
+ statement.addTsFileResource(tsFileResource);
+
+ } else {
+ // case 3: the tsfile resource is created when loading, so
we just need to update
+ // the resource.
+ FileLoaderUtils.updateTsFileResource(device2Metadata,
tsFileResource);
+ }
+
+ tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
+ autoCreateAndVerifySchema(statement, device2Schemas,
device2IsAligned, context);
+ timeseriesCount = 0;
+ } else {
+ throw new VerifyMetadataException(
+ String.format(
+ "Device %s has different aligned definition in tsFile
%s and other TsFile.",
+ device, tsFile.getParentFile()));
+ }
+
+ logger.info(
+ "Load - Create and Verify Schemas Stage: the device {} in
tsfile {} have been created and verified.",
+ entry.getKey(),
+ tsFile.getName());
+ }
}
- boolean finalIsAligned = isAligned;
- if (!device2IsAligned
- .computeIfAbsent(device, o -> new Pair<>(finalIsAligned, tsFile))
- .left
- .equals(isAligned)) {
+
+ // when the number of devices does not exceed the threshold and it's
not the last
+ // timeseries of the last device, we also need to check if the
device has the same aligned
+ // definition in all tsfiles before going to the next device loop.
+ if (!isDeviceAligned(device2IsAligned, device, tsFile, isAligned)) {
throw new VerifyMetadataException(
String.format(
"Device %s has different aligned definition in tsFile %s
and other TsFile.",
@@ -2636,10 +2707,20 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
}
}
}
- return constructTsFileResource(tsFile, device2Metadata, reader);
}
}
+ private boolean isDeviceAligned(
+ Map<String, Pair<Boolean, File>> device2IsAligned,
+ String device,
+ File tsFile,
+ boolean isAligned) {
+ return device2IsAligned
+ .computeIfAbsent(device, o -> new Pair<>(isAligned, tsFile))
+ .left
+ .equals(isAligned);
+ }
+
private TsFileResource constructTsFileResource(
File tsFile,
Map<String, List<TimeseriesMetadata>> device2Metadata,