This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch wal-compress-formal-branch
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ee767637a941b2ae91e35e8586b056138843dc3f
Author: Liu Xuxin <[email protected]>
AuthorDate: Mon Jun 17 15:04:57 2024 +0800

    Edit according to comment
---
 .../dataregion/wal/io/WALInputStream.java           | 21 ++++++++++++++++++++-
 .../dataregion/wal/utils/WALEntryPosition.java      |  4 +++-
 .../wal/compression/WALCompressionTest.java         |  4 ++--
 3 files changed, 25 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 7cdcb63095f..844c06436b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
+import org.apache.iotdb.db.utils.MmapUtil;
+
 import org.apache.tsfile.compress.IUnCompressor;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.slf4j.Logger;
@@ -27,6 +29,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.Objects;
@@ -217,6 +220,9 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
       if (Objects.isNull(dataBuffer)
           || dataBuffer.capacity() < segmentInfo.uncompressedSize
           || dataBuffer.capacity() > segmentInfo.uncompressedSize * 2) {
+        if (!Objects.isNull(dataBuffer)) {
+          MmapUtil.clean((MappedByteBuffer) dataBuffer);
+        }
         dataBuffer = ByteBuffer.allocateDirect(segmentInfo.uncompressedSize);
       }
       dataBuffer.clear();
@@ -224,6 +230,9 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
       if (Objects.isNull(compressedBuffer)
           || compressedBuffer.capacity() < segmentInfo.dataInDiskSize
           || compressedBuffer.capacity() > segmentInfo.dataInDiskSize * 2) {
+        if (!Objects.isNull(compressedBuffer)) {
+          MmapUtil.clean((MappedByteBuffer) compressedBuffer);
+        }
         compressedBuffer = 
ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
       }
       compressedBuffer.clear();
@@ -241,6 +250,9 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
       if (Objects.isNull(dataBuffer)
           || dataBuffer.capacity() < segmentInfo.dataInDiskSize
           || dataBuffer.capacity() > segmentInfo.dataInDiskSize * 2) {
+        if (!Objects.isNull(dataBuffer)) {
+          MmapUtil.clean((MappedByteBuffer) dataBuffer);
+        }
         dataBuffer = ByteBuffer.allocateDirect(segmentInfo.dataInDiskSize);
       }
       dataBuffer.clear();
@@ -271,7 +283,14 @@ public class WALInputStream extends InputStream implements 
AutoCloseable {
     }
   }
 
-  public void skipToGivenPosition(long pos) throws IOException {
+  /**
+   * Since current WAL file is compressed, but some part of the system need to 
skip the offset of an
+   * uncompressed wal file, this method is used to skip to the given logical 
position.
+   *
+   * @param pos The logical offset to skip to
+   * @throws IOException If the file is broken or the given position is invalid
+   */
+  public void skipToGivenLogicalPosition(long pos) throws IOException {
     if (version == FileVersion.V2) {
       channel.position(WALWriter.MAGIC_STRING_BYTES);
       long posRemain = pos;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
index 1e89d8c546d..c794745c6f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryPosition.java
@@ -101,8 +101,9 @@ public class WALEntryPosition {
     if (!canRead()) {
       throw new IOException("Target file hasn't been specified.");
     }
+    // TODO: Reuse the file stream
     try (WALInputStream is = openReadFileStream()) {
-      is.skipToGivenPosition(position);
+      is.skipToGivenLogicalPosition(position);
       ByteBuffer buffer = ByteBuffer.allocate(size);
       is.read(buffer);
       buffer.flip();
@@ -137,6 +138,7 @@ public class WALEntryPosition {
   }
 
   public WALInputStream openReadFileStream() throws IOException {
+    // TODO: Refactor this part of code
     if (isInSealedFile()) {
       walFile = walNode.getWALFile(walFileVersionId);
       return new WALInputStream(walFile);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
index 7ab33343b5d..d187f6107b6 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
@@ -150,7 +150,7 @@ public class WALCompressionTest {
     try (WALInputStream stream = new WALInputStream(walFile)) {
       for (int i = 0; i < 100; ++i) {
         Pair<Long, InsertRowNode> positionAndNodePair = 
positionAndEntryPairList.get(i);
-        stream.skipToGivenPosition(positionAndNodePair.left);
+        stream.skipToGivenLogicalPosition(positionAndNodePair.left);
         /*
           Add the allocated buffer size by 2, because the actual serialized 
size
           of InsertRowNode is larger than the estimated value got by 
serializedSize.
@@ -231,7 +231,7 @@ public class WALCompressionTest {
     }
     dataOutputStream.close();
     ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
-    // Do not compress it
+    // Compress it
     
IoTDBDescriptor.getInstance().getConfig().setWALCompressionAlgorithm(CompressionType.LZ4);
     WALTestUtils.setMinCompressionSize(0);
     try (WALWriter writer = new WALWriter(walFile)) {

Reply via email to