This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1907f4f804f Fix WAL file end marker (#13125)
1907f4f804f is described below

commit 1907f4f804fe459e986fb8a055c7cf3595684f9c
Author: shuwenwei <[email protected]>
AuthorDate: Tue Aug 13 14:27:10 2024 +0800

    Fix WAL file end marker (#13125)
    
    * fix WAL file end marker
    
    * fix ut
---
 .../db/storageengine/dataregion/wal/io/ILogWriter.java      | 13 +++++++++++++
 .../iotdb/db/storageengine/dataregion/wal/io/LogWriter.java | 11 +++++++++--
 .../db/storageengine/dataregion/wal/io/WALInputStream.java  |  6 +++---
 .../iotdb/db/storageengine/dataregion/wal/io/WALReader.java |  3 +++
 .../iotdb/db/storageengine/dataregion/wal/io/WALWriter.java | 13 ++++++-------
 .../dataregion/wal/compression/WALCompressionTest.java      |  4 ++++
 .../dataregion/wal/recover/WALRepairWriterTest.java         | 10 ++++++----
 7 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
index f4d65612c47..e73a1056b2e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
@@ -37,6 +37,19 @@ public interface ILogWriter extends Closeable {
    */
   double write(ByteBuffer buffer) throws IOException;
 
+  /**
+   * Write given logs to a persistent medium. NOTICE: the logs may be cached in the storage device,
+   * if the storage device you are using do not guarantee strong persistence, and you want the logs
+   * to be persisted immediately, please call {@link #force()} after calling this method. Notice: do
+   * not flip the buffer before calling this method
+   *
+   * @param buffer content that have been converted to bytes
+   * @param allowCompress if the buffer should be compressed
+   * @throws IOException if an I/O error occurs
+   * @return Compression rate of the buffer after compression
+   */
+  double write(ByteBuffer buffer, boolean allowCompress) throws IOException;
+
   /**
    * Forces any updates to this file to be written to the storage device that contains it.
    *
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index cdec209e507..95721f846cc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -75,13 +75,15 @@ public abstract class LogWriter implements ILogWriter {
   }
 
   @Override
-  public double write(ByteBuffer buffer) throws IOException {
+  public double write(ByteBuffer buffer, boolean allowCompress) throws IOException {
     long startTime = System.nanoTime();
     // To support hot loading, we can't define it as a variable,
     // because we need to dynamically check whether wal compression is enabled
     // each time the buffer is serialized
     CompressionType compressionType =
-        IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
+        allowCompress
+            ? IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm()
+            : CompressionType.UNCOMPRESSED;
     int bufferSize = buffer.position();
     if (bufferSize == 0) {
       return 1.0;
@@ -130,6 +132,11 @@ public abstract class LogWriter implements ILogWriter {
     return ((double) bufferSize / uncompressedSize);
   }
 
+  @Override
+  public double write(ByteBuffer buffer) throws IOException {
+    return write(buffer, true);
+  }
+
   @Override
   public void force() throws IOException {
     force(true);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 19e1564f946..582e9448445 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -27,6 +27,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -120,8 +121,7 @@ public class WALInputStream extends InputStream implements AutoCloseable {
       metadataSizeBuf.flip();
       int metadataSize = metadataSizeBuf.getInt();
       // -1 is for the endmarker
-      endOffset =
-          channel.size() - version.getVersionBytes().length - Integer.BYTES - metadataSize - 1;
+      endOffset = channel.size() - version.getVersionBytes().length - Integer.BYTES - metadataSize;
     } finally {
       if (version == WALFileVersion.V2) {
         // Set the position back to the end of head magic string
@@ -187,7 +187,7 @@ public class WALInputStream extends InputStream implements AutoCloseable {
 
   private void loadNextSegment() throws IOException {
     if (channel.position() >= endOffset) {
-      throw new IOException("Reach the end offset of wal file");
+      throw new EOFException("Reach the end offset of wal file");
     }
     long startTime = System.nanoTime();
     long startPosition = channel.position();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
index 1310bb36b46..ce0d9689a85 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
@@ -75,6 +76,8 @@ public class WALReader implements Closeable {
         nextEntry = null;
         return false;
       }
+    } catch (EOFException e) {
+      return false;
     } catch (Exception e) {
       fileCorrupted = true;
       // log only when file should be complete
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
index f23a6a4391e..44a3429f522 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
@@ -64,15 +64,14 @@ public class WALWriter extends LogWriter {
 
   private void endFile() throws IOException {
     WALSignalEntry endMarker = new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER);
+    ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
+    // mark info part ends
+    endMarker.serialize(markerBuffer);
+    write(markerBuffer, false);
     int metaDataSize = metaData.serializedSize();
+
     ByteBuffer buffer =
-        ByteBuffer.allocate(
-            endMarker.serializedSize()
-                + metaDataSize
-                + Integer.BYTES
-                + version.getVersionBytes().length);
-    // mark info part ends
-    endMarker.serialize(buffer);
+        ByteBuffer.allocate(metaDataSize + Integer.BYTES + version.getVersionBytes().length);
     // flush meta data
     metaData.serialize(buffer);
     buffer.putInt(metaDataSize);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
index f3764694fe1..28b95625594 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
@@ -228,6 +228,8 @@ public class WALCompressionTest {
       ByteBuffer dataBuf = ByteBuffer.allocate(buf.array().length);
       dataInputStream.readFully(dataBuf.array());
       Assert.assertArrayEquals(buf.array(), dataBuf.array());
+      Assert.assertEquals(CompressionType.UNCOMPRESSED.serialize(), dataInputStream.readByte());
+      Assert.assertEquals(Byte.BYTES, dataInputStream.readInt());
       Assert.assertEquals(
           new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER),
           WALEntry.deserialize(dataInputStream));
@@ -284,6 +286,8 @@ public class WALCompressionTest {
       Assert.assertArrayEquals(compressed, dataBuf.array());
       IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.LZ4);
       Assert.assertArrayEquals(unCompressor.uncompress(compressed), buf.array());
+      Assert.assertEquals(CompressionType.UNCOMPRESSED.serialize(), dataInputStream.readByte());
+      Assert.assertEquals(Byte.BYTES, dataInputStream.readInt());
       Assert.assertEquals(
           new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER),
           WALEntry.deserialize(dataInputStream));
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
index e8c5ee02dfc..5c1df08a7a2 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
@@ -71,11 +71,12 @@ public class WALRepairWriterTest {
     WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new ArrayList<>(), new HashSet<>());
     // repair
     new WALRepairWriter(logFile).repair(walMetaData);
-    // verify file, marker + metadata(search index + size number) + metadata size + head magic
+    // verify file, marker(header size + marker buffer size) + metadata(search index + size number)
+    // + metadata size + head magic
     // string + tail magic string
     // empty file will be assumed as V1 (because of no header magic)
     Assert.assertEquals(
-        Byte.BYTES
+        (Byte.BYTES + Integer.BYTES + Byte.BYTES)
             + (Long.BYTES + Integer.BYTES)
             + Integer.BYTES
             + WALFileVersion.V1.getVersionBytes().length,
@@ -97,10 +98,11 @@ public class WALRepairWriterTest {
     WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new ArrayList<>(), new HashSet<>());
     // repair
     new WALRepairWriter(logFile).repair(walMetaData);
-    // verify file, marker + metadata(search index + size number) + metadata size + magic string
+    // verify file, marker(header size + marker buffer size) + metadata(search index + size number)
+    // + metadata size + magic string
     // file too small will be assumed as V1 (because of no header magic)
     Assert.assertEquals(
-        Byte.BYTES
+        (Byte.BYTES + Integer.BYTES + Byte.BYTES)
             + (Long.BYTES + Integer.BYTES)
             + Integer.BYTES
             + WALFileVersion.V1.getVersionBytes().length,

Reply via email to