This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch wal-compress-formal-branch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ee767637a941b2ae91e35e8586b056138843dc3f Author: Liu Xuxin <[email protected]> AuthorDate: Mon Jun 17 15:04:57 2024 +0800 Edit according to comment --- .../dataregion/wal/io/WALInputStream.java | 21 ++++++++++++++++++++- .../dataregion/wal/utils/WALEntryPosition.java | 4 +++- .../wal/compression/WALCompressionTest.java | 4 ++-- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java index 7cdcb63095f..844c06436b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -18,6 +18,8 @@ */ package org.apache.iotdb.db.storageengine.dataregion.wal.io; +import org.apache.iotdb.db.utils.MmapUtil; + import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.slf4j.Logger; @@ -27,6 +29,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.util.Objects; @@ -217,6 +220,9 @@ public class WALInputStream extends InputStream implements AutoCloseable { if (Objects.isNull(dataBuffer) || dataBuffer.capacity() < segmentInfo.uncompressedSize || dataBuffer.capacity() > segmentInfo.uncompressedSize * 2) { + if (!Objects.isNull(dataBuffer)) { + MmapUtil.clean((MappedByteBuffer) dataBuffer); + } dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize); } dataBuffer.clear(); @@ -224,6 +230,9 @@ public class WALInputStream extends InputStream implements AutoCloseable { if (Objects.isNull(compressedBuffer) || compressedBuffer.capacity() < segmentInfo.dataInDiskSize || compressedBuffer.capacity() > segmentInfo.dataInDiskSize * 2) { + if (!Objects.isNull(compressedBuffer)) { + MmapUtil.clean((MappedByteBuffer) compressedBuffer); + } compressedBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); } compressedBuffer.clear(); @@ -241,6 +250,9 @@ public class WALInputStream extends InputStream implements AutoCloseable { if (Objects.isNull(dataBuffer) || dataBuffer.capacity() < segmentInfo.dataInDiskSize || dataBuffer.capacity() > segmentInfo.dataInDiskSize * 2) { + if (!Objects.isNull(dataBuffer)) { + MmapUtil.clean((MappedByteBuffer) dataBuffer); + } dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize); } dataBuffer.clear(); @@ -271,7 +283,14 @@ public class WALInputStream extends InputStream implements AutoCloseable { } } - public void skipToGivenPosition(long pos) throws IOException { + /** + * Since current WAL file is compressed, but some part of the system need to skip the offset of an + * uncompressed wal file, this method is used to skip to the given logical position. + * + * @param pos The logical offset to skip to + * @throws IOException If the file is broken or the given position is invalid + */ + public void skipToGivenLogicalPosition(long pos) throws IOException { if (version == FileVersion.V2) { channel.position(WALWriter.MAGIC_STRING_BYTES); long posRemain = pos; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java index 1e89d8c546d..c794745c6f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java @@ -101,8 +101,9 @@ public class WALEntryPosition { if (!canRead()) { throw new IOException("Target file hasn't been specified."); } + // TODO: Reuse the file stream try (WALInputStream is = openReadFileStream()) { - is.skipToGivenPosition(position); + is.skipToGivenLogicalPosition(position); ByteBuffer buffer = ByteBuffer.allocate(size); is.read(buffer); buffer.flip(); @@ -137,6 +138,7 @@ public class WALEntryPosition { } public WALInputStream openReadFileStream() throws IOException { + // TODO: Refactor this part of code if (isInSealedFile()) { walFile = walNode.getWALFile(walFileVersionId); return new WALInputStream(walFile); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java index 7ab33343b5d..d187f6107b6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java @@ -150,7 +150,7 @@ public class WALCompressionTest { try (WALInputStream stream = new WALInputStream(walFile)) { for (int i = 0; i < 100; ++i) { Pair<Long, InsertRowNode> positionAndNodePair = positionAndEntryPairList.get(i); - stream.skipToGivenPosition(positionAndNodePair.left); + stream.skipToGivenLogicalPosition(positionAndNodePair.left); /* Add the allocated buffer size by 2, because the actual serialized size of InsertRowNode is larger than the estimated value got by serializedSize. @@ -231,7 +231,7 @@ public class WALCompressionTest { } dataOutputStream.close(); ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray()); - // Do not compress it + // Compress it IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(CompressionType.LZ4); WALTestUtils.setMinCompressionSize(0); try (WALWriter writer = new WALWriter(walFile)) {
