This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 3c70155135f [IOTDB-6088] Load: Optimize the schema creation process & Avoid OOM and ratis createMultiTimeseriesPlan bufferSize too large (#10736)
3c70155135f is described below

commit 3c70155135fa18ad82dc574ef5e108af8cc6ba44
Author: Itami Sho <[email protected]>
AuthorDate: Sat Jul 29 16:51:34 2023 +0800

    [IOTDB-6088] Load: Optimize the schema creation process & Avoid OOM and ratis createMultiTimeseriesPlan bufferSize too large (#10736)
    
    (cherry picked from commit ccec884516ce62debc7a9c9eb970dc21be9e0037)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  10 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   6 +
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 146 +++++++++++++++++----
 3 files changed, 130 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4a9aa8431bc..1ad5d07239c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -161,7 +161,7 @@ public class IoTDBConfig {
   /** The proportion of write memory for loading TsFile */
   private double loadTsFileProportion = 0.125;
 
-  private final int maxLoadingDeviceNumber = 10000;
+  private int maxLoadingTimeseriesNumber = 2000;
 
   /**
    * If memory cost of data region increased more than proportion of 
{@linkplain
@@ -3338,8 +3338,12 @@ public class IoTDBConfig {
     return loadTsFileProportion;
   }
 
-  public int getMaxLoadingDeviceNumber() {
-    return maxLoadingDeviceNumber;
+  public int getMaxLoadingTimeseriesNumber() {
+    return maxLoadingTimeseriesNumber;
+  }
+
+  public void setMaxLoadingTimeseriesNumber(int maxLoadingTimeseriesNumber) {
+    this.maxLoadingTimeseriesNumber = maxLoadingTimeseriesNumber;
   }
 
   public static String getEnvironmentVariables() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5f81bf54359..cb6d2b273fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -938,6 +938,12 @@ public class IoTDBDescriptor {
       conf.setIntoOperationExecutionThreadCount(2);
     }
 
+    conf.setMaxLoadingTimeseriesNumber(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_loading_timeseries_number",
+                String.valueOf(conf.getMaxLoadingTimeseriesNumber()))));
+
     conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", 
conf.getExtPipeDir()).trim());
 
     // At the same time, set TSFileConfig
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index b699978b539..e1e990a69e4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -2005,20 +2005,22 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
     Map<String, Map<MeasurementSchema, File>> device2Schemas = new HashMap<>();
     Map<String, Pair<Boolean, File>> device2IsAligned = new HashMap<>();
+    int tsfileNum = loadTsFileStatement.getTsFiles().size();
 
     // analyze tsfile metadata
-    for (File tsFile : loadTsFileStatement.getTsFiles()) {
+    for (int i = 0; i < tsfileNum; i++) {
+      File tsFile = loadTsFileStatement.getTsFiles().get(i);
       if (tsFile.length() == 0) {
-        logger.warn(String.format("TsFile %s is empty.", tsFile.getPath()));
+        if (logger.isWarnEnabled()) {
+          logger.warn(String.format("TsFile %s is empty.", tsFile.getPath()));
+        }
         throw new SemanticException(
             String.format(
                 "TsFile %s is empty, please check it be flushed to disk 
correctly.",
                 tsFile.getPath()));
       }
       try {
-        TsFileResource resource =
-            analyzeTsFile(loadTsFileStatement, tsFile, device2Schemas, 
device2IsAligned);
-        loadTsFileStatement.addTsFileResource(resource);
+        analyzeTsFile(loadTsFileStatement, tsFile, device2Schemas, 
device2IsAligned);
       } catch (IllegalArgumentException e) {
         logger.warn(
             String.format(
@@ -2031,13 +2033,21 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         throw new SemanticException(
             String.format("Parse file %s to resource error", 
tsFile.getPath()));
       }
-      if (device2Schemas.size() > CONFIG.getMaxLoadingDeviceNumber()) {
-        autoCreateAndVerifySchema(loadTsFileStatement, device2Schemas, 
device2IsAligned);
+
+      if (i + 1 == tsfileNum) {
+        break;
       }
-    }
 
-    autoCreateAndVerifySchema(loadTsFileStatement, device2Schemas, 
device2IsAligned);
+      double progressPercentage = (i + 1) * 100.00 / tsfileNum;
 
+      logger.info(
+          "Load - Analysis Stage: {}/{} tsfiles have been analyzed, progress: 
{}%",
+          i + 1, tsfileNum, String.format("%.2f", progressPercentage));
+    }
+
+    logger.info(
+        "Load - Analysis Stage:{}/{} tsfiles have been analyzed, progress: 
{}%",
+        tsfileNum, tsfileNum, "100.00");
     // load function will query data partition in scheduler
     Analysis analysis = new Analysis();
     analysis.setStatement(loadTsFileStatement);
@@ -2096,23 +2106,37 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     return analysis;
   }
 
-  private TsFileResource analyzeTsFile(
+  private void analyzeTsFile(
       LoadTsFileStatement statement,
       File tsFile,
       Map<String, Map<MeasurementSchema, File>> device2Schemas,
       Map<String, Pair<Boolean, File>> device2IsAligned)
       throws IOException, VerifyMetadataException {
     try (TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
-      Map<String, List<TimeseriesMetadata>> device2Metadata = 
reader.getAllTimeseriesMetadata(true);
+      TsFileResource tsFileResource = new TsFileResource(tsFile);
+      final boolean isAlreadyExistBeforeLoad = 
tsFileResource.resourceFileExists();
+      boolean isDeserializeDone = false;
 
+      Map<String, List<TimeseriesMetadata>> device2Metadata = 
reader.getAllTimeseriesMetadata(true);
       if (IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
           || statement.isVerifySchema()) {
         // construct schema
+        int deviceSize = device2Metadata.size();
+        int deviceCount = 0;
+        int timeseriesCount = 0;
+
         for (Map.Entry<String, List<TimeseriesMetadata>> entry : 
device2Metadata.entrySet()) {
           String device = entry.getKey();
+          deviceCount++;
           List<TimeseriesMetadata> timeseriesMetadataList = entry.getValue();
           boolean isAligned = false;
-          for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) 
{
+
+          int timeseriesMetadataListSize = timeseriesMetadataList.size();
+
+          for (int timeseriesIndex = 0;
+              timeseriesIndex < timeseriesMetadataListSize;
+              timeseriesIndex++) {
+            TimeseriesMetadata timeseriesMetadata = 
timeseriesMetadataList.get(timeseriesIndex);
             TSDataType dataType = timeseriesMetadata.getTSDataType();
             if (!dataType.equals(TSDataType.VECTOR)) {
               Pair<CompressionType, TSEncoding> pair =
@@ -2126,15 +2150,63 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
               device2Schemas
                   .computeIfAbsent(device, o -> new HashMap<>())
                   .put(measurementSchema, tsFile);
+              timeseriesCount++;
             } else {
               isAligned = true;
             }
+
+            // if the number of timeseries exceeds the threshold or loop to 
the last timeseries of
+            // the last device, we should create and verify schema , and clean 
the device2Schemas
+            // and device2IsAligned map.
+            if (timeseriesCount > CONFIG.getMaxLoadingTimeseriesNumber()
+                || (deviceCount == deviceSize
+                    && timeseriesIndex == timeseriesMetadataListSize - 1)) {
+
+              // check if the device has the same aligned definition in all 
tsfiles
+              if (isDeviceAligned(device2IsAligned, device, tsFile, 
isAligned)) {
+                // case 1: if the tsfile has tsfile resource before loading, 
we should deserialize
+                // it only once.
+                if (isAlreadyExistBeforeLoad) {
+                  if (!isDeserializeDone) {
+                    tsFileResource.deserialize();
+                    statement.addTsFileResource(tsFileResource);
+                    isDeserializeDone = true;
+                  }
+
+                } else if (!tsFileResource.resourceFileExists()) {
+                  // case 2: if the tsfile has no tsfile resource before 
loading, we should
+                  // construct it.
+                  tsFileResource = constructTsFileResource(tsFile, 
device2Metadata, reader);
+                  statement.addTsFileResource(tsFileResource);
+
+                } else {
+                  // case 3: the tsfile resource is created when loading, so 
we just need to update
+                  // the resource.
+                  FileLoaderUtils.updateTsFileResource(device2Metadata, 
tsFileResource);
+                }
+
+                tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
+                autoCreateAndVerifySchema(statement, device2Schemas, 
device2IsAligned);
+                timeseriesCount = 0;
+              } else {
+                throw new VerifyMetadataException(
+                    String.format(
+                        "Device %s has different aligned definition in tsFile 
%s and other TsFile.",
+                        device, tsFile.getParentFile()));
+              }
+
+              logger.info(
+                  "Load - Create and Verify Schemas Stage: the device {} in 
tsfile {} have been created and verified.",
+                  entry.getKey(),
+                  tsFile.getName());
+            }
           }
-          boolean finalIsAligned = isAligned;
-          if (!device2IsAligned
-              .computeIfAbsent(device, o -> new Pair<>(finalIsAligned, tsFile))
-              .left
-              .equals(isAligned)) {
+
+          // when the number of devices does not exceed the threshold and it's 
not the last
+          // timeseries of the last device, we also need to check if the 
device has the same aligned
+          // definition in all tsfiles before going to the next device loop.
+          if (!isDeviceAligned(device2IsAligned, device, tsFile, isAligned)) {
             throw new VerifyMetadataException(
                 String.format(
                     "Device %s has different aligned definition in tsFile %s 
and other TsFile.",
@@ -2142,21 +2214,37 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
           }
         }
       }
+    }
+  }
 
-      // construct TsFileResource
-      TsFileResource resource = new TsFileResource(tsFile);
-      if (!resource.resourceFileExists()) {
-        FileLoaderUtils.updateTsFileResource(
-            device2Metadata, resource); // serialize it in LoadSingleTsFileNode
-        resource.updatePlanIndexes(reader.getMinPlanIndex());
-        resource.updatePlanIndexes(reader.getMaxPlanIndex());
-      } else {
-        resource.deserialize();
-      }
+  private boolean isDeviceAligned(
+      Map<String, Pair<Boolean, File>> device2IsAligned,
+      String device,
+      File tsFile,
+      boolean isAligned) {
+    return device2IsAligned
+        .computeIfAbsent(device, o -> new Pair<>(isAligned, tsFile))
+        .left
+        .equals(isAligned);
+  }
 
-      resource.setStatus(TsFileResourceStatus.NORMAL);
-      return resource;
+  private TsFileResource constructTsFileResource(
+      File tsFile,
+      Map<String, List<TimeseriesMetadata>> device2Metadata,
+      TsFileSequenceReader reader)
+      throws IOException {
+    TsFileResource resource = new TsFileResource(tsFile);
+    if (!resource.resourceFileExists()) {
+      FileLoaderUtils.updateTsFileResource(
+          device2Metadata, resource); // serialize it in LoadSingleTsFileNode
+      resource.updatePlanIndexes(reader.getMinPlanIndex());
+      resource.updatePlanIndexes(reader.getMaxPlanIndex());
+    } else {
+      resource.deserialize();
     }
+
+    resource.setStatus(TsFileResourceStatus.NORMAL);
+    return resource;
   }
 
   private void autoCreateSg(int sgLevel, Map<String, Map<MeasurementSchema, 
File>> device2Schemas)

Reply via email to