This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch fix_wal_repair in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 97cdeca3dbd9f01a86faf077b49527df0a06ac47 Author: Tian Jiang <[email protected]> AuthorDate: Mon Jul 22 14:34:38 2024 +0800 Remove UNKNOWN from WALFileVersion which fails recovering V1 WAL file --- .../storageengine/dataregion/wal/io/LogWriter.java | 7 +-- .../dataregion/wal/io/WALFileVersion.java | 54 ++++++++++++++-------- .../dataregion/wal/io/WALInputStream.java | 24 +++++----- .../dataregion/wal/io/WALMetaData.java | 18 +++----- .../storageengine/dataregion/wal/io/WALWriter.java | 14 ++---- .../dataregion/wal/recover/WALRepairWriter.java | 27 +++-------- .../wal/compression/WALCompressionTest.java | 13 +++--- .../wal/recover/WALRepairWriterTest.java | 5 +- 8 files changed, 76 insertions(+), 86 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java index 668da2bbd23..0021188a8b3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java @@ -34,7 +34,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; -import java.nio.charset.StandardCharsets; /** * LogWriter writes the binary logs into a file, including writing {@link WALEntry} into .wal file @@ -70,11 +69,7 @@ public abstract class LogWriter implements ILogWriter { this.logStream = new FileOutputStream(logFile, true); this.logChannel = this.logStream.getChannel(); if (!logFile.exists() || logFile.length() == 0) { - this.logChannel.write( - ByteBuffer.wrap( - version == WALFileVersion.V1 - ? WALWriter.MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8) - : WALWriter.MAGIC_STRING_V2.getBytes(StandardCharsets.UTF_8))); + this.logChannel.write(ByteBuffer.wrap(version.getVersionBytes())); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java index 59e7a5534ce..e3d374551b1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java @@ -25,9 +25,26 @@ import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; public enum WALFileVersion { - V1, - V2, - UNKNOWN; + V1("WAL"), + V2("V2-WAL"); + + private final String versionString; + private byte[] versionBytes; + + public String getVersionString() { + return versionString; + } + + public byte[] getVersionBytes() { + return versionBytes; + } + + WALFileVersion(String versionString) { + this.versionString = versionString; + if (versionString != null) { + this.versionBytes = versionString.getBytes(StandardCharsets.UTF_8); + } + } public static WALFileVersion getVersion(File file) throws IOException { try (FileChannel channel = FileChannel.open(file.toPath())) { @@ -38,22 +55,23 @@ public enum WALFileVersion { public static WALFileVersion getVersion(FileChannel channel) throws IOException { long originalPosition = channel.position(); try { - channel.position(0); - ByteBuffer buffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_V2_BYTES); - channel.read(buffer); - buffer.flip(); - if (buffer.remaining() < WALWriter.MAGIC_STRING_V2_BYTES) { - return UNKNOWN; - } - String version = new String(buffer.array(), StandardCharsets.UTF_8); - switch (version) { - case WALWriter.MAGIC_STRING_V2: - return V2; - case WALWriter.MAGIC_STRING_V1: - return V1; - default: - return UNKNOWN; + // head magic string starts to exist since V2 + WALFileVersion[] versions = {V2}; + for (WALFileVersion version : versions) { + channel.position(0); + if (channel.size() < version.versionBytes.length) { + continue; + } + ByteBuffer buffer = ByteBuffer.allocate(version.versionBytes.length); + channel.read(buffer); + buffer.flip(); + String versionString = new String(buffer.array(), StandardCharsets.UTF_8); + if (version.versionString.equals(versionString)) { + return version; + } } + // v1 by default + return V1; } finally { channel.position(originalPosition); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java index 14854a05623..0b10876d6db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -67,7 +67,7 @@ public class WALInputStream extends InputStream implements AutoCloseable { } private void getEndOffset() throws IOException { - if (channel.size() < WALWriter.MAGIC_STRING_V2_BYTES + Integer.BYTES) { + if (channel.size() < WALFileVersion.V2.getVersionBytes().length + Integer.BYTES) { // An broken file endOffset = channel.size(); return; @@ -77,18 +77,18 @@ public class WALInputStream extends InputStream implements AutoCloseable { try { if (version == WALFileVersion.V2) { // New Version - ByteBuffer magicStringBuffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_V2_BYTES); - channel.read(magicStringBuffer, channel.size() - WALWriter.MAGIC_STRING_V2_BYTES); + ByteBuffer magicStringBuffer = ByteBuffer.allocate(version.getVersionBytes().length); + channel.read(magicStringBuffer, channel.size() - version.getVersionBytes().length); magicStringBuffer.flip(); if (logFile.getName().endsWith(IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX) || !new String(magicStringBuffer.array(), StandardCharsets.UTF_8) - .equals(WALWriter.MAGIC_STRING_V2)) { + .equals(version.getVersionString())) { // This is a broken wal or checkpoint file endOffset = channel.size(); return; } else { // This is a normal wal file or check point file - position = channel.size() - WALWriter.MAGIC_STRING_V2_BYTES - Integer.BYTES; + position = channel.size() - version.getVersionBytes().length - Integer.BYTES; } } else { if (logFile.getName().endsWith(IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX)) { @@ -97,16 +97,16 @@ public class WALInputStream extends InputStream implements AutoCloseable { return; } // Old version - ByteBuffer magicStringBuffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_V1_BYTES); - channel.read(magicStringBuffer, channel.size() - WALWriter.MAGIC_STRING_V1_BYTES); + ByteBuffer magicStringBuffer = ByteBuffer.allocate(version.getVersionBytes().length); + channel.read(magicStringBuffer, channel.size() - version.getVersionBytes().length); magicStringBuffer.flip(); if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8) - .equals(WALWriter.MAGIC_STRING_V1)) { + .equals(version.getVersionString())) { // this is a broken wal file endOffset = channel.size(); return; } else { - position = channel.size() - WALWriter.MAGIC_STRING_V1_BYTES - Integer.BYTES; + position = channel.size() - version.getVersionBytes().length - Integer.BYTES; } } // Read the metadata size @@ -115,11 +115,11 @@ public class WALInputStream extends InputStream implements AutoCloseable { int metadataSize = metadataSizeBuf.getInt(); // -1 is for the endmarker endOffset = - channel.size() - WALWriter.MAGIC_STRING_V2_BYTES - Integer.BYTES - metadataSize - 1; + channel.size() - version.getVersionBytes().length - Integer.BYTES - metadataSize - 1; } finally { if (version == WALFileVersion.V2) { // Set the position back to the end of head magic string - channel.position(WALWriter.MAGIC_STRING_V2_BYTES); + channel.position(version.getVersionBytes().length); } else { // There is no head magic string in V1 version channel.position(0); @@ -275,7 +275,7 @@ public class WALInputStream extends InputStream implements AutoCloseable { */ public void skipToGivenLogicalPosition(long pos) throws IOException { if (version == WALFileVersion.V2) { - channel.position(WALWriter.MAGIC_STRING_V2_BYTES); + channel.position(version.getVersionBytes().length); long posRemain = pos; SegmentInfo segmentInfo; do { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java index 846fc56e949..ba9211656ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java @@ -131,7 +131,8 @@ public class WALMetaData implements SerializedSize { } public static WALMetaData readFromWALFile(File logFile, FileChannel channel) throws IOException { - if (channel.size() < WALWriter.MAGIC_STRING_V2_BYTES || !isValidMagicString(channel)) { + if (channel.size() < WALFileVersion.V2.getVersionBytes().length + || !isValidMagicString(channel)) { throw new BrokenWALFileException(logFile); } @@ -141,12 +142,7 @@ public class WALMetaData implements SerializedSize { try { ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES); WALFileVersion version = WALFileVersion.getVersion(channel); - position = - channel.size() - - Integer.BYTES - - (version == WALFileVersion.V2 - ? WALWriter.MAGIC_STRING_V2_BYTES - : WALWriter.MAGIC_STRING_V1_BYTES); + position = channel.size() - Integer.BYTES - (version.getVersionBytes().length); channel.read(metadataSizeBuf, position); metadataSizeBuf.flip(); // load metadata @@ -178,12 +174,12 @@ public class WALMetaData implements SerializedSize { } private static boolean isValidMagicString(FileChannel channel) throws IOException { - ByteBuffer magicStringBytes = ByteBuffer.allocate(WALWriter.MAGIC_STRING_V2_BYTES); - channel.read(magicStringBytes, channel.size() - WALWriter.MAGIC_STRING_V2_BYTES); + ByteBuffer magicStringBytes = ByteBuffer.allocate(WALFileVersion.V2.getVersionBytes().length); + channel.read(magicStringBytes, channel.size() - WALFileVersion.V2.getVersionBytes().length); magicStringBytes.flip(); String magicString = new String(magicStringBytes.array(), StandardCharsets.UTF_8); - return magicString.equals(WALWriter.MAGIC_STRING_V2) - || magicString.contains(WALWriter.MAGIC_STRING_V1); + return magicString.equals(WALFileVersion.V2.getVersionString()) + || magicString.contains(WALFileVersion.V1.getVersionString()); } public void setTruncateOffSet(long offset) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java index 4d2d522c45d..f23a6a4391e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java @@ -27,17 +27,10 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; /** WALWriter writes the binary {@link WALEntry} into .wal file. */ public class WALWriter extends LogWriter { - public static final String MAGIC_STRING_V1 = "WAL"; - public static final String MAGIC_STRING_V2 = "V2-WAL"; - public static final int MAGIC_STRING_V1_BYTES = - MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length; - public static final int MAGIC_STRING_V2_BYTES = - MAGIC_STRING_V2.getBytes(StandardCharsets.UTF_8).length; private WALFileStatus walFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX; // wal files' metadata protected final WALMetaData metaData = new WALMetaData(); @@ -50,6 +43,7 @@ public class WALWriter extends LogWriter { public WALWriter(File logFile, WALFileVersion version) throws IOException { super(logFile, version); + this.version = version; } /** @@ -76,16 +70,14 @@ public class WALWriter extends LogWriter { endMarker.serializedSize() + metaDataSize + Integer.BYTES - + (version != WALFileVersion.V2 ? MAGIC_STRING_V1_BYTES : MAGIC_STRING_V2_BYTES)); + + version.getVersionBytes().length); // mark info part ends endMarker.serialize(buffer); // flush meta data metaData.serialize(buffer); buffer.putInt(metaDataSize); // add magic string - buffer.put( - (version != WALFileVersion.V2 ? MAGIC_STRING_V1 : MAGIC_STRING_V2) - .getBytes(StandardCharsets.UTF_8)); + buffer.put(version.getVersionBytes()); writeMetadata(buffer); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java index c60c4d1a809..8572455feaa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java @@ -30,11 +30,6 @@ import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.StandardOpenOption; -import static org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V1; -import static org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V1_BYTES; -import static org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V2; -import static org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V2_BYTES; - /** Check whether the wal file is broken and repair it. */ public class WALRepairWriter { private final File logFile; @@ -47,21 +42,12 @@ public class WALRepairWriter { // locate broken data long truncateSize; WALFileVersion version = WALFileVersion.getVersion(logFile); - if (version == WALFileVersion.UNKNOWN) { - truncateSize = 0; - } else if (version == WALFileVersion.V2) { - if (readTailMagic(MAGIC_STRING_V2_BYTES).equals(MAGIC_STRING_V2)) { // complete file - return; - } else { // file with broken magic string - truncateSize = metaData.getTruncateOffSet(); - } - } else { - if (readTailMagic(MAGIC_STRING_V1_BYTES).contains(MAGIC_STRING_V1)) { - return; - } else { - truncateSize = metaData.getTruncateOffSet(); - } + if (readTailMagic(version).equals(version.getVersionString())) { // complete file + return; + } else { // file with broken magic string + truncateSize = metaData.getTruncateOffSet(); } + // truncate broken data try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.APPEND)) { channel.truncate(truncateSize); @@ -72,7 +58,8 @@ public class WALRepairWriter { } } - private String readTailMagic(int size) throws IOException { + private String readTailMagic(WALFileVersion version) throws IOException { + int size = version.getVersionBytes().length; try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ)) { ByteBuffer magicStringBytes = ByteBuffer.allocate(size); channel.read(magicStringBytes, channel.size() - size); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java index d723f2e5038..11f1a1d6859 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALInfoEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALSignalEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.io.LogWriter; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader; +import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALFileVersion; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALInputStream; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter; @@ -185,10 +186,10 @@ public class WALCompressionTest { try (DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(Files.newInputStream(walFile.toPath())))) { - byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_V2_BYTES]; + byte[] magicStringBytes = new byte[WALFileVersion.V2.getVersionBytes().length]; // head magic string dataInputStream.readFully(magicStringBytes); - Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new String(magicStringBytes)); + Assert.assertEquals(WALFileVersion.V2.getVersionString(), new String(magicStringBytes)); Assert.assertEquals( CompressionType.UNCOMPRESSED, CompressionType.deserialize(dataInputStream.readByte())); Assert.assertEquals(buf.array().length, dataInputStream.readInt()); @@ -202,7 +203,7 @@ public class WALCompressionTest { dataInputStream.readFully(metadataBuf.array()); // Tail magic string dataInputStream.readFully(magicStringBytes); - Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new String(magicStringBytes)); + Assert.assertEquals(WALFileVersion.V2.getVersionString(), new String(magicStringBytes)); } } @@ -238,10 +239,10 @@ public class WALCompressionTest { try (DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(Files.newInputStream(walFile.toPath())))) { - byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_V2_BYTES]; + byte[] magicStringBytes = new byte[WALFileVersion.V2.getVersionBytes().length]; // head magic string dataInputStream.readFully(magicStringBytes); - Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new String(magicStringBytes)); + Assert.assertEquals(WALFileVersion.V2.getVersionString(), new String(magicStringBytes)); Assert.assertEquals( CompressionType.LZ4, CompressionType.deserialize(dataInputStream.readByte())); Assert.assertEquals(compressed.length, dataInputStream.readInt()); @@ -258,7 +259,7 @@ public class WALCompressionTest { dataInputStream.readFully(metadataBuf.array()); // Tail magic string dataInputStream.readFully(magicStringBytes); - Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new String(magicStringBytes)); + Assert.assertEquals(WALFileVersion.V2.getVersionString(), new String(magicStringBytes)); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java index aca54fe2f19..e2aabe9da12 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALInfoEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader; +import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALFileVersion; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest; @@ -76,7 +77,7 @@ public class WALRepairWriterTest { Byte.BYTES + (Long.BYTES + Integer.BYTES) + Integer.BYTES - + WALWriter.MAGIC_STRING_V2_BYTES * 2, + + WALFileVersion.V2.getVersionBytes().length * 2L, logFile.length()); try (WALByteBufReader reader = new WALByteBufReader(logFile)) { Assert.assertFalse(reader.hasNext()); @@ -100,7 +101,7 @@ public class WALRepairWriterTest { Byte.BYTES + (Long.BYTES + Integer.BYTES) + Integer.BYTES - + WALWriter.MAGIC_STRING_V2_BYTES * 2, + + WALFileVersion.V2.getVersionBytes().length * 2L, logFile.length()); try (WALByteBufReader reader = new WALByteBufReader(logFile)) { Assert.assertFalse(reader.hasNext());
