This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2709557f40636cddbae35b743c527c61ed012e53 Author: shuwenwei <[email protected]> AuthorDate: Tue Jul 30 12:39:58 2024 +0800 Fix WALInputStream read ByteBuffer issues (#13059) (cherry picked from commit 137ec9aaff61a5ba8c8ee4360ba3a1b96d6656c5) --- .../dataregion/wal/io/WALInputStream.java | 14 ++++++---- .../dataregion/wal/recover/WALNodeRecoverTask.java | 11 ++++++++ .../wal/compression/WALCompressionTest.java | 32 ++++++++++++++++++++++ 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java index 0b10876d6db..48c0d7194ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -312,12 +312,16 @@ public class WALInputStream extends InputStream implements AutoCloseable { public void read(ByteBuffer buffer) throws IOException { int totalBytesToBeRead = buffer.remaining(); - int currReadBytes = Math.min(dataBuffer.remaining(), buffer.remaining()); - dataBuffer.get(buffer.array(), buffer.position(), currReadBytes); - if (totalBytesToBeRead - currReadBytes > 0) { - loadNextSegment(); - read(buffer); + while (totalBytesToBeRead > 0) { + if (dataBuffer.remaining() == 0) { + loadNextSegment(); + } + int currReadBytes = Math.min(dataBuffer.remaining(), totalBytesToBeRead); + dataBuffer.get(buffer.array(), buffer.position(), currReadBytes); + buffer.position(buffer.position() + currReadBytes); + totalBytesToBeRead -= currReadBytes; } + buffer.flip(); } public long getFileCurrentPos() throws IOException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java index 43140cb56cc..18bb293792a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.MemTableInfo; +import org.apache.iotdb.db.storageengine.dataregion.wal.exception.BrokenWALFileException; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader; @@ -287,6 +288,16 @@ public class WALNodeRecoverTask implements Runnable { "Fail to find TsFile recover performer for wal entry in TsFile {}", walFile); } } + } catch (BrokenWALFileException e) { + logger.warn( + "Fail to read memTable ids from the wal file {} of wal node: {}", + walFile.getAbsoluteFile(), + e.getMessage()); + } catch (IOException e) { + logger.warn( + "Fail to read memTable ids from the wal file {} of wal node.", + walFile.getAbsoluteFile(), + e); } catch (Exception e) { logger.warn("Fail to read wal logs from {}, skip them", walFile, e); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java index 11f1a1d6859..f3764694fe1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java @@ -162,6 +162,38 @@ public class WALCompressionTest { } } + @Test + public void testWALInputStreamReadByteBufferInDifferentSegment() + throws QueryProcessException, IllegalPathException, IOException { + LogWriter writer = new WALWriter(walFile); + List<Pair<Long, Integer>> positionAndEntryPairList = new ArrayList<>(); + int memTableId = 0; + long fileOffset = 0; + ByteBuffer buffer = ByteBuffer.allocate(1024 * 4); + InsertRowNode insertRowNode = WALTestUtils.getInsertRowNode(devicePath + memTableId, 0); + for (int i = 0; i < 2; i++) { + insertRowNode.serialize(buffer); + positionAndEntryPairList.add(new Pair<>(fileOffset, buffer.position())); + fileOffset += buffer.position(); + writer.write(buffer); + buffer.clear(); + } + writer.close(); + + try (WALInputStream stream = new WALInputStream(walFile)) { + stream.skipToGivenLogicalPosition(positionAndEntryPairList.get(0).left); + ByteBuffer buffer1 = + ByteBuffer.allocate( + positionAndEntryPairList.get(0).right + positionAndEntryPairList.get(1).right); + stream.read(buffer1); + ByteBuffer buffer2 = ByteBuffer.allocate(buffer1.capacity()); + insertRowNode.serialize(buffer2); + insertRowNode.serialize(buffer2); + buffer2.flip(); + Assert.assertArrayEquals(buffer1.array(), buffer2.array()); + } + } + @Test public void testUncompressedWALStructure() throws QueryProcessException, IllegalPathException, IOException {
