This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1907f4f804f Fix WAL file end marker (#13125)
1907f4f804f is described below
commit 1907f4f804fe459e986fb8a055c7cf3595684f9c
Author: shuwenwei <[email protected]>
AuthorDate: Tue Aug 13 14:27:10 2024 +0800
Fix WAL file end marker (#13125)
* fix WAL file end marker
* fix ut
---
.../db/storageengine/dataregion/wal/io/ILogWriter.java | 13 +++++++++++++
.../iotdb/db/storageengine/dataregion/wal/io/LogWriter.java | 11 +++++++++--
.../db/storageengine/dataregion/wal/io/WALInputStream.java | 6 +++---
.../iotdb/db/storageengine/dataregion/wal/io/WALReader.java | 3 +++
.../iotdb/db/storageengine/dataregion/wal/io/WALWriter.java | 13 ++++++-------
.../dataregion/wal/compression/WALCompressionTest.java | 4 ++++
.../dataregion/wal/recover/WALRepairWriterTest.java | 10 ++++++----
7 files changed, 44 insertions(+), 16 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
index f4d65612c47..e73a1056b2e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/ILogWriter.java
@@ -37,6 +37,19 @@ public interface ILogWriter extends Closeable {
*/
double write(ByteBuffer buffer) throws IOException;
+ /**
+ * Write given logs to a persistent medium. NOTICE: the logs may be cached
in the storage device.
+ * If the storage device you are using does not guarantee strong persistence
and you want the logs
+ * to be persisted immediately, please call {@link #force()} after calling
this method. NOTICE: do
+ * not flip the buffer before calling this method.
+ *
+ * @param buffer content that has been converted to bytes
+ * @param allowCompress if the buffer should be compressed
+ * @throws IOException if an I/O error occurs
+ * @return Compression rate of the buffer after compression
+ */
+ double write(ByteBuffer buffer, boolean allowCompress) throws IOException;
+
/**
* Forces any updates to this file to be written to the storage device that
contains it.
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
index cdec209e507..95721f846cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java
@@ -75,13 +75,15 @@ public abstract class LogWriter implements ILogWriter {
}
@Override
- public double write(ByteBuffer buffer) throws IOException {
+ public double write(ByteBuffer buffer, boolean allowCompress) throws
IOException {
long startTime = System.nanoTime();
// To support hot loading, we can't define it as a variable,
// because we need to dynamically check whether wal compression is enabled
// each time the buffer is serialized
CompressionType compressionType =
- IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
+ allowCompress
+ ?
IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm()
+ : CompressionType.UNCOMPRESSED;
int bufferSize = buffer.position();
if (bufferSize == 0) {
return 1.0;
@@ -130,6 +132,11 @@ public abstract class LogWriter implements ILogWriter {
return ((double) bufferSize / uncompressedSize);
}
+ @Override
+ public double write(ByteBuffer buffer) throws IOException {
+ return write(buffer, true);
+ }
+
@Override
public void force() throws IOException {
force(true);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 19e1564f946..582e9448445 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -27,6 +27,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -120,8 +121,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
metadataSizeBuf.flip();
int metadataSize = metadataSizeBuf.getInt();
// -1 is for the endmarker
- endOffset =
- channel.size() - version.getVersionBytes().length - Integer.BYTES -
metadataSize - 1;
+ endOffset = channel.size() - version.getVersionBytes().length -
Integer.BYTES - metadataSize;
} finally {
if (version == WALFileVersion.V2) {
// Set the position back to the end of head magic string
@@ -187,7 +187,7 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
private void loadNextSegment() throws IOException {
if (channel.position() >= endOffset) {
- throw new IOException("Reach the end offset of wal file");
+ throw new EOFException("Reach the end offset of wal file");
}
long startTime = System.nanoTime();
long startPosition = channel.position();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
index 1310bb36b46..ce0d9689a85 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
@@ -75,6 +76,8 @@ public class WALReader implements Closeable {
nextEntry = null;
return false;
}
+ } catch (EOFException e) {
+ return false;
} catch (Exception e) {
fileCorrupted = true;
// log only when file should be complete
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
index f23a6a4391e..44a3429f522 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
@@ -64,15 +64,14 @@ public class WALWriter extends LogWriter {
private void endFile() throws IOException {
WALSignalEntry endMarker = new
WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER);
+ ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
+ // mark info part ends
+ endMarker.serialize(markerBuffer);
+ write(markerBuffer, false);
int metaDataSize = metaData.serializedSize();
+
ByteBuffer buffer =
- ByteBuffer.allocate(
- endMarker.serializedSize()
- + metaDataSize
- + Integer.BYTES
- + version.getVersionBytes().length);
- // mark info part ends
- endMarker.serialize(buffer);
+ ByteBuffer.allocate(metaDataSize + Integer.BYTES +
version.getVersionBytes().length);
// flush meta data
metaData.serialize(buffer);
buffer.putInt(metaDataSize);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
index f3764694fe1..28b95625594 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
@@ -228,6 +228,8 @@ public class WALCompressionTest {
ByteBuffer dataBuf = ByteBuffer.allocate(buf.array().length);
dataInputStream.readFully(dataBuf.array());
Assert.assertArrayEquals(buf.array(), dataBuf.array());
+ Assert.assertEquals(CompressionType.UNCOMPRESSED.serialize(),
dataInputStream.readByte());
+ Assert.assertEquals(Byte.BYTES, dataInputStream.readInt());
Assert.assertEquals(
new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER),
WALEntry.deserialize(dataInputStream));
@@ -284,6 +286,8 @@ public class WALCompressionTest {
Assert.assertArrayEquals(compressed, dataBuf.array());
IUnCompressor unCompressor =
IUnCompressor.getUnCompressor(CompressionType.LZ4);
Assert.assertArrayEquals(unCompressor.uncompress(compressed),
buf.array());
+ Assert.assertEquals(CompressionType.UNCOMPRESSED.serialize(),
dataInputStream.readByte());
+ Assert.assertEquals(Byte.BYTES, dataInputStream.readInt());
Assert.assertEquals(
new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER),
WALEntry.deserialize(dataInputStream));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
index e8c5ee02dfc..5c1df08a7a2 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java
@@ -71,11 +71,12 @@ public class WALRepairWriterTest {
WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new
ArrayList<>(), new HashSet<>());
// repair
new WALRepairWriter(logFile).repair(walMetaData);
- // verify file, marker + metadata(search index + size number) + metadata
size + head magic
+ // verify file, marker(header size + marker buffer size) + metadata(search
index + size number)
+ // + metadata size + head magic
// string + tail magic string
// empty file will be assumed as V1 (because of no header magic)
Assert.assertEquals(
- Byte.BYTES
+ (Byte.BYTES + Integer.BYTES + Byte.BYTES)
+ (Long.BYTES + Integer.BYTES)
+ Integer.BYTES
+ WALFileVersion.V1.getVersionBytes().length,
@@ -97,10 +98,11 @@ public class WALRepairWriterTest {
WALMetaData walMetaData = new WALMetaData(firstSearchIndex, new
ArrayList<>(), new HashSet<>());
// repair
new WALRepairWriter(logFile).repair(walMetaData);
- // verify file, marker + metadata(search index + size number) + metadata
size + magic string
+ // verify file, marker(header size + marker buffer size) + metadata(search
index + size number)
+ // + metadata size + magic string
// file too small will be assumed as V1 (because of no header magic)
Assert.assertEquals(
- Byte.BYTES
+ (Byte.BYTES + Integer.BYTES + Byte.BYTES)
+ (Long.BYTES + Integer.BYTES)
+ Integer.BYTES
+ WALFileVersion.V1.getVersionBytes().length,