This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ccec884516c [IOTDB-6088] Load: Optimize the schema creation process and
avoid OOM and an oversized Ratis createMultiTimeseriesPlan buffer (#10715)
ccec884516c is described below

commit ccec884516ce62debc7a9c9eb970dc21be9e0037
Author: Itami Sho <[email protected]>
AuthorDate: Fri Jul 28 23:37:55 2023 +0800

    [IOTDB-6088] Load: Optimize the schema creation process and avoid OOM and
an oversized Ratis createMultiTimeseriesPlan buffer (#10715)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  10 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   6 ++
 .../queryengine/plan/analyze/AnalyzeVisitor.java   | 117 +++++++++++++++++----
 3 files changed, 112 insertions(+), 21 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3b9c23db234..f05c19bfa48 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -161,7 +161,7 @@ public class IoTDBConfig {
   /** The proportion of write memory for loading TsFile */
   private double loadTsFileProportion = 0.125;
 
-  private final int maxLoadingDeviceNumber = 10000;
+  private int maxLoadingTimeseriesNumber = 2000;
 
   /**
    * If memory cost of data region increased more than proportion of 
{@linkplain
@@ -3237,8 +3237,12 @@ public class IoTDBConfig {
     return loadTsFileProportion;
   }
 
-  public int getMaxLoadingDeviceNumber() {
-    return maxLoadingDeviceNumber;
+  public int getMaxLoadingTimeseriesNumber() {
+    return maxLoadingTimeseriesNumber;
+  }
+
+  public void setMaxLoadingTimeseriesNumber(int maxLoadingTimeseriesNumber) {
+    this.maxLoadingTimeseriesNumber = maxLoadingTimeseriesNumber;
   }
 
   public static String getEnvironmentVariables() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b472ffe841a..f2b47b9ebb5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -898,6 +898,12 @@ public class IoTDBDescriptor {
       conf.setIntoOperationExecutionThreadCount(2);
     }
 
+    conf.setMaxLoadingTimeseriesNumber(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_loading_timeseries_number",
+                String.valueOf(conf.getMaxLoadingTimeseriesNumber()))));
+
     conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", 
conf.getExtPipeDir()).trim());
 
     // At the same time, set TSFileConfig
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 2ed8bb64c94..448f5b08ff0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2494,9 +2494,11 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
     Map<String, Map<MeasurementSchema, File>> device2Schemas = new HashMap<>();
     Map<String, Pair<Boolean, File>> device2IsAligned = new HashMap<>();
+    int tsfileNum = loadTsFileStatement.getTsFiles().size();
 
     // analyze tsfile metadata
-    for (File tsFile : loadTsFileStatement.getTsFiles()) {
+    for (int i = 0; i < tsfileNum; i++) {
+      File tsFile = loadTsFileStatement.getTsFiles().get(i);
       if (tsFile.length() == 0) {
         if (logger.isWarnEnabled()) {
           logger.warn(String.format("TsFile %s is empty.", tsFile.getPath()));
@@ -2507,9 +2509,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
                 tsFile.getPath()));
       }
       try {
-        TsFileResource resource =
-            analyzeTsFile(loadTsFileStatement, tsFile, device2Schemas, 
device2IsAligned);
-        loadTsFileStatement.addTsFileResource(resource);
+        analyzeTsFile(loadTsFileStatement, tsFile, device2Schemas, 
device2IsAligned, context);
       } catch (IllegalArgumentException e) {
         logger.warn(
             String.format(
@@ -2522,13 +2522,21 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         throw new SemanticException(
             String.format("Parse file %s to resource error", 
tsFile.getPath()));
       }
-      if (device2Schemas.size() > CONFIG.getMaxLoadingDeviceNumber()) {
-        autoCreateAndVerifySchema(loadTsFileStatement, device2Schemas, 
device2IsAligned, context);
+
+      if (i + 1 == tsfileNum) {
+        break;
       }
-    }
 
-    autoCreateAndVerifySchema(loadTsFileStatement, device2Schemas, 
device2IsAligned, context);
+      double progressPercentage = (i + 1) * 100.00 / tsfileNum;
 
+      logger.info(
+          "Load - Analysis Stage: {}/{} tsfiles have been analyzed, progress: 
{}%",
+          i + 1, tsfileNum, String.format("%.2f", progressPercentage));
+    }
+
+    logger.info(
+        "Load - Analysis Stage:{}/{} tsfiles have been analyzed, progress: 
{}%",
+        tsfileNum, tsfileNum, "100.00");
     // load function will query data partition in scheduler
     Analysis analysis = new Analysis();
     analysis.setStatement(loadTsFileStatement);
@@ -2590,23 +2598,38 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     return analysis;
   }
 
-  private TsFileResource analyzeTsFile(
+  private void analyzeTsFile(
       LoadTsFileStatement statement,
       File tsFile,
       Map<String, Map<MeasurementSchema, File>> device2Schemas,
-      Map<String, Pair<Boolean, File>> device2IsAligned)
+      Map<String, Pair<Boolean, File>> device2IsAligned,
+      MPPQueryContext context)
       throws IOException, VerifyMetadataException {
     try (TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
-      Map<String, List<TimeseriesMetadata>> device2Metadata = 
reader.getAllTimeseriesMetadata(true);
+      TsFileResource tsFileResource = new TsFileResource(tsFile);
+      final boolean isAlreadyExistBeforeLoad = 
tsFileResource.resourceFileExists();
+      boolean isDeserializeDone = false;
 
+      Map<String, List<TimeseriesMetadata>> device2Metadata = 
reader.getAllTimeseriesMetadata(true);
       if (IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
           || statement.isVerifySchema()) {
         // construct schema
+        int deviceSize = device2Metadata.size();
+        int deviceCount = 0;
+        int timeseriesCount = 0;
+
         for (Map.Entry<String, List<TimeseriesMetadata>> entry : 
device2Metadata.entrySet()) {
           String device = entry.getKey();
+          deviceCount++;
           List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
           boolean isAligned = false;
-          for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) 
{
+
+          int timeseriesMetadataListSize = timeseriesMetadataList.size();
+
+          for (int timeseriesIndex = 0;
+              timeseriesIndex < timeseriesMetadataListSize;
+              timeseriesIndex++) {
+            TimeseriesMetadata timeseriesMetadata = 
timeseriesMetadataList.get(timeseriesIndex);
             TSDataType dataType = timeseriesMetadata.getTsDataType();
             if (!dataType.equals(TSDataType.VECTOR)) {
               Pair<CompressionType, TSEncoding> pair =
@@ -2620,15 +2643,63 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
               device2Schemas
                   .computeIfAbsent(device, o -> new HashMap<>())
                   .put(measurementSchema, tsFile);
+              timeseriesCount++;
             } else {
               isAligned = true;
             }
+
+            // if the number of timeseries exceeds the threshold or loop to 
the last timeseries of
+            // the last device, we should create and verify schema , and clean 
the device2Schemas
+            // and device2IsAligned map.
+            if (timeseriesCount > CONFIG.getMaxLoadingTimeseriesNumber()
+                || (deviceCount == deviceSize
+                    && timeseriesIndex == timeseriesMetadataListSize - 1)) {
+
+              // check if the device has the same aligned definition in all 
tsfiles
+              if (isDeviceAligned(device2IsAligned, device, tsFile, 
isAligned)) {
+                // case 1: if the tsfile has tsfile resource before loading, 
we should deserialize
+                // it only once.
+                if (isAlreadyExistBeforeLoad) {
+                  if (!isDeserializeDone) {
+                    tsFileResource.deserialize();
+                    statement.addTsFileResource(tsFileResource);
+                    isDeserializeDone = true;
+                  }
+
+                } else if (!tsFileResource.resourceFileExists()) {
+                  // case 2: if the tsfile has no tsfile resource before 
loading, we should
+                  // construct it.
+                  tsFileResource = constructTsFileResource(tsFile, 
device2Metadata, reader);
+                  statement.addTsFileResource(tsFileResource);
+
+                } else {
+                  // case 3: the tsfile resource is created when loading, so 
we just need to update
+                  // the resource.
+                  FileLoaderUtils.updateTsFileResource(device2Metadata, 
tsFileResource);
+                }
+
+                tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
+                autoCreateAndVerifySchema(statement, device2Schemas, 
device2IsAligned, context);
+                timeseriesCount = 0;
+              } else {
+                throw new VerifyMetadataException(
+                    String.format(
+                        "Device %s has different aligned definition in tsFile 
%s and other TsFile.",
+                        device, tsFile.getParentFile()));
+              }
+
+              logger.info(
+                  "Load - Create and Verify Schemas Stage: the device {} in 
tsfile {} have been created and verified.",
+                  entry.getKey(),
+                  tsFile.getName());
+            }
           }
-          boolean finalIsAligned = isAligned;
-          if (!device2IsAligned
-              .computeIfAbsent(device, o -> new Pair<>(finalIsAligned, tsFile))
-              .left
-              .equals(isAligned)) {
+
+          // when the number of devices does not exceed the threshold and it's 
not the last
+          // timeseries of the last device, we also need to check if the 
device has the same aligned
+          // definition in all tsfiles before going to the next device loop.
+          if (!isDeviceAligned(device2IsAligned, device, tsFile, isAligned)) {
             throw new VerifyMetadataException(
                 String.format(
                     "Device %s has different aligned definition in tsFile %s 
and other TsFile.",
@@ -2636,10 +2707,20 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
           }
         }
       }
-      return constructTsFileResource(tsFile, device2Metadata, reader);
     }
   }
 
+  private boolean isDeviceAligned(
+      Map<String, Pair<Boolean, File>> device2IsAligned,
+      String device,
+      File tsFile,
+      boolean isAligned) {
+    return device2IsAligned
+        .computeIfAbsent(device, o -> new Pair<>(isAligned, tsFile))
+        .left
+        .equals(isAligned);
+  }
+
   private TsFileResource constructTsFileResource(
       File tsFile,
       Map<String, List<TimeseriesMetadata>> device2Metadata,

Reply via email to