This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 09da138a93f Resolve the problem that it will execute endFile() twice
and generate an invalid wal file that… (#16627)
09da138a93f is described below
commit 09da138a93f0b3e353e5dfac7304827c20d79b80
Author: libo <[email protected]>
AuthorDate: Tue Oct 21 08:31:47 2025 +0800
Resolve the problem that it will execute endFile() twice and generate an
invalid wal file that… (#16627)
* It will execute endFile() twice and generate an invalid wal file that
only contain the mark string and mistakenly treated mark as the header and get
wrong data When stopping datanode.
* Adjust the test content via wal file broken.
* don't delete file when end file.
* don't delete file when end file.
---
.../dataregion/wal/io/CheckpointReader.java | 6 ++++++
.../dataregion/wal/io/WALInputStream.java | 14 +++++++-------
.../storageengine/dataregion/wal/io/WALWriter.java | 4 ++++
.../dataregion/wal/io/WALFileTest.java | 22 +++++++++++++++++-----
4 files changed, 34 insertions(+), 12 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
index 578ab21ae8c..9bd8643e386 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java
@@ -25,6 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -51,6 +52,11 @@ public class CheckpointReader {
Checkpoint checkpoint = Checkpoint.deserialize(logStream);
checkpoints.add(checkpoint);
}
+ } catch (EOFException e) {
+ logger.debug(
+ "Meet error when reading checkpoint file {}, skip broken
checkpoints",
+ logFile,
+ e.getMessage());
} catch (IOException e) {
logger.warn(
"Meet error when reading checkpoint file {}, skip broken
checkpoints", logFile, e);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 91e7dddd072..0a7dbb5463c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -74,14 +74,14 @@ public class WALInputStream extends InputStream implements
AutoCloseable {
}
private void getEndOffset() throws IOException {
- if (channel.size() < WALFileVersion.V2.getVersionBytes().length +
Integer.BYTES) {
- // An broken file
- endOffset = channel.size();
- return;
- }
- ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
- long position;
try {
+ if (channel.size() < WALFileVersion.V2.getVersionBytes().length +
Integer.BYTES) {
+ // An broken file
+ endOffset = channel.size();
+ return;
+ }
+ ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
+ long position;
if (version == WALFileVersion.V2) {
// New Version
ByteBuffer magicStringBuffer =
ByteBuffer.allocate(version.getVersionBytes().length);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
index 44a3429f522..6f13040bec8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java
@@ -63,6 +63,10 @@ public class WALWriter extends LogWriter {
}
private void endFile() throws IOException {
+ if (logFile.length() == WALFileVersion.V2.getVersionBytes().length) {
+ super.close();
+ return;
+ }
WALSignalEntry endMarker = new
WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER);
ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES);
// mark info part ends
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileTest.java
index 762c54ac8d9..3a4cb319bda 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileTest.java
@@ -57,7 +57,6 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
public class WALFileTest {
@@ -183,10 +182,23 @@ public class WALFileTest {
final FileChannel fileChannel1 = FileChannel.open(walFile.toPath());
assertThrows(IOException.class, () -> WALMetaData.readFromWALFile(walFile,
fileChannel1));
walWriter.close();
- FileChannel fileChannel2 = FileChannel.open(walFile.toPath());
- WALMetaData walMetaData = WALMetaData.readFromWALFile(walFile,
fileChannel2);
- fileChannel2.close();
- assertTrue(walMetaData.getMemTablesId().isEmpty());
+
+ if (!walFile.exists()) {
+ Files.createFile(walFile.toPath());
+ Files.write(walFile.toPath(),
ByteBuffer.wrap(WALFileVersion.V2.getVersionBytes()).array());
+ }
+ try {
+ FileChannel fileChannel2 = FileChannel.open(walFile.toPath());
+ WALMetaData walMetaData = WALMetaData.readFromWALFile(walFile,
fileChannel2);
+ fileChannel2.close();
+ } catch (Exception e) {
+ assertEquals(
+ "Broken wal file "
+ + walFile.getPath()
+ + ", size "
+ + WALFileVersion.V2.getVersionBytes().length,
+ e.getMessage());
+ }
}
public static InsertRowNode getInsertRowNode(String devicePath) throws
IllegalPathException {