This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 77ff8bf79261efc9f943c1d6d7b9998062c00e83 Author: shuwenwei <[email protected]> AuthorDate: Tue Jul 30 14:17:27 2024 +0800 fix WALInputStream not closed (#13050) * fix close * remove channel usage in WALByteBufReader * get wal file * add constructor * fix bug * catch exception * catch exception (cherry picked from commit 36455413b93c58ab0aff99bbd2c323dfebe8d572) --- .../dataregion/wal/io/WALByteBufReader.java | 40 +++++++++++++--------- .../dataregion/wal/io/WALInputStream.java | 19 ++++++++-- .../dataregion/wal/utils/WALInsertNodeCache.java | 5 +-- 3 files changed, 41 insertions(+), 23 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java index 882b5ea468c..b03b27a6994 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java @@ -20,14 +20,13 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; +import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryPosition; import java.io.Closeable; import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.StandardOpenOption; import java.util.Iterator; /** @@ -35,23 +34,32 @@ import java.util.Iterator; * {@link Iterator}. */ public class WALByteBufReader implements Closeable { - private final File logFile; - private final FileChannel channel; - private final WALMetaData metaData; - private final DataInputStream logStream; - private final Iterator<Integer> sizeIterator; + private WALMetaData metaData; + private DataInputStream logStream; + private Iterator<Integer> sizeIterator; public WALByteBufReader(File logFile) throws IOException { - this(logFile, FileChannel.open(logFile.toPath(), StandardOpenOption.READ)); + WALInputStream walInputStream = new WALInputStream(logFile); + try { + this.logStream = new DataInputStream(walInputStream); + this.metaData = walInputStream.getWALMetaData(); + this.sizeIterator = metaData.getBuffersSize().iterator(); + } catch (Exception e) { + walInputStream.close(); + throw e; + } } - public WALByteBufReader(File logFile, FileChannel channel) throws IOException { - this.logFile = logFile; - this.channel = channel; - this.logStream = new DataInputStream(new WALInputStream(logFile)); - this.metaData = WALMetaData.readFromWALFile(logFile, channel); - this.sizeIterator = metaData.getBuffersSize().iterator(); - channel.position(0); + public WALByteBufReader(WALEntryPosition walEntryPosition) throws IOException { + WALInputStream walInputStream = walEntryPosition.openReadFileStream(); + try { + this.logStream = new DataInputStream(walInputStream); + this.metaData = walInputStream.getWALMetaData(); + this.sizeIterator = metaData.getBuffersSize().iterator(); + } catch (Exception e) { + walInputStream.close(); + throw e; + } } /** Like {@link Iterator#hasNext()}. */ @@ -83,7 +91,7 @@ public class WALByteBufReader implements Closeable { @Override public void close() throws IOException { - channel.close(); + logStream.close(); } public long getFirstSearchIndex() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java index 48c0d7194ce..eff873510fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -60,10 +60,15 @@ public class WALInputStream extends InputStream implements AutoCloseable { public WALInputStream(File logFile) throws IOException { channel = FileChannel.open(logFile.toPath()); - fileSize = channel.size(); this.logFile = logFile; - analyzeFileVersion(); - getEndOffset(); + try { + fileSize = channel.size(); + analyzeFileVersion(); + getEndOffset(); + } catch (Exception e) { + channel.close(); + throw e; + } } private void getEndOffset() throws IOException { @@ -328,6 +333,14 @@ public class WALInputStream extends InputStream implements AutoCloseable { return channel.position(); } + public WALMetaData getWALMetaData() throws IOException { + long position = channel.position(); + channel.position(0); + WALMetaData walMetaData = WALMetaData.readFromWALFile(logFile, channel); + channel.position(position); + return walMetaData; + } + private SegmentInfo getNextSegmentInfo() throws IOException { segmentHeaderWithoutCompressedSizeBuffer.clear(); channel.read(segmentHeaderWithoutCompressedSizeBuffer); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java index 777545c738b..fb4f6e70d9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java @@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -273,9 +272,7 @@ public class WALInsertNodeCache { // batch load when wal file is sealed long position = 0; - try (final FileChannel channel = walEntryPosition.openReadFileChannel(); - final WALByteBufReader walByteBufReader = - new WALByteBufReader(walEntryPosition.getWalFile(), channel)) { + try (final WALByteBufReader walByteBufReader = new WALByteBufReader(walEntryPosition)) { while (walByteBufReader.hasNext()) { // see WALInfoEntry#serialize, entry type + memtable id + plan node type final ByteBuffer buffer = walByteBufReader.next();
