This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch wal-compress
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/wal-compress by this push:
new 531c99bb7ce enable wal compression
531c99bb7ce is described below
commit 531c99bb7cebeb64e14139fd7241d5e925e61bcc
Author: LiuXuxin <[email protected]>
AuthorDate: Thu Mar 28 22:46:06 2024 +0800
enable wal compression
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +
.../dataregion/wal/io/CheckpointReader.java | 3 +-
.../storageengine/dataregion/wal/io/LogWriter.java | 36 +++++++
.../dataregion/wal/io/WALByteBufReader.java | 7 +-
.../dataregion/wal/io/WALInputStream.java | 103 +++++++++++++++++++++
.../storageengine/dataregion/wal/io/WALReader.java | 6 +-
.../storageengine/dataregion/wal/io/WALWriter.java | 2 +
.../dataregion/wal/utils/WALWriteUtils.java | 10 +-
9 files changed, 169 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1380a167505..d850748b92d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -94,6 +94,7 @@ public class IoTDBConfig {
"([" + PATH_SEPARATOR + "])?" + NODE_NAME_MATCHER + "(" +
PARTIAL_NODE_MATCHER + ")*";
public static final Pattern NODE_PATTERN = Pattern.compile(NODE_MATCHER);
+ boolean enableWALCompression = false;
/** Whether to enable the mqtt service. */
private boolean enableMQTTService = false;
@@ -3829,4 +3830,13 @@ public class IoTDBConfig {
double innerCompactionTaskSelectionDiskRedundancy) {
this.innerCompactionTaskSelectionDiskRedundancy =
innerCompactionTaskSelectionDiskRedundancy;
}
+
+ /** Returns {@code true} when WAL compression is switched on in this config. */
+ public boolean isEnableWALCompression() {
+ return this.enableWALCompression;
+ }
+
+ /**
+ * Stores the flag later reported by {@link #isEnableWALCompression()}.
+ *
+ * @param enableWALCompression new value of the WAL compression switch
+ */
+ public void setEnableWALCompression(boolean enableWALCompression) {
+ this.enableWALCompression = enableWALCompression;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 139a1374b44..3b71dc7d27c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -412,6 +412,11 @@ public class IoTDBDescriptor {
"io_task_queue_size_for_flushing",
Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
+ conf.setEnableWALCompression(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_wal_compression",
Boolean.toString(conf.isEnableWALCompression()))));
+
conf.setCompactionScheduleIntervalInMs(
Long.parseLong(
properties.getProperty(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
index 5d2bad0a874..081b3ed4a4f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -48,7 +47,7 @@ public class CheckpointReader {
private void init() {
checkpoints = new ArrayList<>();
try (DataInputStream logStream =
- new DataInputStream(new BufferedInputStream(new
FileInputStream(logFile)))) {
+ new DataInputStream(new BufferedInputStream(new
WALInputStream(logFile)))) {
maxMemTableId = logStream.readLong();
while (logStream.available() > 0) {
Checkpoint checkpoint = Checkpoint.deserialize(logStream);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index 68f4deae318..978b453e258 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -19,8 +19,11 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,18 +47,51 @@ public abstract class LogWriter implements ILogWriter {
protected final FileOutputStream logStream;
protected final FileChannel logChannel;
protected long size;
+ protected boolean isEndFile = false;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES *
2 + 1);
+ private final ICompressor compressor =
ICompressor.getCompressor(CompressionType.LZ4);
+ private final ByteBuffer compressedByteBuffer;
+ /**
+ * Opens {@code logFile} in append mode and, when WAL compression is enabled in the config,
+ * allocates the scratch buffer that receives compressed output.
+ *
+ * @param logFile file this writer appends to
+ * @throws FileNotFoundException if the file cannot be opened for writing
+ */
protected LogWriter(File logFile) throws FileNotFoundException {
this.logFile = logFile;
this.logStream = new FileOutputStream(logFile, true);
this.logChannel = this.logStream.getChannel();
+ // The scratch buffer is sized for the compressor's worst case on one wal-buffer-sized input.
+ // NOTE(review): the flag is sampled once here, while write() re-reads it on every call;
+ // confirm it cannot become true after construction, since the buffer would still be null.
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()) {
+ compressedByteBuffer =
+ ByteBuffer.allocate(
+ compressor.getMaxBytesForCompression(
+
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
+ } else {
+ compressedByteBuffer = null;
+ }
}
@Override
public void write(ByteBuffer buffer) throws IOException {
+ int bufferSize = buffer.position();
size += buffer.position();
buffer.flip();
+ boolean compressed = false;
+ int uncompressedSize = bufferSize;
+ if (!isEndFile &&
IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()
+ /* && bufferSize > 1024 * 512 Do not compress buffer that is less than
512KB */ ) {
+ compressedByteBuffer.clear();
+ compressor.compress(buffer, compressedByteBuffer);
+ buffer = compressedByteBuffer;
+ bufferSize = buffer.position();
+ buffer.flip();
+ compressed = true;
+ }
+ size += bufferSize;
+ headerBuffer.clear();
+ headerBuffer.putInt(bufferSize);
+ headerBuffer.put((byte) (compressed ? 1 : 0));
try {
+ if (compressed) {
+ headerBuffer.putInt(uncompressedSize);
+ }
+ headerBuffer.flip();
+ logChannel.write(headerBuffer);
logChannel.write(buffer);
} catch (ClosedChannelException e) {
logger.warn("Cannot write to {}", logFile, e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
index f101eaf3647..ad3b7479de9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import java.io.Closeable;
+import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ public class WALByteBufReader implements Closeable {
private final File logFile;
private final FileChannel channel;
private final WALMetaData metaData;
+ private final DataInputStream logStream;
private final Iterator<Integer> sizeIterator;
public WALByteBufReader(File logFile) throws IOException {
@@ -46,6 +48,7 @@ public class WALByteBufReader implements Closeable {
public WALByteBufReader(File logFile, FileChannel channel) throws
IOException {
this.logFile = logFile;
this.channel = channel;
+ this.logStream = new DataInputStream(new WALInputStream(logFile));
this.metaData = WALMetaData.readFromWALFile(logFile, channel);
this.sizeIterator = metaData.getBuffersSize().iterator();
channel.position(0);
@@ -64,8 +67,8 @@ public class WALByteBufReader implements Closeable {
public ByteBuffer next() throws IOException {
int size = sizeIterator.next();
ByteBuffer buffer = ByteBuffer.allocate(size);
- channel.read(buffer);
- buffer.clear();
+ // readFully() writes straight into the backing array and never moves the buffer's position,
+ // so the freshly allocated buffer is already readable (position 0, limit == size).
+ // Calling flip() here would set the limit to 0 and hand the caller an empty buffer.
+ logStream.readFully(buffer.array(), 0, size);
return buffer;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
new file mode 100644
index 00000000000..8e742b3cb1b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.storageengine.dataregion.wal.io;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Objects;
+
+/**
+ * Input stream over a log file written as a sequence of segments.
+ *
+ * <p>Each segment starts with a header: the payload size in bytes (int) and a compression flag
+ * (byte, 1 = compressed). A compressed segment carries one more int, the uncompressed size,
+ * before its payload. Compressed payloads are decoded with LZ4. The stream exposes the
+ * concatenated, uncompressed payloads.
+ */
+public class WALInputStream extends InputStream implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(WALInputStream.class);
+  /** Size of the fixed header part: payload size (int) plus compression flag (byte). */
+  private static final int SEGMENT_HEADER_SIZE = Integer.BYTES + 1;
+
+  private static final String UNEXPECTED_EOF = "Unexpected end of file";
+
+  private final FileChannel channel;
+  private final ByteBuffer headerBuffer = ByteBuffer.allocate(SEGMENT_HEADER_SIZE);
+  private final ByteBuffer compressedHeader = ByteBuffer.allocate(Integer.BYTES);
+  // uncompressed data of the current segment; null once the stream is closed
+  private ByteBuffer dataBuffer =
+      ByteBuffer.allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize());
+
+  /**
+   * Opens {@code logFile} for reading.
+   *
+   * @param logFile file to read segments from
+   * @throws IOException if the file cannot be opened
+   */
+  public WALInputStream(File logFile) throws IOException {
+    channel = FileChannel.open(logFile.toPath());
+    // A freshly allocated buffer reports its whole capacity as readable. Flip it so it starts
+    // out empty and the first read loads a real segment instead of returning zero bytes.
+    dataBuffer.flip();
+  }
+
+  /**
+   * Reads the next byte of uncompressed data.
+   *
+   * @return the byte as an int in the range 0-255, or -1 at the end of the file
+   * @throws IOException if the file is truncated, corrupted, or the stream is closed
+   */
+  @Override
+  public int read() throws IOException {
+    if (!ensureData()) {
+      return -1;
+    }
+    return dataBuffer.get() & 0xFF;
+  }
+
+  /**
+   * Copies up to {@code len} bytes of the current segment into {@code b}. It may return fewer
+   * bytes than requested because one call never crosses a segment boundary.
+   *
+   * @return the number of bytes copied, 0 if {@code len} is 0, or -1 at the end of the file
+   * @throws IOException if the file is truncated, corrupted, or the stream is closed
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    if (!ensureData()) {
+      return -1;
+    }
+    int count = Math.min(len, dataBuffer.remaining());
+    dataBuffer.get(b, off, count);
+    return count;
+  }
+
+  @Override
+  public void close() throws IOException {
+    channel.close();
+    dataBuffer = null;
+  }
+
+  /**
+   * Returns an estimate of the bytes still to be read: what is left in the current segment plus
+   * the unread bytes of the file. The file part still counts headers and compressed payloads, so
+   * the value is only reliable as a "more data exists" indicator.
+   */
+  @Override
+  public int available() throws IOException {
+    long buffered = Objects.isNull(dataBuffer) ? 0L : dataBuffer.remaining();
+    long pending = buffered + channel.size() - channel.position();
+    return (int) Math.max(0L, Math.min(pending, Integer.MAX_VALUE));
+  }
+
+  /**
+   * Makes sure {@code dataBuffer} holds at least one unread byte, loading segments as needed.
+   * Empty segments are skipped.
+   *
+   * @return false when the buffer is drained and the file has no further segment
+   */
+  private boolean ensureData() throws IOException {
+    while (Objects.isNull(dataBuffer) || !dataBuffer.hasRemaining()) {
+      // On a closed stream this position query throws ClosedChannelException.
+      if (channel.position() >= channel.size()) {
+        return false;
+      }
+      loadNextSegment();
+    }
+    return true;
+  }
+
+  /** Reads from the channel until {@code target} is full; a single read may be short. */
+  private void fill(ByteBuffer target) throws IOException {
+    while (target.hasRemaining()) {
+      if (channel.read(target) < 0) {
+        throw new IOException(UNEXPECTED_EOF);
+      }
+    }
+  }
+
+  /** Rejects payload sizes that are negative or larger than what is left in the file. */
+  private void checkPayloadSize(int dataSize) throws IOException {
+    if (dataSize < 0) {
+      throw new IOException("Corrupted segment header, negative size: " + dataSize);
+    }
+    if (dataSize > channel.size() - channel.position()) {
+      // Fail before allocating a buffer for a payload that cannot be there.
+      throw new IOException(UNEXPECTED_EOF);
+    }
+  }
+
+  /** Reads one segment header and payload, leaving the uncompressed bytes in dataBuffer. */
+  private void loadNextSegment() throws IOException {
+    headerBuffer.clear();
+    fill(headerBuffer);
+    headerBuffer.flip();
+    int dataSize = headerBuffer.getInt();
+    boolean isCompressed = headerBuffer.get() == 1;
+    if (isCompressed) {
+      compressedHeader.clear();
+      fill(compressedHeader);
+      compressedHeader.flip();
+      int uncompressedSize = compressedHeader.getInt();
+      if (uncompressedSize < 0) {
+        throw new IOException("Corrupted segment header, negative size: " + uncompressedSize);
+      }
+      checkPayloadSize(dataSize);
+      if (uncompressedSize > dataBuffer.capacity()) {
+        // enlarge buffer
+        dataBuffer = ByteBuffer.allocateDirect(uncompressedSize);
+      }
+      ByteBuffer compressedData = ByteBuffer.allocateDirect(dataSize);
+      fill(compressedData);
+      compressedData.flip();
+      IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.LZ4);
+      dataBuffer.clear();
+      // NOTE(review): the flip() below relies on uncompress() advancing dataBuffer's position
+      // by the number of bytes produced - confirm against the IUnCompressor implementation.
+      unCompressor.uncompress(compressedData, dataBuffer);
+    } else {
+      checkPayloadSize(dataSize);
+      // Reuse the current buffer when it is large enough instead of allocating per segment.
+      if (dataSize > dataBuffer.capacity()) {
+        dataBuffer = ByteBuffer.allocateDirect(dataSize);
+      }
+      dataBuffer.clear();
+      dataBuffer.limit(dataSize);
+      fill(dataBuffer);
+    }
+    dataBuffer.flip();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
index ee50c73df97..475ea2b0b2d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
@@ -26,12 +26,10 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -57,9 +55,7 @@ public class WALReader implements Closeable {
public WALReader(File logFile, boolean fileMayCorrupt) throws IOException {
this.logFile = logFile;
this.fileMayCorrupt = fileMayCorrupt;
- this.logStream =
- new DataInputStream(
- new BufferedInputStream(Files.newInputStream(logFile.toPath()),
STREAM_BUFFER_SIZE));
+ // Read through WALInputStream, which parses the per-segment headers and decompresses
+ // compressed segments, so entries are deserialized from the uncompressed bytes.
+ this.logStream = new DataInputStream(new WALInputStream(logFile));
}
/** Like {@link Iterator#hasNext()}. */
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
index 425fc676fad..20ae9975450 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
@@ -59,6 +59,7 @@ public class WALWriter extends LogWriter {
}
private void endFile() throws IOException {
+ this.isEndFile = true;
WALSignalEntry endMarker = new
WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER);
int metaDataSize = metaData.serializedSize();
ByteBuffer buffer =
@@ -72,6 +73,7 @@ public class WALWriter extends LogWriter {
// add magic string
buffer.put(MAGIC_STRING.getBytes());
write(buffer);
+ this.isEndFile = false;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
index d5702e7004a..633a8153b66 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java
@@ -126,10 +126,12 @@ public class WALWriteUtils {
return write(NO_BYTE_TO_READ, buffer);
}
int len = 0;
- byte[] bytes = s.getBytes();
- len += write(bytes.length, buffer);
- buffer.put(bytes);
- len += bytes.length;
+ len += write(s.length(), buffer);
+ for (int i = 0; i < s.length(); i++) {
+ char c = s.charAt(i);
+ buffer.put((byte) c); // ascii only
+ }
+ len += s.length();
return len;
}