This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch compressed-wal in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 335833d8a5848401e47d6e86e36b0f8377768834
Author: Liu Xuxin <[email protected]>
AuthorDate: Fri Jan 26 13:52:57 2024 +0800

    avoid allocating direct memory too frequently
---
 .../db/storageengine/dataregion/wal/io/LogWriter.java  | 18 +++++++++++++-----
 .../dataregion/wal/io/WALInputStream.java              | 10 ++++++++--
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index ec0b3c74b09..8553e8f563c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -48,11 +48,21 @@ public abstract class LogWriter implements ILogWriter {
   protected final FileChannel logChannel;
   protected long size;
   private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 2 + 1);
+  private final ICompressor compressor = ICompressor.getCompressor(CompressionType.LZ4);
+  private final ByteBuffer compressedByteBuffer;

   protected LogWriter(File logFile) throws FileNotFoundException {
     this.logFile = logFile;
     this.logStream = new FileOutputStream(logFile, true);
     this.logChannel = this.logStream.getChannel();
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()) {
+      compressedByteBuffer =
+          ByteBuffer.allocate(
+              compressor.getMaxBytesForCompression(
+                  IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
+    } else {
+      compressedByteBuffer = null;
+    }
   }

   @Override
@@ -63,11 +73,9 @@ public abstract class LogWriter implements ILogWriter {
     int uncompressedSize = bufferSize;
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()
         && bufferSize > 1024 * 512 /* Do not compress buffer that is less than 512KB */) {
-      ICompressor compressor = ICompressor.getCompressor(CompressionType.LZ4);
-      ByteBuffer compressedBuffer =
-          ByteBuffer.allocateDirect(compressor.getMaxBytesForCompression(buffer.limit()));
-      compressor.compress(buffer, compressedBuffer);
-      buffer = compressedBuffer;
+      compressedByteBuffer.clear();
+      compressor.compress(buffer, compressedByteBuffer);
+      buffer = compressedByteBuffer;
       bufferSize = buffer.position();
       buffer.flip();
       compressed = true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index e6a235c9729..c622e218680 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;

+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;

@@ -37,7 +38,8 @@ public class WALInputStream extends InputStream implements AutoCloseable {
   private final FileChannel channel;
   private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES + 1);
   private final ByteBuffer compressedHeader = ByteBuffer.allocate(Integer.BYTES);
-  private ByteBuffer dataBuffer = null;
+  private ByteBuffer dataBuffer =
+      ByteBuffer.allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize());

   public WALInputStream(File logFile) throws IOException {
     channel = FileChannel.open(logFile.toPath());
@@ -72,13 +74,17 @@ public class WALInputStream extends InputStream implements AutoCloseable {
       }
       compressedHeader.flip();
       int uncompressedSize = compressedHeader.getInt();
-      dataBuffer = ByteBuffer.allocateDirect(uncompressedSize);
+      if (uncompressedSize > dataBuffer.capacity()) {
+        // enlarge buffer
+        dataBuffer = ByteBuffer.allocateDirect(uncompressedSize);
+      }
       ByteBuffer compressedData = ByteBuffer.allocateDirect(dataSize);
       if (channel.read(compressedData) != dataSize) {
         throw new IOException("Unexpected end of file");
       }
       compressedData.flip();
       IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.LZ4);
+      dataBuffer.clear();
       unCompressor.uncompress(compressedData, dataBuffer);
     } else {
       dataBuffer = ByteBuffer.allocateDirect(dataSize);
