This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2709557f40636cddbae35b743c527c61ed012e53
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jul 30 12:39:58 2024 +0800

    Fix WALInputStream read ByteBuffer issues (#13059)
    
    (cherry picked from commit 137ec9aaff61a5ba8c8ee4360ba3a1b96d6656c5)
---
 .../dataregion/wal/io/WALInputStream.java          | 14 ++++++----
 .../dataregion/wal/recover/WALNodeRecoverTask.java | 11 ++++++++
 .../wal/compression/WALCompressionTest.java        | 32 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
index 0b10876d6db..48c0d7194ce 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java
@@ -312,12 +312,16 @@ public class WALInputStream extends InputStream implements AutoCloseable {
 
   public void read(ByteBuffer buffer) throws IOException {
     int totalBytesToBeRead = buffer.remaining();
-    int currReadBytes = Math.min(dataBuffer.remaining(), buffer.remaining());
-    dataBuffer.get(buffer.array(), buffer.position(), currReadBytes);
-    if (totalBytesToBeRead - currReadBytes > 0) {
-      loadNextSegment();
-      read(buffer);
+    while (totalBytesToBeRead > 0) {
+      if (dataBuffer.remaining() == 0) {
+        loadNextSegment();
+      }
+      int currReadBytes = Math.min(dataBuffer.remaining(), totalBytesToBeRead);
+      dataBuffer.get(buffer.array(), buffer.position(), currReadBytes);
+      buffer.position(buffer.position() + currReadBytes);
+      totalBytesToBeRead -= currReadBytes;
     }
+    buffer.flip();
   }
 
   public long getFileCurrentPos() throws IOException {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
index 43140cb56cc..18bb293792a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.MemTableInfo;
+import org.apache.iotdb.db.storageengine.dataregion.wal.exception.BrokenWALFileException;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData;
 import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader;
@@ -287,6 +288,16 @@ public class WALNodeRecoverTask implements Runnable {
                 "Fail to find TsFile recover performer for wal entry in TsFile {}", walFile);
           }
         }
+      } catch (BrokenWALFileException e) {
+        logger.warn(
+            "Fail to read memTable ids from the wal file {} of wal node: {}",
+            walFile.getAbsoluteFile(),
+            e.getMessage());
+      } catch (IOException e) {
+        logger.warn(
+            "Fail to read memTable ids from the wal file {} of wal node.",
+            walFile.getAbsoluteFile(),
+            e);
       } catch (Exception e) {
         logger.warn("Fail to read wal logs from {}, skip them", walFile, e);
       }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
index 11f1a1d6859..f3764694fe1 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java
@@ -162,6 +162,38 @@ public class WALCompressionTest {
     }
   }
 
+  @Test
+  public void testWALInputStreamReadByteBufferInDifferentSegment()
+      throws QueryProcessException, IllegalPathException, IOException {
+    LogWriter writer = new WALWriter(walFile);
+    List<Pair<Long, Integer>> positionAndEntryPairList = new ArrayList<>();
+    int memTableId = 0;
+    long fileOffset = 0;
+    ByteBuffer buffer = ByteBuffer.allocate(1024 * 4);
+    InsertRowNode insertRowNode = WALTestUtils.getInsertRowNode(devicePath + memTableId, 0);
+    for (int i = 0; i < 2; i++) {
+      insertRowNode.serialize(buffer);
+      positionAndEntryPairList.add(new Pair<>(fileOffset, buffer.position()));
+      fileOffset += buffer.position();
+      writer.write(buffer);
+      buffer.clear();
+    }
+    writer.close();
+
+    try (WALInputStream stream = new WALInputStream(walFile)) {
+      stream.skipToGivenLogicalPosition(positionAndEntryPairList.get(0).left);
+      ByteBuffer buffer1 =
+          ByteBuffer.allocate(
+              positionAndEntryPairList.get(0).right + positionAndEntryPairList.get(1).right);
+      stream.read(buffer1);
+      ByteBuffer buffer2 = ByteBuffer.allocate(buffer1.capacity());
+      insertRowNode.serialize(buffer2);
+      insertRowNode.serialize(buffer2);
+      buffer2.flip();
+      Assert.assertArrayEquals(buffer1.array(), buffer2.array());
+    }
+  }
+
   @Test
   public void testUncompressedWALStructure()
       throws QueryProcessException, IllegalPathException, IOException {

Reply via email to