This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch compressed-wal
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4c1ba9d067a291a8da06bf11954f567abdc01dc6
Author: Liu Xuxin <[email protected]>
AuthorDate: Thu Jan 25 16:34:55 2024 +0800

    support wal write compress
---
 .../main/java/org/apache/iotdb/SessionExample.java | 18 ++++++------
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  5 ++++
 .../storageengine/dataregion/wal/io/LogWriter.java | 32 +++++++++++++++++++++-
 4 files changed, 55 insertions(+), 10 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 768f276dbad..0b0c2ff8caf 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -74,7 +74,7 @@ public class SessionExample {
             .password("root")
             .version(Version.V_1_0)
             .build();
-    session.open(true);
+    session.open(false);
 
     // set session fetchSize
     session.setFetchSize(10000);
@@ -90,21 +90,21 @@ public class SessionExample {
     //     createTemplate();
     createTimeseries();
     createMultiTimeseries();
-    insertRecord();
-    insertTablet();
+    //    insertRecord();
+    //    insertTablet();
     //    insertTabletWithNullValues();
     //    insertTablets();
-    //    insertRecords();
+    insertRecords();
     //    insertText();
     //    selectInto();
     //    createAndDropContinuousQueries();
     //    nonQuery();
-    query();
+    //    query();
     //    queryWithTimeout();
-    rawDataQuery();
-    lastDataQuery();
-    aggregationQuery();
-    groupByQuery();
+    //    rawDataQuery();
+    //    lastDataQuery();
+    //    aggregationQuery();
+    //    groupByQuery();
     //    queryByIterator();
     //    deleteData();
     //    deleteTimeseries();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index eb1c229a7b4..0f5aac66740 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1060,6 +1060,8 @@ public class IoTDBConfig {
   /** whether the local write api records audit logs * */
   private boolean enableAuditLogForNativeInsertApi = true;
 
+  private boolean enableWALCompression = false;
+
   // customizedProperties, this should be empty by default.
   private Properties customizedProperties = new Properties();
 
@@ -3811,4 +3813,12 @@ public class IoTDBConfig {
       double innerCompactionTaskSelectionDiskRedundancy) {
     this.innerCompactionTaskSelectionDiskRedundancy = innerCompactionTaskSelectionDiskRedundancy;
   }
+
+  public boolean isEnableWALCompression() {
+    return enableWALCompression;
+  }
+
+  public void setEnableWALCompression(boolean enableWALCompression) {
+    this.enableWALCompression = enableWALCompression;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 76fac28fe61..b6880715dfe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -442,6 +442,11 @@ public class IoTDBDescriptor {
                 "enable_seq_space_compaction",
                 Boolean.toString(conf.isEnableSeqSpaceCompaction()))));
 
+    conf.setEnableWALCompression(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_wal_compression", Boolean.toString(conf.isEnableWALCompression()))));
+
     conf.setEnableUnseqSpaceCompaction(
         Boolean.parseBoolean(
             properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index 68f4deae318..7065bd816d6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -19,8 +19,11 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +47,7 @@ public abstract class LogWriter implements ILogWriter {
   protected final FileOutputStream logStream;
   protected final FileChannel logChannel;
   protected long size;
+  private final ByteBuffer headerBuffer = ByteBuffer.allocateDirect(Integer.BYTES * 2 + 1);
 
   protected LogWriter(File logFile) throws FileNotFoundException {
     this.logFile = logFile;
@@ -53,9 +57,35 @@ public abstract class LogWriter implements ILogWriter {
 
   @Override
   public void write(ByteBuffer buffer) throws IOException {
-    size += buffer.position();
+    int bufferSize = buffer.position();
     buffer.flip();
+    boolean compressed = false;
+    int uncompressedSize = bufferSize;
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()
+        && bufferSize > 1024 * 1024) {
+      ICompressor compressor = ICompressor.getCompressor(CompressionType.LZ4);
+      ByteBuffer compressedBuffer =
+          ByteBuffer.allocateDirect(compressor.getMaxBytesForCompression(buffer.limit()));
+      compressor.compress(buffer, compressedBuffer);
+      logger.error(
+          "compressed size: {}, original size: {}, compression ratio is {}",
+          compressedBuffer.position(),
+          bufferSize,
+          (double) bufferSize / compressedBuffer.position());
+      buffer = compressedBuffer;
+      bufferSize = buffer.position();
+      buffer.flip();
+      compressed = true;
+    }
+    size += bufferSize;
+    headerBuffer.clear();
+    headerBuffer.putInt(bufferSize);
+    headerBuffer.put((byte) (compressed ? 1 : 0));
     try {
+      if (compressed) {
+        headerBuffer.putInt(uncompressedSize);
+      }
+      logChannel.write(headerBuffer);
       logChannel.write(buffer);
     } catch (ClosedChannelException e) {
       logger.warn("Cannot write to {}", logFile, e);

Reply via email to