This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 709d714413b WAL Compression enhancement (#12763)
709d714413b is described below
commit 709d714413b9e3a7f4f5d3eabfb6200699ec4167
Author: Potato <[email protected]>
AuthorDate: Fri Jun 28 11:01:45 2024 +0800
WAL Compression enhancement (#12763)
* enhance
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix huge write
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix bug when recovering old version of wal
* optimize the code
* fix bug
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix testFileWithoutMagicString & make unknown version records v2 magic
string
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix review issue
Signed-off-by: OneSizeFitQuorum <[email protected]>
* enhance version judgement
Signed-off-by: OneSizeFitQuorum <[email protected]>
* remove direct buffer
Signed-off-by: OneSizeFitQuorum <[email protected]>
* enhance test
Signed-off-by: OneSizeFitQuorum <[email protected]>
* fix bug
Signed-off-by: OneSizeFitQuorum <[email protected]>
---------
Signed-off-by: OneSizeFitQuorum <[email protected]>
Co-authored-by: Liu Xuxin <[email protected]>
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 2 +-
.../dataregion/wal/buffer/WALBuffer.java | 2 +-
.../dataregion/wal/io/CheckpointWriter.java | 2 +-
.../storageengine/dataregion/wal/io/LogWriter.java | 46 +++---
.../dataregion/wal/io/WALFileVersion.java | 61 ++++++++
.../dataregion/wal/io/WALInputStream.java | 157 ++++++++++-----------
.../dataregion/wal/io/WALMetaData.java | 21 ++-
.../storageengine/dataregion/wal/io/WALWriter.java | 36 +++--
.../dataregion/wal/recover/WALRecoverWriter.java | 31 ++--
.../dataregion/wal/utils/WALEntryPosition.java | 1 -
.../storageengine/dataregion/wal/WALTestUtils.java | 4 +-
.../wal/compression/WALCompressionTest.java | 37 ++---
.../wal/recover/WALRecoverWriterTest.java | 4 +-
.../conf/iotdb-system.properties.template | 7 +-
15 files changed, 250 insertions(+), 163 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 69f59575666..c38c17535fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1138,7 +1138,7 @@ public class IoTDBConfig {
*/
private String RateLimiterType = "FixedIntervalRateLimiter";
- private CompressionType WALCompressionAlgorithm =
CompressionType.UNCOMPRESSED;
+ private CompressionType WALCompressionAlgorithm = CompressionType.LZ4;
IoTDBConfig() {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 3b3387339de..2fb8ddc7f35 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -417,7 +417,7 @@ public class IoTDBDescriptor {
"io_task_queue_size_for_flushing",
Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
boolean enableWALCompression =
- Boolean.parseBoolean(properties.getProperty("enable_wal_compression",
"false"));
+ Boolean.parseBoolean(properties.getProperty("enable_wal_compression",
"true"));
conf.setWALCompressionAlgorithm(
enableWALCompression ? CompressionType.LZ4 :
CompressionType.UNCOMPRESSED);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
index 20970aea34b..63296491396 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java
@@ -72,7 +72,7 @@ import static
org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode.DEFA
public class WALBuffer extends AbstractWALBuffer {
private static final Logger logger =
LoggerFactory.getLogger(WALBuffer.class);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
- private static final int HALF_WAL_BUFFER_SIZE = config.getWalBufferSize() /
2;
+ public static final int HALF_WAL_BUFFER_SIZE = config.getWalBufferSize() / 2;
private static final double FSYNC_BUFFER_RATIO = 0.95;
private static final int QUEUE_CAPACITY = config.getWalBufferQueueCapacity();
private static final WritingMetrics WRITING_METRICS =
WritingMetrics.getInstance();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointWriter.java
index 322aa3c9f5d..f31f2e9b846 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointWriter.java
@@ -27,6 +27,6 @@ import java.io.IOException;
/** CheckpointWriter writes the binary {@link Checkpoint} into .checkpoint
file. */
public class CheckpointWriter extends LogWriter {
public CheckpointWriter(File logFile) throws IOException {
- super(logFile);
+ super(logFile, WALFileVersion.V2);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index 29335efde5c..e4ea2f93dc3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALBuffer;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
@@ -49,37 +50,43 @@ public abstract class LogWriter implements ILogWriter {
protected final FileChannel logChannel;
protected long size = 0;
protected long originalSize = 0;
- private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES *
2 + 1);
+
+ /**
+ * 1 byte for whether enable compression, 4 byte for compressedSize, 4 byte
for uncompressedSize
+ */
+ private final int COMPRESSED_HEADER_SIZE = Byte.BYTES + Integer.BYTES * 2;
+
+ /** 1 byte for whether enable compression, 4 byte for uncompressedSize */
+ private final int UN_COMPRESSED_HEADER_SIZE = Byte.BYTES + Integer.BYTES;
+
+ private final ByteBuffer headerBuffer =
ByteBuffer.allocate(COMPRESSED_HEADER_SIZE);
private ICompressor compressor =
ICompressor.getCompressor(
IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm());
private ByteBuffer compressedByteBuffer;
- // Minimum size to compress, default is 32 KB
- private static long minCompressionSize = 32 * 1024L;
- protected LogWriter(File logFile) throws IOException {
+ /** Minimum size to compress, use magic number 32 KB */
+ private static long MIN_COMPRESSION_SIZE = 32 * 1024L;
+
+ protected LogWriter(File logFile, WALFileVersion version) throws IOException
{
this.logFile = logFile;
this.logStream = new FileOutputStream(logFile, true);
this.logChannel = this.logStream.getChannel();
if (!logFile.exists() || logFile.length() == 0) {
this.logChannel.write(
-
ByteBuffer.wrap(WALWriter.MAGIC_STRING.getBytes(StandardCharsets.UTF_8)));
+ ByteBuffer.wrap(
+ version == WALFileVersion.V1
+ ? WALWriter.MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8)
+ :
WALWriter.MAGIC_STRING_V2.getBytes(StandardCharsets.UTF_8)));
size += logChannel.position();
}
- if (IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm()
- != CompressionType.UNCOMPRESSED) {
- // TODO: Use a dynamic strategy to enlarge the buffer size
- compressedByteBuffer =
- ByteBuffer.allocate(
- compressor.getMaxBytesForCompression(
-
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
- } else {
- compressedByteBuffer = null;
- }
}
@Override
public double write(ByteBuffer buffer) throws IOException {
+ // To support hot loading, we can't define it as a variable,
+ // because we need to dynamically check whether wal compression is enabled
+ // each time the buffer is serialized
CompressionType compressionType =
IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
int bufferSize = buffer.position();
@@ -92,12 +99,12 @@ public abstract class LogWriter implements ILogWriter {
int uncompressedSize = bufferSize;
if (compressionType != CompressionType.UNCOMPRESSED
/* Do not compress buffer that is less than min size */
- && bufferSize > minCompressionSize) {
+ && bufferSize > MIN_COMPRESSION_SIZE) {
if (Objects.isNull(compressedByteBuffer)) {
+ // TODO: Use a dynamic strategy to enlarge the buffer size
compressedByteBuffer =
ByteBuffer.allocate(
- compressor.getMaxBytesForCompression(
-
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
+
compressor.getMaxBytesForCompression(WALBuffer.HALF_WAL_BUFFER_SIZE));
}
compressedByteBuffer.clear();
if (compressor.getType() != compressionType) {
@@ -108,6 +115,9 @@ public abstract class LogWriter implements ILogWriter {
bufferSize = buffer.position();
buffer.flip();
compressed = true;
+ size += COMPRESSED_HEADER_SIZE;
+ } else {
+ size += UN_COMPRESSED_HEADER_SIZE;
}
size += bufferSize;
/*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
new file mode 100644
index 00000000000..59e7a5534ce
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+
+public enum WALFileVersion {
+ V1,
+ V2,
+ UNKNOWN;
+
+ public static WALFileVersion getVersion(File file) throws IOException {
+ try (FileChannel channel = FileChannel.open(file.toPath())) {
+ return getVersion(channel);
+ }
+ }
+
+ public static WALFileVersion getVersion(FileChannel channel) throws
IOException {
+ long originalPosition = channel.position();
+ try {
+ channel.position(0);
+ ByteBuffer buffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_V2_BYTES);
+ channel.read(buffer);
+ buffer.flip();
+ if (buffer.remaining() < WALWriter.MAGIC_STRING_V2_BYTES) {
+ return UNKNOWN;
+ }
+ String version = new String(buffer.array(), StandardCharsets.UTF_8);
+ switch (version) {
+ case WALWriter.MAGIC_STRING_V2:
+ return V2;
+ case WALWriter.MAGIC_STRING_V1:
+ return V1;
+ default:
+ return UNKNOWN;
+ }
+ } finally {
+ channel.position(originalPosition);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 844c06436b7..90bfd52e371 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
-import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.file.metadata.enums.CompressionType;
@@ -29,7 +29,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@@ -38,11 +37,15 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
private static final Logger logger =
LoggerFactory.getLogger(WALInputStream.class);
private final FileChannel channel;
- private final ByteBuffer segmentHeaderBuffer =
ByteBuffer.allocate(Integer.BYTES + Byte.BYTES);
- private final ByteBuffer compressedHeader =
ByteBuffer.allocate(Integer.BYTES);
+
+ /** 1 byte for whether enable compression, 4 byte for compressedSize */
+ private final ByteBuffer segmentHeaderWithoutCompressedSizeBuffer =
+ ByteBuffer.allocate(Integer.BYTES + Byte.BYTES);
+
+ private final ByteBuffer compressedSizeBuffer =
ByteBuffer.allocate(Integer.BYTES);
private ByteBuffer dataBuffer = null;
private ByteBuffer compressedBuffer = null;
- private long fileSize;
+ private final long fileSize;
File logFile;
/*
The WAL file consist of following parts:
@@ -52,24 +55,18 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
*/
private long endOffset = -1;
- enum FileVersion {
- V1,
- V2,
- UNKNOWN
- };
-
- FileVersion version;
+ WALFileVersion version;
public WALInputStream(File logFile) throws IOException {
channel = FileChannel.open(logFile.toPath());
fileSize = channel.size();
+ this.logFile = logFile;
analyzeFileVersion();
getEndOffset();
- this.logFile = logFile;
}
private void getEndOffset() throws IOException {
- if (channel.size() < WALWriter.MAGIC_STRING_BYTES + Integer.BYTES) {
+ if (channel.size() < WALWriter.MAGIC_STRING_V2_BYTES + Integer.BYTES) {
// An broken file
endOffset = channel.size();
return;
@@ -77,27 +74,30 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
long position;
try {
- if (version == FileVersion.V2) {
+ if (version == WALFileVersion.V2) {
// New Version
- ByteBuffer magicStringBuffer =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
- channel.read(magicStringBuffer, channel.size() -
WALWriter.MAGIC_STRING_BYTES);
+ ByteBuffer magicStringBuffer =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_V2_BYTES);
+ channel.read(magicStringBuffer, channel.size() -
WALWriter.MAGIC_STRING_V2_BYTES);
magicStringBuffer.flip();
- if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
- .equals(WALWriter.MAGIC_STRING)) {
- // This is a broken wal file
+ if
(logFile.getName().endsWith(IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX)
+ || !new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
+ .equals(WALWriter.MAGIC_STRING_V2)) {
+ // This is a broken wal or checkpoint file
endOffset = channel.size();
return;
} else {
- // This is a normal wal file
- position = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES;
+ // This is a normal wal file or check point file
+ position = channel.size() - WALWriter.MAGIC_STRING_V2_BYTES -
Integer.BYTES;
}
} else {
+ if
(logFile.getName().endsWith(IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX)) {
+ // this is an old check point file
+ endOffset = channel.size();
+ return;
+ }
// Old version
- ByteBuffer magicStringBuffer =
-
ByteBuffer.allocate(WALWriter.MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length);
- channel.read(
- magicStringBuffer,
- channel.size() -
WALWriter.MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length);
+ ByteBuffer magicStringBuffer =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_V1_BYTES);
+ channel.read(magicStringBuffer, channel.size() -
WALWriter.MAGIC_STRING_V1_BYTES);
magicStringBuffer.flip();
if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
.equals(WALWriter.MAGIC_STRING_V1)) {
@@ -105,39 +105,29 @@ public class WALInputStream extends InputStream
implements AutoCloseable {
endOffset = channel.size();
return;
} else {
- position =
- channel.size()
- -
WALWriter.MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length
- - Integer.BYTES;
+ position = channel.size() - WALWriter.MAGIC_STRING_V1_BYTES -
Integer.BYTES;
}
}
- // Read the meta data size
+ // Read the metadata size
channel.read(metadataSizeBuf, position);
metadataSizeBuf.flip();
int metadataSize = metadataSizeBuf.getInt();
- endOffset = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES - metadataSize - 1;
+ // -1 is for the endmarker
+ endOffset =
+ channel.size() - WALWriter.MAGIC_STRING_V2_BYTES - Integer.BYTES -
metadataSize - 1;
} finally {
- channel.position(WALWriter.MAGIC_STRING_BYTES);
+ if (version == WALFileVersion.V2) {
+ // Set the position back to the end of head magic string
+ channel.position(WALWriter.MAGIC_STRING_V2_BYTES);
+ } else {
+ // There is no head magic string in V1 version
+ channel.position(0);
+ }
}
}
private void analyzeFileVersion() throws IOException {
- if (channel.size() < WALWriter.MAGIC_STRING_BYTES) {
- version = FileVersion.UNKNOWN;
- return;
- }
- if (isCurrentVersion()) {
- this.version = FileVersion.V2;
- return;
- }
- this.version = FileVersion.V1;
- }
-
- private boolean isCurrentVersion() throws IOException {
- channel.position(0);
- ByteBuffer buffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
- channel.read(buffer);
- return new String(buffer.array(),
StandardCharsets.UTF_8).equals(WALWriter.MAGIC_STRING);
+ version = WALFileVersion.getVersion(channel);
}
@Override
@@ -188,11 +178,11 @@ public class WALInputStream extends InputStream
implements AutoCloseable {
private void loadNextSegment() throws IOException {
if (channel.position() >= endOffset) {
- throw new IOException("End of file");
+ throw new IOException("Reach the end offset of wal file");
}
- if (version == FileVersion.V2) {
+ if (version == WALFileVersion.V2) {
loadNextSegmentV2();
- } else if (version == FileVersion.V1) {
+ } else if (version == WALFileVersion.V1) {
loadNextSegmentV1();
} else {
tryLoadSegment();
@@ -220,20 +210,14 @@ public class WALInputStream extends InputStream
implements AutoCloseable {
if (Objects.isNull(dataBuffer)
|| dataBuffer.capacity() < segmentInfo.uncompressedSize
|| dataBuffer.capacity() > segmentInfo.uncompressedSize * 2) {
- if (!Objects.isNull(dataBuffer)) {
- MmapUtil.clean((MappedByteBuffer) dataBuffer);
- }
- dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
+ dataBuffer = ByteBuffer.allocate(segmentInfo.uncompressedSize);
}
dataBuffer.clear();
if (Objects.isNull(compressedBuffer)
|| compressedBuffer.capacity() < segmentInfo.dataInDiskSize
|| compressedBuffer.capacity() > segmentInfo.dataInDiskSize * 2) {
- if (!Objects.isNull(compressedBuffer)) {
- MmapUtil.clean((MappedByteBuffer) compressedBuffer);
- }
- compressedBuffer =
ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
+ compressedBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
}
compressedBuffer.clear();
// limit the buffer to prevent it from reading too much byte than
expected
@@ -250,10 +234,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
if (Objects.isNull(dataBuffer)
|| dataBuffer.capacity() < segmentInfo.dataInDiskSize
|| dataBuffer.capacity() > segmentInfo.dataInDiskSize * 2) {
- if (!Objects.isNull(dataBuffer)) {
- MmapUtil.clean((MappedByteBuffer) dataBuffer);
- }
- dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
+ dataBuffer = ByteBuffer.allocate(segmentInfo.dataInDiskSize);
}
dataBuffer.clear();
// limit the buffer to prevent it from reading too much byte than
expected
@@ -270,17 +251,15 @@ public class WALInputStream extends InputStream
implements AutoCloseable {
long originPosition = channel.position();
try {
loadNextSegmentV2();
- version = FileVersion.V2;
+ version = WALFileVersion.V2;
+ return;
} catch (Throwable e) {
// failed to load in V2 way, try in V1 way
logger.warn("Failed to load WAL segment in V2 way, try in V1 way", e);
channel.position(originPosition);
}
-
- if (version == FileVersion.UNKNOWN) {
- loadNextSegmentV1();
- version = FileVersion.V1;
- }
+ loadNextSegmentV1();
+ version = WALFileVersion.V1;
}
/**
@@ -291,15 +270,16 @@ public class WALInputStream extends InputStream
implements AutoCloseable {
* @throws IOException If the file is broken or the given position is invalid
*/
public void skipToGivenLogicalPosition(long pos) throws IOException {
- if (version == FileVersion.V2) {
- channel.position(WALWriter.MAGIC_STRING_BYTES);
+ if (version == WALFileVersion.V2) {
+ channel.position(WALWriter.MAGIC_STRING_V2_BYTES);
long posRemain = pos;
- SegmentInfo segmentInfo = null;
+ SegmentInfo segmentInfo;
do {
+ long currentPos = channel.position();
segmentInfo = getNextSegmentInfo();
if (posRemain >= segmentInfo.uncompressedSize) {
posRemain -= segmentInfo.uncompressedSize;
- channel.position(channel.position() + segmentInfo.dataInDiskSize);
+ channel.position(currentPos + segmentInfo.dataInDiskSize +
segmentInfo.headerSize());
} else {
break;
}
@@ -340,26 +320,33 @@ public class WALInputStream extends InputStream
implements AutoCloseable {
}
private SegmentInfo getNextSegmentInfo() throws IOException {
- segmentHeaderBuffer.clear();
- channel.read(segmentHeaderBuffer);
- segmentHeaderBuffer.flip();
+ segmentHeaderWithoutCompressedSizeBuffer.clear();
+ channel.read(segmentHeaderWithoutCompressedSizeBuffer);
+ segmentHeaderWithoutCompressedSizeBuffer.flip();
SegmentInfo info = new SegmentInfo();
- info.compressionType =
CompressionType.deserialize(segmentHeaderBuffer.get());
- info.dataInDiskSize = segmentHeaderBuffer.getInt();
+ info.compressionType =
+
CompressionType.deserialize(segmentHeaderWithoutCompressedSizeBuffer.get());
+ info.dataInDiskSize = segmentHeaderWithoutCompressedSizeBuffer.getInt();
if (info.compressionType != CompressionType.UNCOMPRESSED) {
- compressedHeader.clear();
- channel.read(compressedHeader);
- compressedHeader.flip();
- info.uncompressedSize = compressedHeader.getInt();
+ compressedSizeBuffer.clear();
+ channel.read(compressedSizeBuffer);
+ compressedSizeBuffer.flip();
+ info.uncompressedSize = compressedSizeBuffer.getInt();
} else {
info.uncompressedSize = info.dataInDiskSize;
}
return info;
}
- private class SegmentInfo {
+ private static class SegmentInfo {
public CompressionType compressionType;
public int dataInDiskSize;
public int uncompressedSize;
+
+ int headerSize() {
+ return compressionType == CompressionType.UNCOMPRESSED
+ ? Byte.BYTES + Integer.BYTES
+ : Byte.BYTES + Integer.BYTES * 2;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
index 7fa634ffbdc..ecd44268f59 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java
@@ -40,6 +40,7 @@ import java.util.Set;
* entry and the number of entries.
*/
public class WALMetaData implements SerializedSize {
+
private static final Logger logger =
LoggerFactory.getLogger(WALMetaData.class);
// search index 8 byte, wal entries' number 4 bytes
private static final int FIXED_SERIALIZED_SIZE = Long.BYTES + Integer.BYTES;
@@ -85,7 +86,7 @@ public class WALMetaData implements SerializedSize {
+ (memTablesId.isEmpty() ? 0 : Integer.BYTES + memTablesId.size() *
Long.BYTES);
}
- public void serialize(File file, ByteBuffer buffer) {
+ public void serialize(ByteBuffer buffer) {
buffer.putLong(firstSearchIndex);
buffer.putInt(buffersSize.size());
for (int size : buffersSize) {
@@ -129,12 +130,20 @@ public class WALMetaData implements SerializedSize {
}
public static WALMetaData readFromWALFile(File logFile, FileChannel channel)
throws IOException {
- if (channel.size() < WALWriter.MAGIC_STRING_BYTES ||
!isValidMagicString(channel)) {
+ if (channel.size() < WALWriter.MAGIC_STRING_V2_BYTES ||
!isValidMagicString(channel)) {
throw new IOException(String.format("Broken wal file %s", logFile));
}
+
// load metadata size
ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
- long position = channel.size() - WALWriter.MAGIC_STRING_BYTES -
Integer.BYTES;
+ long position;
+ WALFileVersion version = WALFileVersion.getVersion(channel);
+ position =
+ channel.size()
+ - Integer.BYTES
+ - (version == WALFileVersion.V2
+ ? WALWriter.MAGIC_STRING_V2_BYTES
+ : WALWriter.MAGIC_STRING_V1_BYTES);
channel.read(metadataSizeBuf, position);
metadataSizeBuf.flip();
// load metadata
@@ -159,11 +168,11 @@ public class WALMetaData implements SerializedSize {
}
private static boolean isValidMagicString(FileChannel channel) throws
IOException {
- ByteBuffer magicStringBytes =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
- channel.read(magicStringBytes, channel.size() -
WALWriter.MAGIC_STRING_BYTES);
+ ByteBuffer magicStringBytes =
ByteBuffer.allocate(WALWriter.MAGIC_STRING_V2_BYTES);
+ channel.read(magicStringBytes, channel.size() -
WALWriter.MAGIC_STRING_V2_BYTES);
magicStringBytes.flip();
String magicString = new String(magicStringBytes.array(),
StandardCharsets.UTF_8);
- return magicString.equals(WALWriter.MAGIC_STRING)
+ return magicString.equals(WALWriter.MAGIC_STRING_V2)
|| magicString.contains(WALWriter.MAGIC_STRING_V1);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
index 7017b4be6cb..37ca859fab8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
@@ -24,9 +24,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALSignalEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -34,17 +31,25 @@ import java.nio.charset.StandardCharsets;
/** WALWriter writes the binary {@link WALEntry} into .wal file. */
public class WALWriter extends LogWriter {
- private static final Logger logger =
LoggerFactory.getLogger(WALWriter.class);
- public static final String MAGIC_STRING_V1 = "WAL";
- public static final String MAGIC_STRING = "V2-WAL";
- public static final int MAGIC_STRING_BYTES =
MAGIC_STRING.getBytes(StandardCharsets.UTF_8).length;
+ public static final String MAGIC_STRING_V1 = "WAL";
+ public static final String MAGIC_STRING_V2 = "V2-WAL";
+ public static final int MAGIC_STRING_V1_BYTES =
+ MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length;
+ public static final int MAGIC_STRING_V2_BYTES =
+ MAGIC_STRING_V2.getBytes(StandardCharsets.UTF_8).length;
private WALFileStatus walFileStatus =
WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
// wal files' metadata
protected final WALMetaData metaData = new WALMetaData();
+ // By default is V2
+ private WALFileVersion version = WALFileVersion.V2;
public WALWriter(File logFile) throws IOException {
- super(logFile);
+ this(logFile, WALFileVersion.V2);
+ }
+
+ public WALWriter(File logFile, WALFileVersion version) throws IOException {
+ super(logFile, version);
}
/**
@@ -68,14 +73,19 @@ public class WALWriter extends LogWriter {
int metaDataSize = metaData.serializedSize();
ByteBuffer buffer =
ByteBuffer.allocate(
- endMarker.serializedSize() + metaDataSize + Integer.BYTES +
MAGIC_STRING_BYTES);
+ endMarker.serializedSize()
+ + metaDataSize
+ + Integer.BYTES
+ + (version != WALFileVersion.V2 ? MAGIC_STRING_V1_BYTES :
MAGIC_STRING_V2_BYTES));
// mark info part ends
endMarker.serialize(buffer);
// flush meta data
- metaData.serialize(logFile, buffer);
+ metaData.serialize(buffer);
buffer.putInt(metaDataSize);
// add magic string
- buffer.put(MAGIC_STRING.getBytes(StandardCharsets.UTF_8));
+ buffer.put(
+ (version != WALFileVersion.V2 ? MAGIC_STRING_V1 : MAGIC_STRING_V2)
+ .getBytes(StandardCharsets.UTF_8));
size += buffer.position();
writeMetadata(buffer);
}
@@ -101,4 +111,8 @@ public class WALWriter extends LogWriter {
public WALFileStatus getWalFileStatus() {
return walFileStatus;
}
+
+ public void setVersion(WALFileVersion version) {
+ this.version = version;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
index 5db0c17c987..900a706006b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriter.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.recover;
+import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALFileVersion;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter;
@@ -26,10 +27,13 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
-import static
org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING;
-import static
org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_BYTES;
+import static
org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V1;
+import static
org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V1_BYTES;
+import static
org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V2;
+import static
org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V2_BYTES;
/** Check whether the wal file is broken and recover it. */
public class WALRecoverWriter {
@@ -42,31 +46,38 @@ public class WALRecoverWriter {
public void recover(WALMetaData metaData) throws IOException {
// locate broken data
long truncateSize;
- if (logFile.length() < MAGIC_STRING_BYTES) { // file without magic string
+ WALFileVersion version = WALFileVersion.getVersion(logFile);
+ if (version == WALFileVersion.UNKNOWN) {
truncateSize = 0;
- } else {
- if (readTailMagic().equals(MAGIC_STRING)) { // complete file
+ } else if (version == WALFileVersion.V2) {
+ if (readTailMagic(MAGIC_STRING_V2_BYTES).equals(MAGIC_STRING_V2)) { //
complete file
return;
} else { // file with broken magic string
truncateSize = metaData.getTruncateOffSet();
}
+ } else {
+ if (readTailMagic(MAGIC_STRING_V1_BYTES).contains(MAGIC_STRING_V1)) {
+ return;
+ } else {
+ truncateSize = metaData.getTruncateOffSet();
+ }
}
// truncate broken data
try (FileChannel channel = FileChannel.open(logFile.toPath(),
StandardOpenOption.APPEND)) {
channel.truncate(truncateSize);
}
// flush metadata
- try (WALWriter walWriter = new WALWriter(logFile)) {
+ try (WALWriter walWriter = new WALWriter(logFile, version)) {
walWriter.updateMetaData(metaData);
}
}
- private String readTailMagic() throws IOException {
+ private String readTailMagic(int size) throws IOException {
try (FileChannel channel = FileChannel.open(logFile.toPath(),
StandardOpenOption.READ)) {
- ByteBuffer magicStringBytes = ByteBuffer.allocate(MAGIC_STRING_BYTES);
- channel.read(magicStringBytes, channel.size() - MAGIC_STRING_BYTES);
+ ByteBuffer magicStringBytes = ByteBuffer.allocate(size);
+ channel.read(magicStringBytes, channel.size() - size);
magicStringBytes.flip();
- return new String(magicStringBytes.array());
+ return new String(magicStringBytes.array(), StandardCharsets.UTF_8);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index c794745c6f5..438e6a897e2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -106,7 +106,6 @@ public class WALEntryPosition {
is.skipToGivenLogicalPosition(position);
ByteBuffer buffer = ByteBuffer.allocate(size);
is.read(buffer);
- buffer.flip();
return buffer;
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALTestUtils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALTestUtils.java
index 8f671eb3f03..2e68d8f5532 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALTestUtils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALTestUtils.java
@@ -36,7 +36,7 @@ public class WALTestUtils {
throws NoSuchFieldException, ClassNotFoundException,
IllegalAccessException {
Class<?> logWriterClass =
Class.forName("org.apache.iotdb.db.storageengine.dataregion.wal.io.LogWriter");
- Field minCompressionSizeField =
logWriterClass.getDeclaredField("minCompressionSize");
+ Field minCompressionSizeField =
logWriterClass.getDeclaredField("MIN_COMPRESSION_SIZE");
minCompressionSizeField.setAccessible(true);
minCompressionSizeField.setLong(null, size);
}
@@ -45,7 +45,7 @@ public class WALTestUtils {
throws ClassNotFoundException, NoSuchFieldException,
IllegalAccessException {
Class<?> logWriterClass =
Class.forName("org.apache.iotdb.db.storageengine.dataregion.wal.io.LogWriter");
- Field minCompressionSizeField =
logWriterClass.getDeclaredField("minCompressionSize");
+ Field minCompressionSizeField =
logWriterClass.getDeclaredField("MIN_COMPRESSION_SIZE");
minCompressionSizeField.setAccessible(true);
return minCompressionSizeField.getLong(null);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
index d187f6107b6..425a16a26d9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
@@ -126,17 +126,17 @@ public class WALCompressionTest {
throws QueryProcessException, IllegalPathException, IOException {
LogWriter writer = new WALWriter(walFile);
ByteBuffer buffer = ByteBuffer.allocate(1024 * 4);
- List<Pair<Long, InsertRowNode>> positionAndEntryPairList = new
ArrayList<>();
+ List<Pair<Long, Integer>> positionAndEntryPairList = new ArrayList<>();
int memTableId = 0;
long fileOffset = 0;
for (int i = 0; i < 100; ) {
InsertRowNode insertRowNode = WALTestUtils.getInsertRowNode(devicePath +
memTableId, i);
- long serializedSize = insertRowNode.serializedSize();
- if (buffer.remaining() >= serializedSize) {
+ if (buffer.remaining() >= buffer.capacity() / 4) {
int pos = buffer.position();
insertRowNode.serialize(buffer);
- positionAndEntryPairList.add(new Pair<>(fileOffset, insertRowNode));
- fileOffset += buffer.position() - pos;
+ int size = buffer.position() - pos;
+ positionAndEntryPairList.add(new Pair<>(fileOffset, size));
+ fileOffset += size;
i++;
} else {
writer.write(buffer);
@@ -149,19 +149,12 @@ public class WALCompressionTest {
writer.close();
try (WALInputStream stream = new WALInputStream(walFile)) {
for (int i = 0; i < 100; ++i) {
- Pair<Long, InsertRowNode> positionAndNodePair =
positionAndEntryPairList.get(i);
+ Pair<Long, Integer> positionAndNodePair =
positionAndEntryPairList.get(i);
stream.skipToGivenLogicalPosition(positionAndNodePair.left);
- /*
- Add the allocated buffer size by 2, because the actual serialized size
- of InsertRowNode is larger than the estimated value got by serializedSize.
- I don't know if this is a bug or not.
- */
- ByteBuffer nodeBuffer1 =
- ByteBuffer.allocate(positionAndNodePair.right.serializedSize() +
2);
+ ByteBuffer nodeBuffer1 =
ByteBuffer.allocate(positionAndNodePair.right);
stream.read(nodeBuffer1);
- ByteBuffer nodeBuffer2 =
- ByteBuffer.allocate(positionAndNodePair.right.serializedSize() +
2);
- positionAndNodePair.right.serialize(nodeBuffer2);
+ ByteBuffer nodeBuffer2 =
ByteBuffer.allocate(positionAndNodePair.right);
+ WALTestUtils.getInsertRowNode(devicePath + memTableId,
i).serialize(nodeBuffer2);
nodeBuffer2.flip();
Assert.assertArrayEquals(nodeBuffer1.array(), nodeBuffer2.array());
}
@@ -192,10 +185,10 @@ public class WALCompressionTest {
try (DataInputStream dataInputStream =
new DataInputStream(new
BufferedInputStream(Files.newInputStream(walFile.toPath())))) {
- byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_BYTES];
+ byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_V2_BYTES];
// head magic string
dataInputStream.readFully(magicStringBytes);
- Assert.assertEquals(WALWriter.MAGIC_STRING, new
String(magicStringBytes));
+ Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new
String(magicStringBytes));
Assert.assertEquals(
CompressionType.UNCOMPRESSED,
CompressionType.deserialize(dataInputStream.readByte()));
Assert.assertEquals(buf.array().length, dataInputStream.readInt());
@@ -209,7 +202,7 @@ public class WALCompressionTest {
dataInputStream.readFully(metadataBuf.array());
// Tail magic string
dataInputStream.readFully(magicStringBytes);
- Assert.assertEquals(WALWriter.MAGIC_STRING, new
String(magicStringBytes));
+ Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new
String(magicStringBytes));
}
}
@@ -243,10 +236,10 @@ public class WALCompressionTest {
try (DataInputStream dataInputStream =
new DataInputStream(new
BufferedInputStream(Files.newInputStream(walFile.toPath())))) {
- byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_BYTES];
+ byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_V2_BYTES];
// head magic string
dataInputStream.readFully(magicStringBytes);
- Assert.assertEquals(WALWriter.MAGIC_STRING, new
String(magicStringBytes));
+ Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new
String(magicStringBytes));
Assert.assertEquals(
CompressionType.LZ4,
CompressionType.deserialize(dataInputStream.readByte()));
Assert.assertEquals(compressed.length, dataInputStream.readInt());
@@ -263,7 +256,7 @@ public class WALCompressionTest {
dataInputStream.readFully(metadataBuf.array());
// Tail magic string
dataInputStream.readFully(magicStringBytes);
- Assert.assertEquals(WALWriter.MAGIC_STRING, new
String(magicStringBytes));
+ Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new
String(magicStringBytes));
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
index 97b00f379f2..7d49c448354 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRecoverWriterTest.java
@@ -77,7 +77,7 @@ public class WALRecoverWriterTest {
Byte.BYTES
+ (Long.BYTES + Integer.BYTES)
+ Integer.BYTES
- + WALWriter.MAGIC_STRING_BYTES * 2,
+ + WALWriter.MAGIC_STRING_V2_BYTES * 2,
logFile.length());
try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
Assert.assertFalse(reader.hasNext());
@@ -102,7 +102,7 @@ public class WALRecoverWriterTest {
Byte.BYTES
+ (Long.BYTES + Integer.BYTES)
+ Integer.BYTES
- + WALWriter.MAGIC_STRING_BYTES * 2,
+ + WALWriter.MAGIC_STRING_V2_BYTES * 2,
logFile.length());
try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
Assert.assertFalse(reader.hasNext());
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 27f5a17d20d..32d4e8efde2 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1409,9 +1409,12 @@ iot_consensus_throttle_threshold_in_byte=53687091200
iot_consensus_cache_window_time_in_ms=-1
# Enable Write Ahead Log compression.
-# Option: true, false
+# With this parameter enabled, IoTDB can save a lot of IO resources at the cost of a small amount
+# of additional CPU resources, which is generally suitable for the scenario
+# where CPU is not the bottleneck but IO is the bottleneck.
# effectiveMode: hot_reload
-enable_wal_compression=false
+# Datatype: boolean
+enable_wal_compression=true
####################
### IoTConsensus Configuration