This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 45a382039e4 Load: flush metadata of generated files to avoid OOM (#14419) (#14445)
45a382039e4 is described below

commit 45a382039e49b34a49d8801c869d74755ae6cf7e
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Dec 16 23:01:42 2024 +0800

    Load: flush metadata of generated files to avoid OOM (#14419) (#14445)
    
    * Load: flush metadata of generated files to avoid OOM (#14419)
    
    (cherry picked from commit 0d22f2b3f3581d419a7c1180e70dab2a3934cb69)
    
    * resolve
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 17 ++++--
 .../db/storageengine/load/LoadTsFileManager.java   | 63 ++++++++++++++++++----
 pom.xml                                            |  2 +-
 4 files changed, 78 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7a510a7312c..577bfa25e12 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1140,6 +1140,8 @@ public class IoTDBConfig {
   private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
       0L; // 0 means that the decision will be adaptive based on the number of 
sequences
 
+  private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB
+
   private long loadMemoryAllocateRetryIntervalMs = 1000L;
   private int loadMemoryAllocateMaxRetries = 5;
 
@@ -3962,6 +3964,14 @@ public class IoTDBConfig {
     this.loadTsFileAnalyzeSchemaMemorySizeInBytes = 
loadTsFileAnalyzeSchemaMemorySizeInBytes;
   }
 
+  public long getLoadChunkMetadataMemorySizeInBytes() {
+    return loadChunkMetadataMemorySizeInBytes;
+  }
+
+  public void setLoadChunkMetadataMemorySizeInBytes(long 
loadChunkMetadataMemorySizeInBytes) {
+    this.loadChunkMetadataMemorySizeInBytes = 
loadChunkMetadataMemorySizeInBytes;
+  }
+
   public long getLoadMemoryAllocateRetryIntervalMs() {
     return loadMemoryAllocateRetryIntervalMs;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 930c005fc82..fd2d2e4ac53 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2334,9 +2334,20 @@ public class IoTDBDescriptor {
                 
String.valueOf(conf.getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber()))));
     conf.setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
         Long.parseLong(
-            properties.getProperty(
-                "load_tsfile_analyze_schema_memory_size_in_bytes",
-                
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
+            Optional.ofNullable(
+                    properties.getProperty(
+                        "load_tsfile_analyze_schema_memory_size_in_bytes",
+                        
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes())))
+                .map(String::trim)
+                
.orElse(String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
+    conf.setLoadChunkMetadataMemorySizeInBytes(
+        Long.parseLong(
+            Optional.ofNullable(
+                    properties.getProperty(
+                        "load_chunk_metadata_memory_size_in_bytes",
+                        
String.valueOf(conf.getLoadChunkMetadataMemorySizeInBytes())))
+                .map(String::trim)
+                
.orElse(String.valueOf(conf.getLoadChunkMetadataMemorySizeInBytes()))));
     conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
         Long.parseLong(
             properties.getProperty(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index fd21140453c..cba11624cb8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -41,7 +41,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent;
 import org.apache.iotdb.db.storageengine.load.splitter.ChunkData;
 import org.apache.iotdb.db.storageengine.load.splitter.DeletionData;
@@ -51,6 +51,9 @@ import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyT
 import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
@@ -356,6 +359,7 @@ public class LoadTsFileManager {
 
     private final File taskDir;
     private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
+    private Map<DataPartitionInfo, TsFileResource> dataPartition2Resource;
     private Map<DataPartitionInfo, String> dataPartition2LastDevice;
     private Map<DataPartitionInfo, ModificationFile> 
dataPartition2ModificationFile;
     private boolean isClosed;
@@ -363,6 +367,7 @@ public class LoadTsFileManager {
     private TsFileWriterManager(File taskDir) {
       this.taskDir = taskDir;
       this.dataPartition2Writer = new HashMap<>();
+      this.dataPartition2Resource = new HashMap<>();
       this.dataPartition2LastDevice = new HashMap<>();
       this.dataPartition2ModificationFile = new HashMap<>();
       this.isClosed = false;
@@ -398,12 +403,39 @@ public class LoadTsFileManager {
           return;
         }
 
-        dataPartition2Writer.put(partitionInfo, new TsFileIOWriter(newTsFile));
+        final long chunkMetadataMaxSizeForEachWriter =
+            CONFIG.getLoadChunkMetadataMemorySizeInBytes() / 
(dataPartition2Writer.size() + 1);
+        final TsFileIOWriter writer =
+            new TsFileIOWriter(newTsFile, chunkMetadataMaxSizeForEachWriter);
+        final TsFileResource resource = new TsFileResource(writer.getFile());
+        writer.addFlushListener(
+            // Update time index by chunk groups going to be flushed to temp file
+            sortedChunkMetadataList ->
+                sortedChunkMetadataList.forEach(
+                    pair -> {
+                      final IDeviceID deviceId = pair.left.left;
+                      pair.getRight()
+                          .forEach(
+                              chunkMetadata -> {
+                                resource.updateStartTime(deviceId, 
chunkMetadata.getStartTime());
+                                resource.updateEndTime(deviceId, 
chunkMetadata.getEndTime());
+                              });
+                    }));
+
+        // When a new writer is added, we need to reduce the metadata size limit of all existing
+        // writers for memory control
+        for (final TsFileIOWriter existingWriter : 
dataPartition2Writer.values()) {
+          existingWriter.setMaxMetadataSize(chunkMetadataMaxSizeForEachWriter);
+          existingWriter.checkMetadataSizeAndMayFlush();
+        }
+        dataPartition2Writer.put(partitionInfo, writer);
+        dataPartition2Resource.put(partitionInfo, resource);
       }
       TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo);
       if 
(!chunkData.getDevice().equals(dataPartition2LastDevice.getOrDefault(partitionInfo,
 ""))) {
         if (dataPartition2LastDevice.containsKey(partitionInfo)) {
           writer.endChunkGroup();
+          writer.checkMetadataSizeAndMayFlush();
         }
         writer.startChunkGroup(new PlainDeviceID(chunkData.getDevice()));
         dataPartition2LastDevice.put(partitionInfo, chunkData.getDevice());
@@ -445,19 +477,22 @@ public class LoadTsFileManager {
       if (isClosed) {
         throw new 
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
       }
-      for (Map.Entry<DataPartitionInfo, ModificationFile> entry :
+      for (final Map.Entry<DataPartitionInfo, ModificationFile> entry :
           dataPartition2ModificationFile.entrySet()) {
         entry.getValue().close();
       }
-      for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : 
dataPartition2Writer.entrySet()) {
-        TsFileIOWriter writer = entry.getValue();
+      for (final Map.Entry<DataPartitionInfo, TsFileIOWriter> entry :
+          dataPartition2Writer.entrySet()) {
+        final TsFileIOWriter writer = entry.getValue();
         if (writer.isWritingChunkGroup()) {
           writer.endChunkGroup();
         }
         writer.endFile();
 
-        DataRegion dataRegion = entry.getKey().getDataRegion();
-        dataRegion.loadNewTsFile(generateResource(writer, progressIndex), 
true, isGeneratedByPipe);
+        final DataRegion dataRegion = entry.getKey().getDataRegion();
+        final TsFileResource tsFileResource = 
dataPartition2Resource.get(entry.getKey());
+        endTsFileResource(writer, tsFileResource, progressIndex);
+        dataRegion.loadNewTsFile(tsFileResource, true, isGeneratedByPipe);
 
         // Metrics
         dataRegion
@@ -469,12 +504,20 @@ public class LoadTsFileManager {
       }
     }
 
-    private TsFileResource generateResource(TsFileIOWriter writer, 
ProgressIndex progressIndex)
+    private void endTsFileResource(
+        TsFileIOWriter writer, TsFileResource tsFileResource, ProgressIndex 
progressIndex)
         throws IOException {
-      TsFileResource tsFileResource = 
TsFileResourceUtils.generateTsFileResource(writer);
+      // Update time index by chunk groups still in memory
+      for (final ChunkGroupMetadata chunkGroupMetadata : 
writer.getChunkGroupMetadataList()) {
+        final IDeviceID device = chunkGroupMetadata.getDevice();
+        for (final ChunkMetadata chunkMetadata : 
chunkGroupMetadata.getChunkMetadataList()) {
+          tsFileResource.updateStartTime(device, chunkMetadata.getStartTime());
+          tsFileResource.updateEndTime(device, chunkMetadata.getEndTime());
+        }
+      }
+      tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
       tsFileResource.setProgressIndex(progressIndex);
       tsFileResource.serialize();
-      return tsFileResource;
     }
 
     private long getTsFileWritePointCount(TsFileIOWriter writer) {
diff --git a/pom.xml b/pom.xml
index 4a07cef0af4..d3af4ab2238 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,7 +166,7 @@
         <thrift.version>0.14.1</thrift.version>
         <xz.version>1.9</xz.version>
         <zstd-jni.version>1.5.6-3</zstd-jni.version>
-        <tsfile.version>1.1.0-241205-SNAPSHOT</tsfile.version>
+        <tsfile.version>1.1.0-241216-SNAPSHOT</tsfile.version>
     </properties>
     <!--
     if we claim dependencies in dependencyManagement, then we do not claim

Reply via email to