This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d22f2b3f35 Load: flush metadata of generated files to avoid OOM (#14419)
0d22f2b3f35 is described below

commit 0d22f2b3f3581d419a7c1180e70dab2a3934cb69
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Dec 16 12:28:47 2024 +0800

    Load: flush metadata of generated files to avoid OOM (#14419)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  8 +++
 .../db/storageengine/load/LoadTsFileManager.java   | 61 ++++++++++++++++++----
 3 files changed, 69 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 47707e53170..c2c1fcce560 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1169,6 +1169,8 @@ public class IoTDBConfig {
 
   private int loadTsFileMaxDeviceCountToUseDeviceTimeIndex = 10000;
 
+  private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB
+
   private long loadMemoryAllocateRetryIntervalMs = 1000L;
   private int loadMemoryAllocateMaxRetries = 5;
 
@@ -4091,6 +4093,14 @@ public class IoTDBConfig {
         loadTsFileMaxDeviceCountToUseDeviceTimeIndex;
   }
 
+  public long getLoadChunkMetadataMemorySizeInBytes() {
+    return loadChunkMetadataMemorySizeInBytes;
+  }
+
+  public void setLoadChunkMetadataMemorySizeInBytes(long 
loadChunkMetadataMemorySizeInBytes) {
+    this.loadChunkMetadataMemorySizeInBytes = 
loadChunkMetadataMemorySizeInBytes;
+  }
+
   public long getLoadMemoryAllocateRetryIntervalMs() {
     return loadMemoryAllocateRetryIntervalMs;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ca0419a7b4c..87888d0da87 100755
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -3257,6 +3257,14 @@ public class IoTDBDescriptor {
                         
String.valueOf(conf.getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex())))
                 .map(String::trim)
                 
.orElse(String.valueOf(conf.getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex()))));
+    conf.setLoadChunkMetadataMemorySizeInBytes(
+        Long.parseLong(
+            Optional.ofNullable(
+                    properties.getProperty(
+                        "load_chunk_metadata_memory_size_in_bytes",
+                        
String.valueOf(conf.getLoadChunkMetadataMemorySizeInBytes())))
+                .map(String::trim)
+                
.orElse(String.valueOf(conf.getLoadChunkMetadataMemorySizeInBytes()))));
     conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
         Long.parseLong(
             Optional.ofNullable(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 9ab7e086264..5263e251370 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -43,7 +43,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent;
 import org.apache.iotdb.db.storageengine.load.splitter.ChunkData;
 import org.apache.iotdb.db.storageengine.load.splitter.DeletionData;
@@ -53,6 +53,8 @@ import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyT
 import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
@@ -366,6 +368,7 @@ public class LoadTsFileManager {
 
     private final File taskDir;
     private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
+    private Map<DataPartitionInfo, TsFileResource> dataPartition2Resource;
     private Map<DataPartitionInfo, IDeviceID> dataPartition2LastDevice;
     private Map<DataPartitionInfo, ModificationFile> 
dataPartition2ModificationFile;
     private boolean isClosed;
@@ -373,6 +376,7 @@ public class LoadTsFileManager {
     private TsFileWriterManager(File taskDir) {
       this.taskDir = taskDir;
       this.dataPartition2Writer = new HashMap<>();
+      this.dataPartition2Resource = new HashMap<>();
       this.dataPartition2LastDevice = new HashMap<>();
       this.dataPartition2ModificationFile = new HashMap<>();
       this.isClosed = false;
@@ -408,8 +412,33 @@ public class LoadTsFileManager {
           return;
         }
 
-        final TsFileIOWriter writer = new TsFileIOWriter(newTsFile);
+        final long chunkMetadataMaxSizeForEachWriter =
+            CONFIG.getLoadChunkMetadataMemorySizeInBytes() / 
(dataPartition2Writer.size() + 1);
+        final TsFileIOWriter writer =
+            new TsFileIOWriter(newTsFile, chunkMetadataMaxSizeForEachWriter);
+        final TsFileResource resource = new TsFileResource(writer.getFile());
+        writer.addFlushListener(
+            // Update time index by chunk groups going to be flushed to temp file
+            sortedChunkMetadataList ->
+                sortedChunkMetadataList.forEach(
+                    pair -> {
+                      final IDeviceID deviceId = pair.left.left;
+                      pair.getRight()
+                          .forEach(
+                              chunkMetadata -> {
+                                resource.updateStartTime(deviceId, 
chunkMetadata.getStartTime());
+                                resource.updateEndTime(deviceId, 
chunkMetadata.getEndTime());
+                              });
+                    }));
+
+        // When a new writer is added, we need to reduce the metadata size limit of all existing
+        // writers for memory control
+        for (final TsFileIOWriter existingWriter : 
dataPartition2Writer.values()) {
+          existingWriter.setMaxMetadataSize(chunkMetadataMaxSizeForEachWriter);
+          existingWriter.checkMetadataSizeAndMayFlush();
+        }
         dataPartition2Writer.put(partitionInfo, writer);
+        dataPartition2Resource.put(partitionInfo, resource);
       }
       TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo);
 
@@ -432,6 +461,7 @@ public class LoadTsFileManager {
       if (!Objects.equals(chunkData.getDevice(), 
dataPartition2LastDevice.get(partitionInfo))) {
         if (dataPartition2LastDevice.containsKey(partitionInfo)) {
           writer.endChunkGroup();
+          writer.checkMetadataSizeAndMayFlush();
         }
         writer.startChunkGroup(chunkData.getDevice());
         dataPartition2LastDevice.put(partitionInfo, chunkData.getDevice());
@@ -471,19 +501,22 @@ public class LoadTsFileManager {
       if (isClosed) {
         throw new 
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
       }
-      for (Map.Entry<DataPartitionInfo, ModificationFile> entry :
+      for (final Map.Entry<DataPartitionInfo, ModificationFile> entry :
           dataPartition2ModificationFile.entrySet()) {
         entry.getValue().close();
       }
-      for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : 
dataPartition2Writer.entrySet()) {
-        TsFileIOWriter writer = entry.getValue();
+      for (final Map.Entry<DataPartitionInfo, TsFileIOWriter> entry :
+          dataPartition2Writer.entrySet()) {
+        final TsFileIOWriter writer = entry.getValue();
         if (writer.isWritingChunkGroup()) {
           writer.endChunkGroup();
         }
         writer.endFile();
 
-        DataRegion dataRegion = entry.getKey().getDataRegion();
-        dataRegion.loadNewTsFile(generateResource(writer, progressIndex), 
true, isGeneratedByPipe);
+        final DataRegion dataRegion = entry.getKey().getDataRegion();
+        final TsFileResource tsFileResource = 
dataPartition2Resource.get(entry.getKey());
+        endTsFileResource(writer, tsFileResource, progressIndex);
+        dataRegion.loadNewTsFile(tsFileResource, true, isGeneratedByPipe);
 
         // Metrics
         dataRegion
@@ -495,12 +528,20 @@ public class LoadTsFileManager {
       }
     }
 
-    private TsFileResource generateResource(TsFileIOWriter writer, 
ProgressIndex progressIndex)
+    private void endTsFileResource(
+        TsFileIOWriter writer, TsFileResource tsFileResource, ProgressIndex 
progressIndex)
         throws IOException {
-      TsFileResource tsFileResource = 
TsFileResourceUtils.generateTsFileResource(writer);
+      // Update time index by chunk groups still in memory
+      for (final ChunkGroupMetadata chunkGroupMetadata : 
writer.getChunkGroupMetadataList()) {
+        final IDeviceID device = chunkGroupMetadata.getDevice();
+        for (final ChunkMetadata chunkMetadata : 
chunkGroupMetadata.getChunkMetadataList()) {
+          tsFileResource.updateStartTime(device, chunkMetadata.getStartTime());
+          tsFileResource.updateEndTime(device, chunkMetadata.getEndTime());
+        }
+      }
+      tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
       tsFileResource.setProgressIndex(progressIndex);
       tsFileResource.serialize();
-      return tsFileResource;
     }
 
     private long getTsFileWritePointCount(TsFileIOWriter writer) {

Reply via email to