This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e27245e129e Fix memory leak in wal compressed buffer (#15418)
e27245e129e is described below
commit e27245e129ebf60d9628107854cac80ef9ca731f
Author: Jiang Tian <[email protected]>
AuthorDate: Mon Apr 28 09:50:34 2025 +0800
Fix memory leak in wal compressed buffer (#15418)
---
.../deletion/persist/PageCacheDeletionBuffer.java | 1 +
.../dataregion/wal/buffer/WALBuffer.java | 9 ++++-
.../dataregion/wal/io/WALInputStream.java | 2 +
.../wal/compression/WALCompressionTest.java | 45 ++++++++++++----------
4 files changed, 35 insertions(+), 22 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
index 7cb1600c6a5..bf334b0ed8f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java
@@ -273,6 +273,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
}
// clean buffer
MmapUtil.clean(serializeBuffer);
+ serializeBuffer = null;
}
private void waitUntilFlushAllDeletionsOrTimeOut() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index 3a53dbf18ae..16be3f8ad08 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -178,11 +178,12 @@ public class WALBuffer extends AbstractWALBuffer {
buffersLock.lock();
try {
MmapUtil.clean(workingBuffer);
- MmapUtil.clean(workingBuffer);
+ MmapUtil.clean(idleBuffer);
MmapUtil.clean(syncingBuffer);
MmapUtil.clean(compressedByteBuffer);
workingBuffer = ByteBuffer.allocateDirect(capacity);
idleBuffer = ByteBuffer.allocateDirect(capacity);
+ syncingBuffer = null;
compressedByteBuffer =
ByteBuffer.allocateDirect(getCompressedByteBufferSize(capacity));
currentWALFileWriter.setCompressedByteBuffer(compressedByteBuffer);
} catch (OutOfMemoryError e) {
@@ -719,9 +720,13 @@ public class WALBuffer extends AbstractWALBuffer {
checkpointManager.close();
MmapUtil.clean(workingBuffer);
- MmapUtil.clean(workingBuffer);
+ MmapUtil.clean(idleBuffer);
MmapUtil.clean(syncingBuffer);
MmapUtil.clean(compressedByteBuffer);
+ workingBuffer = null;
+ idleBuffer = null;
+ syncingBuffer = null;
+ compressedByteBuffer = null;
}
private void shutdownThread(ExecutorService thread, ThreadName threadName) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index e3f544a8894..1827bfc9365 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -173,6 +173,7 @@ public class WALInputStream extends InputStream implements AutoCloseable {
MmapUtil.clean(dataBuffer);
MmapUtil.clean(compressedBuffer);
dataBuffer = null;
+ compressedBuffer = null;
}
@Override
@@ -306,6 +307,7 @@ public class WALInputStream extends InputStream implements AutoCloseable {
dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
uncompressWALBuffer(compressedBuffer, dataBuffer, unCompressor);
MmapUtil.clean(compressedBuffer);
+ compressedBuffer = null;
} else {
dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
readWALBufferFromChannel(dataBuffer);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
index 28b95625594..b0a7abd52a3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
@@ -80,6 +80,7 @@ public class WALCompressionTest {
FileUtils.delete(walFile);
}
originalMinCompressionSize = WALTestUtils.getMinCompressionSize();
+ WALTestUtils.setMinCompressionSize(0);
if (new File(compressionDir).exists()) {
FileUtils.forceDelete(new File(compressionDir));
}
@@ -125,29 +126,33 @@ public class WALCompressionTest {
public void testSkipToGivenPosition()
throws QueryProcessException, IllegalPathException, IOException {
- LogWriter writer = new WALWriter(walFile);
- ByteBuffer buffer = ByteBuffer.allocate(1024 * 4);
- List<Pair<Long, Integer>> positionAndEntryPairList = new ArrayList<>();
- int memTableId = 0;
- long fileOffset = 0;
- for (int i = 0; i < 100; ) {
- InsertRowNode insertRowNode = WALTestUtils.getInsertRowNode(devicePath + memTableId, i);
- if (buffer.remaining() >= buffer.capacity() / 4) {
- int pos = buffer.position();
- insertRowNode.serialize(buffer);
- int size = buffer.position() - pos;
- positionAndEntryPairList.add(new Pair<>(fileOffset, size));
- fileOffset += size;
- i++;
- } else {
+ List<Pair<Long, Integer>> positionAndEntryPairList;
+ int memTableId;
+ try (LogWriter writer = new WALWriter(walFile)) {
+ writer.setCompressedByteBuffer(
+ ByteBuffer.allocateDirect(WALBuffer.ONE_THIRD_WAL_BUFFER_SIZE));
+ ByteBuffer buffer = ByteBuffer.allocate(1024 * 4);
+ positionAndEntryPairList = new ArrayList<>();
+ memTableId = 0;
+ long fileOffset = 0;
+ for (int i = 0; i < 100; ) {
+ InsertRowNode insertRowNode = WALTestUtils.getInsertRowNode(devicePath + memTableId, i);
+ if (buffer.remaining() >= buffer.capacity() / 4) {
+ int pos = buffer.position();
+ insertRowNode.serialize(buffer);
+ int size = buffer.position() - pos;
+ positionAndEntryPairList.add(new Pair<>(fileOffset, size));
+ fileOffset += size;
+ i++;
+ } else {
+ writer.write(buffer);
+ buffer.clear();
+ }
+ }
+ if (buffer.position() != 0) {
writer.write(buffer);
- buffer.clear();
}
}
- if (buffer.position() != 0) {
- writer.write(buffer);
- }
- writer.close();
try (WALInputStream stream = new WALInputStream(walFile)) {
for (int i = 0; i < 100; ++i) {
Pair<Long, Integer> positionAndNodePair =
positionAndEntryPairList.get(i);