This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 45a382039e4 Load: flush metadata of generated files to avoid OOM (#14419) (#14445)
45a382039e4 is described below
commit 45a382039e49b34a49d8801c869d74755ae6cf7e
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Dec 16 23:01:42 2024 +0800
Load: flush metadata of generated files to avoid OOM (#14419) (#14445)
* Load: flush metadata of generated files to avoid OOM (#14419)
(cherry picked from commit 0d22f2b3f3581d419a7c1180e70dab2a3934cb69)
* resolve
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 17 ++++--
.../db/storageengine/load/LoadTsFileManager.java | 63 ++++++++++++++++++----
pom.xml | 2 +-
4 files changed, 78 insertions(+), 14 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7a510a7312c..577bfa25e12 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1140,6 +1140,8 @@ public class IoTDBConfig {
private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
0L; // 0 means that the decision will be adaptive based on the number of
sequences
+ private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB
+
private long loadMemoryAllocateRetryIntervalMs = 1000L;
private int loadMemoryAllocateMaxRetries = 5;
@@ -3962,6 +3964,14 @@ public class IoTDBConfig {
this.loadTsFileAnalyzeSchemaMemorySizeInBytes =
loadTsFileAnalyzeSchemaMemorySizeInBytes;
}
+ public long getLoadChunkMetadataMemorySizeInBytes() {
+ return loadChunkMetadataMemorySizeInBytes;
+ }
+
+ public void setLoadChunkMetadataMemorySizeInBytes(long
loadChunkMetadataMemorySizeInBytes) {
+ this.loadChunkMetadataMemorySizeInBytes =
loadChunkMetadataMemorySizeInBytes;
+ }
+
public long getLoadMemoryAllocateRetryIntervalMs() {
return loadMemoryAllocateRetryIntervalMs;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 930c005fc82..fd2d2e4ac53 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2334,9 +2334,20 @@ public class IoTDBDescriptor {
String.valueOf(conf.getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber()))));
conf.setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
Long.parseLong(
- properties.getProperty(
- "load_tsfile_analyze_schema_memory_size_in_bytes",
-
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
+ Optional.ofNullable(
+ properties.getProperty(
+ "load_tsfile_analyze_schema_memory_size_in_bytes",
+
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes())))
+ .map(String::trim)
+
.orElse(String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
+ conf.setLoadChunkMetadataMemorySizeInBytes(
+ Long.parseLong(
+ Optional.ofNullable(
+ properties.getProperty(
+ "load_chunk_metadata_memory_size_in_bytes",
+
String.valueOf(conf.getLoadChunkMetadataMemorySizeInBytes())))
+ .map(String::trim)
+
.orElse(String.valueOf(conf.getLoadChunkMetadataMemorySizeInBytes()))));
conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
Long.parseLong(
properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index fd21140453c..cba11624cb8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -41,7 +41,7 @@ import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent;
import org.apache.iotdb.db.storageengine.load.splitter.ChunkData;
import org.apache.iotdb.db.storageengine.load.splitter.DeletionData;
@@ -51,6 +51,9 @@ import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyT
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
@@ -356,6 +359,7 @@ public class LoadTsFileManager {
private final File taskDir;
private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
+ private Map<DataPartitionInfo, TsFileResource> dataPartition2Resource;
private Map<DataPartitionInfo, String> dataPartition2LastDevice;
private Map<DataPartitionInfo, ModificationFile>
dataPartition2ModificationFile;
private boolean isClosed;
@@ -363,6 +367,7 @@ public class LoadTsFileManager {
private TsFileWriterManager(File taskDir) {
this.taskDir = taskDir;
this.dataPartition2Writer = new HashMap<>();
+ this.dataPartition2Resource = new HashMap<>();
this.dataPartition2LastDevice = new HashMap<>();
this.dataPartition2ModificationFile = new HashMap<>();
this.isClosed = false;
@@ -398,12 +403,39 @@ public class LoadTsFileManager {
return;
}
- dataPartition2Writer.put(partitionInfo, new TsFileIOWriter(newTsFile));
+ final long chunkMetadataMaxSizeForEachWriter =
+ CONFIG.getLoadChunkMetadataMemorySizeInBytes() /
(dataPartition2Writer.size() + 1);
+ final TsFileIOWriter writer =
+ new TsFileIOWriter(newTsFile, chunkMetadataMaxSizeForEachWriter);
+ final TsFileResource resource = new TsFileResource(writer.getFile());
+ writer.addFlushListener(
+ // Update time index by chunk groups going to be flushed to temp
file
+ sortedChunkMetadataList ->
+ sortedChunkMetadataList.forEach(
+ pair -> {
+ final IDeviceID deviceId = pair.left.left;
+ pair.getRight()
+ .forEach(
+ chunkMetadata -> {
+ resource.updateStartTime(deviceId,
chunkMetadata.getStartTime());
+ resource.updateEndTime(deviceId,
chunkMetadata.getEndTime());
+ });
+ }));
+
+ // When a new writer is added, we need to reduce the metadata size
limit of all existing
+ // writers for memory control
+ for (final TsFileIOWriter existingWriter :
dataPartition2Writer.values()) {
+ existingWriter.setMaxMetadataSize(chunkMetadataMaxSizeForEachWriter);
+ existingWriter.checkMetadataSizeAndMayFlush();
+ }
+ dataPartition2Writer.put(partitionInfo, writer);
+ dataPartition2Resource.put(partitionInfo, resource);
}
TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo);
if
(!chunkData.getDevice().equals(dataPartition2LastDevice.getOrDefault(partitionInfo,
""))) {
if (dataPartition2LastDevice.containsKey(partitionInfo)) {
writer.endChunkGroup();
+ writer.checkMetadataSizeAndMayFlush();
}
writer.startChunkGroup(new PlainDeviceID(chunkData.getDevice()));
dataPartition2LastDevice.put(partitionInfo, chunkData.getDevice());
@@ -445,19 +477,22 @@ public class LoadTsFileManager {
if (isClosed) {
throw new
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}
- for (Map.Entry<DataPartitionInfo, ModificationFile> entry :
+ for (final Map.Entry<DataPartitionInfo, ModificationFile> entry :
dataPartition2ModificationFile.entrySet()) {
entry.getValue().close();
}
- for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry :
dataPartition2Writer.entrySet()) {
- TsFileIOWriter writer = entry.getValue();
+ for (final Map.Entry<DataPartitionInfo, TsFileIOWriter> entry :
+ dataPartition2Writer.entrySet()) {
+ final TsFileIOWriter writer = entry.getValue();
if (writer.isWritingChunkGroup()) {
writer.endChunkGroup();
}
writer.endFile();
- DataRegion dataRegion = entry.getKey().getDataRegion();
- dataRegion.loadNewTsFile(generateResource(writer, progressIndex),
true, isGeneratedByPipe);
+ final DataRegion dataRegion = entry.getKey().getDataRegion();
+ final TsFileResource tsFileResource =
dataPartition2Resource.get(entry.getKey());
+ endTsFileResource(writer, tsFileResource, progressIndex);
+ dataRegion.loadNewTsFile(tsFileResource, true, isGeneratedByPipe);
// Metrics
dataRegion
@@ -469,12 +504,20 @@ public class LoadTsFileManager {
}
}
- private TsFileResource generateResource(TsFileIOWriter writer,
ProgressIndex progressIndex)
+ private void endTsFileResource(
+ TsFileIOWriter writer, TsFileResource tsFileResource, ProgressIndex
progressIndex)
throws IOException {
- TsFileResource tsFileResource =
TsFileResourceUtils.generateTsFileResource(writer);
+ // Update time index by chunk groups still in memory
+ for (final ChunkGroupMetadata chunkGroupMetadata :
writer.getChunkGroupMetadataList()) {
+ final IDeviceID device = chunkGroupMetadata.getDevice();
+ for (final ChunkMetadata chunkMetadata :
chunkGroupMetadata.getChunkMetadataList()) {
+ tsFileResource.updateStartTime(device, chunkMetadata.getStartTime());
+ tsFileResource.updateEndTime(device, chunkMetadata.getEndTime());
+ }
+ }
+ tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
tsFileResource.setProgressIndex(progressIndex);
tsFileResource.serialize();
- return tsFileResource;
}
private long getTsFileWritePointCount(TsFileIOWriter writer) {
diff --git a/pom.xml b/pom.xml
index 4a07cef0af4..d3af4ab2238 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,7 +166,7 @@
<thrift.version>0.14.1</thrift.version>
<xz.version>1.9</xz.version>
<zstd-jni.version>1.5.6-3</zstd-jni.version>
- <tsfile.version>1.1.0-241205-SNAPSHOT</tsfile.version>
+ <tsfile.version>1.1.0-241216-SNAPSHOT</tsfile.version>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim