This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch wal_compression in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 51bfb3d368ab50d4a27298cd47c2773f35db8c76 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Sat Jun 29 16:03:34 2024 +0800 use directbuffer to optimize performance Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../dataregion/wal/io/WALInputStream.java | 23 +++++++++++++++------- .../java/org/apache/iotdb/db/utils/MmapUtil.java | 8 ++++++++ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java index 90bfd52e371..abaf9dce36d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.utils.MmapUtil; import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.file.metadata.enums.CompressionType; @@ -164,6 +165,7 @@ public class WALInputStream extends InputStream implements AutoCloseable { @Override public void close() throws IOException { channel.close(); + MmapUtil.clean(dataBuffer); dataBuffer = null; } @@ -210,14 +212,17 @@ public class WALInputStream extends InputStream implements AutoCloseable { if (Objects.isNull(dataBuffer) || dataBuffer.capacity() < segmentInfo.uncompressedSize || dataBuffer.capacity() > segmentInfo.uncompressedSize * 2) { - dataBuffer = ByteBuffer.allocate(segmentInfo.uncompressedSize); + if (!Objects.isNull(dataBuffer)) { + MmapUtil.clean(dataBuffer); + } + dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize); } dataBuffer.clear(); if (Objects.isNull(compressedBuffer) || compressedBuffer.capacity() < segmentInfo.dataInDiskSize || compressedBuffer.capacity() > segmentInfo.dataInDiskSize * 2) { - compressedBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize); + compressedBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); } compressedBuffer.clear(); // limit the buffer to prevent it from reading too much byte than expected @@ -226,15 +231,18 @@ public class WALInputStream extends InputStream implements AutoCloseable { throw new IOException("Unexpected end of file"); } compressedBuffer.flip(); - IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType); unCompressor.uncompress(compressedBuffer, dataBuffer); + MmapUtil.clean(compressedBuffer); } else { // An uncompressed segment if (Objects.isNull(dataBuffer) || dataBuffer.capacity() < segmentInfo.dataInDiskSize || dataBuffer.capacity() > segmentInfo.dataInDiskSize * 2) { - dataBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize); + if (!Objects.isNull(dataBuffer)) { + MmapUtil.clean(dataBuffer); + } + dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); } dataBuffer.clear(); // limit the buffer to prevent it from reading too much byte than expected @@ -286,14 +294,15 @@ public class WALInputStream extends InputStream implements AutoCloseable { } while (posRemain >= 0); if (segmentInfo.compressionType != CompressionType.UNCOMPRESSED) { - compressedBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize); + compressedBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); channel.read(compressedBuffer); compressedBuffer.flip(); IUnCompressor unCompressor = IUnCompressor.getUnCompressor(segmentInfo.compressionType); - dataBuffer = ByteBuffer.allocate(segmentInfo.uncompressedSize); + dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize); unCompressor.uncompress(compressedBuffer, dataBuffer); + MmapUtil.clean(compressedBuffer); } else { - dataBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize); + dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); channel.read(dataBuffer); dataBuffer.flip(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java index 10b7bd6eb73..326c6c0583d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MmapUtil.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.utils; import io.netty.util.internal.PlatformDependent; +import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; public class MmapUtil { @@ -32,4 +33,11 @@ public class MmapUtil { } PlatformDependent.freeDirectBuffer(mappedByteBuffer); } + + /** we do not need to clean heapByteBuffer manually, so we just leave it alone. */ + public static void clean(ByteBuffer byteBuffer) { + if (byteBuffer != null & byteBuffer instanceof MappedByteBuffer) { + clean((MappedByteBuffer) byteBuffer); + } + } }
