This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0d22f2b3f35 Load: flush metadata of generated files to avoid OOM (#14419)
0d22f2b3f35 is described below
commit 0d22f2b3f3581d419a7c1180e70dab2a3934cb69
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Dec 16 12:28:47 2024 +0800
Load: flush metadata of generated files to avoid OOM (#14419)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 +++
.../db/storageengine/load/LoadTsFileManager.java | 61 ++++++++++++++++++----
3 files changed, 69 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 47707e53170..c2c1fcce560 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1169,6 +1169,8 @@ public class IoTDBConfig {
private int loadTsFileMaxDeviceCountToUseDeviceTimeIndex = 10000;
+ private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB
+
private long loadMemoryAllocateRetryIntervalMs = 1000L;
private int loadMemoryAllocateMaxRetries = 5;
@@ -4091,6 +4093,14 @@ public class IoTDBConfig {
loadTsFileMaxDeviceCountToUseDeviceTimeIndex;
}
+ /**
+ * Returns the total memory budget, in bytes, for chunk metadata that load's {@code
+ * TsFileIOWriter}s may keep in memory (default 33554432, i.e. 32MB; configurable through the
+ * {@code load_chunk_metadata_memory_size_in_bytes} property).
+ *
+ * <p>{@code LoadTsFileManager} divides this budget by the number of open writers (counting a
+ * newly created one) and passes the quotient to each writer as its maximum metadata size, so
+ * that chunk metadata beyond that share is flushed out of memory instead of accumulating.
+ *
+ * @return the shared chunk-metadata memory budget in bytes
+ */
+ public long getLoadChunkMetadataMemorySizeInBytes() {
+ return loadChunkMetadataMemorySizeInBytes;
+ }
+
+ /**
+ * Sets the total chunk-metadata memory budget, in bytes, shared by load's {@code
+ * TsFileIOWriter}s.
+ *
+ * @param loadChunkMetadataMemorySizeInBytes the new budget. NOTE(review): the value is not
+ *     validated here; a non-positive budget presumably makes every size check flush — confirm
+ *     whether it should be rejected.
+ */
+ public void setLoadChunkMetadataMemorySizeInBytes(long
loadChunkMetadataMemorySizeInBytes) {
+ this.loadChunkMetadataMemorySizeInBytes =
loadChunkMetadataMemorySizeInBytes;
+ }
+
public long getLoadMemoryAllocateRetryIntervalMs() {
return loadMemoryAllocateRetryIntervalMs;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ca0419a7b4c..87888d0da87 100755
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -3257,6 +3257,14 @@ public class IoTDBDescriptor {
String.valueOf(conf.getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex())))
.map(String::trim)
.orElse(String.valueOf(conf.getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex()))));
+ conf.setLoadChunkMetadataMemorySizeInBytes(
+ Long.parseLong(
+ Optional.ofNullable(
+ properties.getProperty(
+ "load_chunk_metadata_memory_size_in_bytes",
+
String.valueOf(conf.getLoadChunkMetadataMemorySizeInBytes())))
+ .map(String::trim)
+
.orElse(String.valueOf(conf.getLoadChunkMetadataMemorySizeInBytes()))));
conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
Long.parseLong(
Optional.ofNullable(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 9ab7e086264..5263e251370 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -43,7 +43,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent;
import org.apache.iotdb.db.storageengine.load.splitter.ChunkData;
import org.apache.iotdb.db.storageengine.load.splitter.DeletionData;
@@ -53,6 +53,8 @@ import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyT
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
@@ -366,6 +368,7 @@ public class LoadTsFileManager {
private final File taskDir;
private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
+ private Map<DataPartitionInfo, TsFileResource> dataPartition2Resource;
private Map<DataPartitionInfo, IDeviceID> dataPartition2LastDevice;
private Map<DataPartitionInfo, ModificationFile>
dataPartition2ModificationFile;
private boolean isClosed;
@@ -373,6 +376,7 @@ public class LoadTsFileManager {
private TsFileWriterManager(File taskDir) {
this.taskDir = taskDir;
this.dataPartition2Writer = new HashMap<>();
+ this.dataPartition2Resource = new HashMap<>();
this.dataPartition2LastDevice = new HashMap<>();
this.dataPartition2ModificationFile = new HashMap<>();
this.isClosed = false;
@@ -408,8 +412,33 @@ public class LoadTsFileManager {
return;
}
- final TsFileIOWriter writer = new TsFileIOWriter(newTsFile);
+ final long chunkMetadataMaxSizeForEachWriter =
+ CONFIG.getLoadChunkMetadataMemorySizeInBytes() /
(dataPartition2Writer.size() + 1);
+ final TsFileIOWriter writer =
+ new TsFileIOWriter(newTsFile, chunkMetadataMaxSizeForEachWriter);
+ final TsFileResource resource = new TsFileResource(writer.getFile());
+ writer.addFlushListener(
+ // Update time index by chunk groups going to be flushed to temp
file
+ sortedChunkMetadataList ->
+ sortedChunkMetadataList.forEach(
+ pair -> {
+ final IDeviceID deviceId = pair.left.left;
+ pair.getRight()
+ .forEach(
+ chunkMetadata -> {
+ resource.updateStartTime(deviceId,
chunkMetadata.getStartTime());
+ resource.updateEndTime(deviceId,
chunkMetadata.getEndTime());
+ });
+ }));
+
+ // When a new writer is added, we need to reduce the metadata size
limit of all existing
+ // writers for memory control
+ for (final TsFileIOWriter existingWriter :
dataPartition2Writer.values()) {
+ existingWriter.setMaxMetadataSize(chunkMetadataMaxSizeForEachWriter);
+ existingWriter.checkMetadataSizeAndMayFlush();
+ }
dataPartition2Writer.put(partitionInfo, writer);
+ dataPartition2Resource.put(partitionInfo, resource);
}
TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo);
@@ -432,6 +461,7 @@ public class LoadTsFileManager {
if (!Objects.equals(chunkData.getDevice(),
dataPartition2LastDevice.get(partitionInfo))) {
if (dataPartition2LastDevice.containsKey(partitionInfo)) {
writer.endChunkGroup();
+ writer.checkMetadataSizeAndMayFlush();
}
writer.startChunkGroup(chunkData.getDevice());
dataPartition2LastDevice.put(partitionInfo, chunkData.getDevice());
@@ -471,19 +501,22 @@ public class LoadTsFileManager {
if (isClosed) {
throw new
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}
- for (Map.Entry<DataPartitionInfo, ModificationFile> entry :
+ for (final Map.Entry<DataPartitionInfo, ModificationFile> entry :
dataPartition2ModificationFile.entrySet()) {
entry.getValue().close();
}
- for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry :
dataPartition2Writer.entrySet()) {
- TsFileIOWriter writer = entry.getValue();
+ for (final Map.Entry<DataPartitionInfo, TsFileIOWriter> entry :
+ dataPartition2Writer.entrySet()) {
+ final TsFileIOWriter writer = entry.getValue();
if (writer.isWritingChunkGroup()) {
writer.endChunkGroup();
}
writer.endFile();
- DataRegion dataRegion = entry.getKey().getDataRegion();
- dataRegion.loadNewTsFile(generateResource(writer, progressIndex),
true, isGeneratedByPipe);
+ final DataRegion dataRegion = entry.getKey().getDataRegion();
+ final TsFileResource tsFileResource =
dataPartition2Resource.get(entry.getKey());
+ endTsFileResource(writer, tsFileResource, progressIndex);
+ dataRegion.loadNewTsFile(tsFileResource, true, isGeneratedByPipe);
// Metrics
dataRegion
@@ -495,12 +528,20 @@ public class LoadTsFileManager {
}
}
- private TsFileResource generateResource(TsFileIOWriter writer,
ProgressIndex progressIndex)
+ /**
+ * Completes the {@link TsFileResource} that was created alongside {@code writer}, after the
+ * writer's file has been ended (the close path calls {@code writer.endFile()} first).
+ *
+ * <p>Start/end times of chunk metadata that was flushed out of memory were already recorded by
+ * the flush listener registered when the writer was created. This method folds in the times of
+ * the chunk groups whose metadata is still held by the writer. It then marks the resource
+ * {@code NORMAL}, attaches {@code progressIndex}, and serializes the resource.
+ *
+ * @param writer the ended writer whose remaining in-memory chunk metadata is read
+ * @param tsFileResource the resource paired with {@code writer} when the writer was created
+ * @param progressIndex the progress index to record on the resource
+ * @throws IOException propagated from the resource calls below (presumably {@code
+ *     serialize()}) — confirm
+ */
+ private void endTsFileResource(
+ TsFileIOWriter writer, TsFileResource tsFileResource, ProgressIndex
progressIndex)
throws IOException {
- TsFileResource tsFileResource =
TsFileResourceUtils.generateTsFileResource(writer);
+ // Fold in time ranges of chunk groups whose metadata is still in memory; flushed ones were
+ // already applied by the writer's flush listener
+ for (final ChunkGroupMetadata chunkGroupMetadata :
writer.getChunkGroupMetadataList()) {
+ final IDeviceID device = chunkGroupMetadata.getDevice();
+ for (final ChunkMetadata chunkMetadata :
chunkGroupMetadata.getChunkMetadataList()) {
+ tsFileResource.updateStartTime(device, chunkMetadata.getStartTime());
+ tsFileResource.updateEndTime(device, chunkMetadata.getEndTime());
+ }
+ }
+ tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
tsFileResource.setProgressIndex(progressIndex);
tsFileResource.serialize();
- return tsFileResource;
}
private long getTsFileWritePointCount(TsFileIOWriter writer) {