This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch compressed-wal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit dc712a9b26311a51fea26aa66f01fa5fc4148d6c
Author: Liu Xuxin <[email protected]>
AuthorDate: Thu Jan 25 17:34:31 2024 +0800

    support wal read compress
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../dataregion/wal/io/CheckpointReader.java        |  5 +-
 .../storageengine/dataregion/wal/io/LogWriter.java |  6 +-
 .../dataregion/wal/io/WALInputStream.java          | 92 ++++++++++++++++++++++
 .../storageengine/dataregion/wal/io/WALReader.java |  6 +-
 5 files changed, 99 insertions(+), 12 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0f5aac66740..ae9942f1607 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1060,7 +1060,7 @@ public class IoTDBConfig {
   /** whether the local write api records audit logs * */
   private boolean enableAuditLogForNativeInsertApi = true;
 
-  private boolean enableWALCompression = false;
+  private boolean enableWALCompression = true;
 
   // customizedProperties, this should be empty by default.
   private Properties customizedProperties = new Properties();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
index 5d2bad0a874..578ab21ae8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
@@ -24,10 +24,8 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -47,8 +45,7 @@ public class CheckpointReader {
 
   private void init() {
     checkpoints = new ArrayList<>();
-    try (DataInputStream logStream =
-        new DataInputStream(new BufferedInputStream(new 
FileInputStream(logFile)))) {
+    try (DataInputStream logStream = new DataInputStream(new 
WALInputStream(logFile))) {
       maxMemTableId = logStream.readLong();
       while (logStream.available() > 0) {
         Checkpoint checkpoint = Checkpoint.deserialize(logStream);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index 7065bd816d6..991a44a3700 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -47,7 +47,7 @@ public abstract class LogWriter implements ILogWriter {
   protected final FileOutputStream logStream;
   protected final FileChannel logChannel;
   protected long size;
-  private final ByteBuffer headerBuffer = 
ByteBuffer.allocateDirect(Integer.BYTES * 2 + 1);
+  private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 
2 + 1);
 
   protected LogWriter(File logFile) throws FileNotFoundException {
     this.logFile = logFile;
@@ -62,7 +62,7 @@ public abstract class LogWriter implements ILogWriter {
     boolean compressed = false;
     int uncompressedSize = bufferSize;
     if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()
-        && bufferSize > 1024 * 1024) {
+        && bufferSize > 1024 * 512 /* Do not compress buffer that is less than 
512KB */) {
       ICompressor compressor = ICompressor.getCompressor(CompressionType.LZ4);
       ByteBuffer compressedBuffer =
           
ByteBuffer.allocateDirect(compressor.getMaxBytesForCompression(buffer.limit()));
@@ -82,9 +82,11 @@ public abstract class LogWriter implements ILogWriter {
     headerBuffer.putInt(bufferSize);
     headerBuffer.put((byte) (compressed ? 1 : 0));
     try {
+      logger.error("Channel's offset is {}", logChannel.position());
       if (compressed) {
         headerBuffer.putInt(uncompressedSize);
       }
+      headerBuffer.flip();
       logChannel.write(headerBuffer);
       logChannel.write(buffer);
     } catch (ClosedChannelException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
new file mode 100644
index 00000000000..34354beecb7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Objects;
+
+public class WALInputStream extends InputStream implements AutoCloseable {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(WALInputStream.class);
+  private final FileChannel channel;
+  private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES + 
1);
+  private final ByteBuffer compressedHeader = 
ByteBuffer.allocate(Integer.BYTES);
+  private ByteBuffer dataBuffer = null;
+
+  public WALInputStream(File logFile) throws IOException {
+    channel = FileChannel.open(logFile.toPath());
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (Objects.isNull(dataBuffer) || dataBuffer.position() == 
dataBuffer.limit()) {
+      loadNextSegment();
+    }
+    return dataBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public void close() throws IOException {
+    channel.close();
+    dataBuffer = null;
+  }
+
+  private void loadNextSegment() throws IOException {
+    headerBuffer.clear();
+    logger.error("channel's offset is {}", channel.position());
+    if (channel.read(headerBuffer) != Integer.BYTES + 1) {
+      throw new IOException("Unexpected end of file");
+    }
+    headerBuffer.flip();
+    int dataSize = headerBuffer.getInt();
+    boolean isCompressed = headerBuffer.get() == 1;
+    if (isCompressed) {
+      compressedHeader.clear();
+      if (channel.read(compressedHeader) != Integer.BYTES) {
+        throw new IOException("Unexpected end of file");
+      }
+      compressedHeader.flip();
+      int uncompressedSize = compressedHeader.getInt();
+      dataBuffer = ByteBuffer.allocateDirect(uncompressedSize);
+      ByteBuffer compressedData = ByteBuffer.allocateDirect(dataSize);
+      if (channel.read(compressedData) != dataSize) {
+        throw new IOException("Unexpected end of file");
+      }
+      compressedData.flip();
+      IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(CompressionType.LZ4);
+      unCompressor.uncompress(compressedData, dataBuffer);
+    } else {
+      dataBuffer = ByteBuffer.allocateDirect(dataSize);
+      if (channel.read(dataBuffer) != dataSize) {
+        throw new IOException("Unexpected end of file");
+      }
+    }
+    dataBuffer.flip();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
index ee50c73df97..475ea2b0b2d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
@@ -26,12 +26,10 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
@@ -57,9 +55,7 @@ public class WALReader implements Closeable {
   public WALReader(File logFile, boolean fileMayCorrupt) throws IOException {
     this.logFile = logFile;
     this.fileMayCorrupt = fileMayCorrupt;
-    this.logStream =
-        new DataInputStream(
-            new BufferedInputStream(Files.newInputStream(logFile.toPath()), 
STREAM_BUFFER_SIZE));
+    this.logStream = new DataInputStream(new WALInputStream(logFile));
   }
 
   /** Like {@link Iterator#hasNext()}. */

Reply via email to