This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch compressed-wal in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4c1ba9d067a291a8da06bf11954f567abdc01dc6 Author: Liu Xuxin <[email protected]> AuthorDate: Thu Jan 25 16:34:55 2024 +0800 support wal write compress --- .../main/java/org/apache/iotdb/SessionExample.java | 18 ++++++------ .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++++++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 ++++ .../storageengine/dataregion/wal/io/LogWriter.java | 32 +++++++++++++++++++++- 4 files changed, 55 insertions(+), 10 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 768f276dbad..0b0c2ff8caf 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -74,7 +74,7 @@ public class SessionExample { .password("root") .version(Version.V_1_0) .build(); - session.open(true); + session.open(false); // set session fetchSize session.setFetchSize(10000); @@ -90,21 +90,21 @@ public class SessionExample { // createTemplate(); createTimeseries(); createMultiTimeseries(); - insertRecord(); - insertTablet(); + // insertRecord(); + // insertTablet(); // insertTabletWithNullValues(); // insertTablets(); - // insertRecords(); + insertRecords(); // insertText(); // selectInto(); // createAndDropContinuousQueries(); // nonQuery(); - query(); + // query(); // queryWithTimeout(); - rawDataQuery(); - lastDataQuery(); - aggregationQuery(); - groupByQuery(); + // rawDataQuery(); + // lastDataQuery(); + // aggregationQuery(); + // groupByQuery(); // queryByIterator(); // deleteData(); // deleteTimeseries(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index eb1c229a7b4..0f5aac66740 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1060,6 +1060,8 @@ public class IoTDBConfig { /** whether the local write api records audit logs * */ private boolean enableAuditLogForNativeInsertApi = true; + private boolean enableWALCompression = false; + // customizedProperties, this should be empty by default. private Properties customizedProperties = new Properties(); @@ -3811,4 +3813,12 @@ public class IoTDBConfig { double innerCompactionTaskSelectionDiskRedundancy) { this.innerCompactionTaskSelectionDiskRedundancy = innerCompactionTaskSelectionDiskRedundancy; } + + public boolean isEnableWALCompression() { + return enableWALCompression; + } + + public void setEnableWALCompression(boolean enableWALCompression) { + this.enableWALCompression = enableWALCompression; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 76fac28fe61..b6880715dfe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -442,6 +442,11 @@ public class IoTDBDescriptor { "enable_seq_space_compaction", Boolean.toString(conf.isEnableSeqSpaceCompaction())))); + conf.setEnableWALCompression( + Boolean.parseBoolean( + properties.getProperty( + "enable_wal_compression", Boolean.toString(conf.isEnableWALCompression())))); + conf.setEnableUnseqSpaceCompaction( Boolean.parseBoolean( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java index 68f4deae318..7065bd816d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java @@ -19,8 +19,11 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint; +import org.apache.iotdb.tsfile.compress.ICompressor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +47,7 @@ public abstract class LogWriter implements ILogWriter { protected final FileOutputStream logStream; protected final FileChannel logChannel; protected long size; + private final ByteBuffer headerBuffer = ByteBuffer.allocateDirect(Integer.BYTES * 2 + 1); protected LogWriter(File logFile) throws FileNotFoundException { this.logFile = logFile; @@ -53,9 +57,35 @@ public abstract class LogWriter implements ILogWriter { @Override public void write(ByteBuffer buffer) throws IOException { - size += buffer.position(); + int bufferSize = buffer.position(); buffer.flip(); + boolean compressed = false; + int uncompressedSize = bufferSize; + if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression() + && bufferSize > 1024 * 1024) { + ICompressor compressor = ICompressor.getCompressor(CompressionType.LZ4); + ByteBuffer compressedBuffer = + ByteBuffer.allocateDirect(compressor.getMaxBytesForCompression(buffer.limit())); + compressor.compress(buffer, compressedBuffer); + logger.error( + "compressed size: {}, original size: {}, compression ratio is {}", + compressedBuffer.position(), + bufferSize, + (double) bufferSize / compressedBuffer.position()); + buffer = compressedBuffer; + bufferSize = buffer.position(); + buffer.flip(); + compressed = true; + } + size += bufferSize; + headerBuffer.clear(); + headerBuffer.putInt(bufferSize); + headerBuffer.put((byte) (compressed ? 1 : 0)); try { + if (compressed) { + headerBuffer.putInt(uncompressedSize); + } + logChannel.write(headerBuffer); logChannel.write(buffer); } catch (ClosedChannelException e) { logger.warn("Cannot write to {}", logFile, e);
