This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch compressed-wal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 335833d8a5848401e47d6e86e36b0f8377768834
Author: Liu Xuxin <[email protected]>
AuthorDate: Fri Jan 26 13:52:57 2024 +0800

    Avoid allocating direct memory too frequently by reusing buffers in LogWriter and WALInputStream
---
 .../db/storageengine/dataregion/wal/io/LogWriter.java  | 18 +++++++++++++-----
 .../dataregion/wal/io/WALInputStream.java              | 10 ++++++++--
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index ec0b3c74b09..8553e8f563c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -48,11 +48,21 @@ public abstract class LogWriter implements ILogWriter {
   protected final FileChannel logChannel;
   protected long size;
   private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 
2 + 1);
+  private final ICompressor compressor = 
ICompressor.getCompressor(CompressionType.LZ4);
+  private final ByteBuffer compressedByteBuffer;
 
   protected LogWriter(File logFile) throws FileNotFoundException {
     this.logFile = logFile;
     this.logStream = new FileOutputStream(logFile, true);
     this.logChannel = this.logStream.getChannel();
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()) {
+      compressedByteBuffer =
+          ByteBuffer.allocate(
+              compressor.getMaxBytesForCompression(
+                  
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
+    } else {
+      compressedByteBuffer = null;
+    }
   }
 
   @Override
@@ -63,11 +73,9 @@ public abstract class LogWriter implements ILogWriter {
     int uncompressedSize = bufferSize;
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()
         && bufferSize > 1024 * 512 /* Do not compress buffer that is less than 
512KB */) {
-      ICompressor compressor = ICompressor.getCompressor(CompressionType.LZ4);
-      ByteBuffer compressedBuffer =
-          
ByteBuffer.allocateDirect(compressor.getMaxBytesForCompression(buffer.limit()));
-      compressor.compress(buffer, compressedBuffer);
-      buffer = compressedBuffer;
+      compressedByteBuffer.clear();
+      compressor.compress(buffer, compressedByteBuffer);
+      buffer = compressedByteBuffer;
       bufferSize = buffer.position();
       buffer.flip();
       compressed = true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index e6a235c9729..c622e218680 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 
@@ -37,7 +38,8 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
   private final FileChannel channel;
   private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES + 
1);
   private final ByteBuffer compressedHeader = 
ByteBuffer.allocate(Integer.BYTES);
-  private ByteBuffer dataBuffer = null;
+  private ByteBuffer dataBuffer =
+      
ByteBuffer.allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize());
 
   public WALInputStream(File logFile) throws IOException {
     channel = FileChannel.open(logFile.toPath());
@@ -72,13 +74,17 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
       }
       compressedHeader.flip();
       int uncompressedSize = compressedHeader.getInt();
-      dataBuffer = ByteBuffer.allocateDirect(uncompressedSize);
+      if (uncompressedSize > dataBuffer.capacity()) {
+        // enlarge buffer
+        dataBuffer = ByteBuffer.allocateDirect(uncompressedSize);
+      }
       ByteBuffer compressedData = ByteBuffer.allocateDirect(dataSize);
       if (channel.read(compressedData) != dataSize) {
         throw new IOException("Unexpected end of file");
       }
       compressedData.flip();
       IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(CompressionType.LZ4);
+      dataBuffer.clear();
       unCompressor.uncompress(compressedData, dataBuffer);
     } else {
       dataBuffer = ByteBuffer.allocateDirect(dataSize);

Reply via email to