This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 77ff8bf79261efc9f943c1d6d7b9998062c00e83
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jul 30 14:17:27 2024 +0800

    fix WALInputStream not closed (#13050)
    
    * fix close
    
    * remove channel usage in WALByteBufReader
    
    * get wal file
    
    * add constructor
    
    * fix bug
    
    * catch exception
    
    * catch exception
    
    (cherry picked from commit 36455413b93c58ab0aff99bbd2c323dfebe8d572)
---
 .../dataregion/wal/io/WALByteBufReader.java        | 40 +++++++++++++---------
 .../dataregion/wal/io/WALInputStream.java          | 19 ++++++++--
 .../dataregion/wal/utils/WALInsertNodeCache.java   |  5 +--
 3 files changed, 41 insertions(+), 23 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
index 882b5ea468c..b03b27a6994 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java
@@ -20,14 +20,13 @@
 package org.apache.iotdb.db.storageengine.dataregion.wal.io;
 
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryPosition;
 
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
 import java.util.Iterator;
 
 /**
@@ -35,23 +34,32 @@ import java.util.Iterator;
  * {@link Iterator}.
  */
 public class WALByteBufReader implements Closeable {
-  private final File logFile;
-  private final FileChannel channel;
-  private final WALMetaData metaData;
-  private final DataInputStream logStream;
-  private final Iterator<Integer> sizeIterator;
+  private WALMetaData metaData;
+  private DataInputStream logStream;
+  private Iterator<Integer> sizeIterator;
 
   public WALByteBufReader(File logFile) throws IOException {
-    this(logFile, FileChannel.open(logFile.toPath(), StandardOpenOption.READ));
+    WALInputStream walInputStream = new WALInputStream(logFile);
+    try {
+      this.logStream = new DataInputStream(walInputStream);
+      this.metaData = walInputStream.getWALMetaData();
+      this.sizeIterator = metaData.getBuffersSize().iterator();
+    } catch (Exception e) {
+      walInputStream.close();
+      throw e;
+    }
   }
 
-  public WALByteBufReader(File logFile, FileChannel channel) throws IOException {
-    this.logFile = logFile;
-    this.channel = channel;
-    this.logStream = new DataInputStream(new WALInputStream(logFile));
-    this.metaData = WALMetaData.readFromWALFile(logFile, channel);
-    this.sizeIterator = metaData.getBuffersSize().iterator();
-    channel.position(0);
+  public WALByteBufReader(WALEntryPosition walEntryPosition) throws IOException {
+    WALInputStream walInputStream = walEntryPosition.openReadFileStream();
+    try {
+      this.logStream = new DataInputStream(walInputStream);
+      this.metaData = walInputStream.getWALMetaData();
+      this.sizeIterator = metaData.getBuffersSize().iterator();
+    } catch (Exception e) {
+      walInputStream.close();
+      throw e;
+    }
   }
 
   /** Like {@link Iterator#hasNext()}. */
@@ -83,7 +91,7 @@ public class WALByteBufReader implements Closeable {
 
   @Override
   public void close() throws IOException {
-    channel.close();
+    logStream.close();
   }
 
   public long getFirstSearchIndex() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 48c0d7194ce..eff873510fd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -60,10 +60,15 @@ public class WALInputStream extends InputStream implements AutoCloseable {
 
   public WALInputStream(File logFile) throws IOException {
     channel = FileChannel.open(logFile.toPath());
-    fileSize = channel.size();
     this.logFile = logFile;
-    analyzeFileVersion();
-    getEndOffset();
+    try {
+      fileSize = channel.size();
+      analyzeFileVersion();
+      getEndOffset();
+    } catch (Exception e) {
+      channel.close();
+      throw e;
+    }
   }
 
   private void getEndOffset() throws IOException {
@@ -328,6 +333,14 @@ public class WALInputStream extends InputStream implements AutoCloseable {
     return channel.position();
   }
 
+  public WALMetaData getWALMetaData() throws IOException {
+    long position = channel.position();
+    channel.position(0);
+    WALMetaData walMetaData = WALMetaData.readFromWALFile(logFile, channel);
+    channel.position(position);
+    return walMetaData;
+  }
+
   private SegmentInfo getNextSegmentInfo() throws IOException {
     segmentHeaderWithoutCompressedSizeBuffer.clear();
     channel.read(segmentHeaderWithoutCompressedSizeBuffer);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 777545c738b..fb4f6e70d9b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -273,9 +272,7 @@ public class WALInsertNodeCache {
 
         // batch load when wal file is sealed
         long position = 0;
-        try (final FileChannel channel = walEntryPosition.openReadFileChannel();
-            final WALByteBufReader walByteBufReader =
-                new WALByteBufReader(walEntryPosition.getWalFile(), channel)) {
+        try (final WALByteBufReader walByteBufReader = new WALByteBufReader(walEntryPosition)) {
           while (walByteBufReader.hasNext()) {
             // see WALInfoEntry#serialize, entry type + memtable id + plan node type
             final ByteBuffer buffer = walByteBufReader.next();

Reply via email to