This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch chunk-fix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f6db62691aa0090cba34c3f4bdfde5cbe222faa9 Author: Caideyipi <[email protected]> AuthorDate: Tue May 12 12:23:20 2026 +0800 Fix --- .../scan/TsFileInsertionEventScanParser.java | 56 ++++++++++++++-------- .../pipe/event/TsFileInsertionEventParserTest.java | 21 ++++++++ 2 files changed, 57 insertions(+), 20 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index afd4b7ca83d..faf96e538e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -556,6 +556,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: { Chunk chunk; + long currentValueChunkPageMemorySize = 0; if (Objects.isNull(firstChunk4NextSequentialValueChunks)) { final long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); @@ -579,6 +580,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { chunk = new Chunk( chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); + currentValueChunkPageMemorySize = + calculatePageMemorySizeIfSinglePageValueChunk(chunk); boolean needReturn = false; final long timeChunkSize = lastIndex >= 0 @@ -587,10 +590,6 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { : 0; final long timeChunkPageMemorySize = lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex) : 0; - final long chunkPageMemorySize = - isSinglePageValueChunk(chunkHeader) - ? SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk) - : 0; if (lastIndex >= 0) { if (valueIndex != lastIndex) { needReturn = recordAlignedChunk(valueChunkList, marker); @@ -600,19 +599,9 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { if (chunkSize + chunkHeader.getDataSize() > allocatedMemoryBlockForChunk.getMemoryUsageInBytes() || timeChunkPageMemorySize > 0 - && chunkPageMemorySize > 0 - && pageMemorySize + chunkPageMemorySize + && currentValueChunkPageMemorySize > 0 + && pageMemorySize + currentValueChunkPageMemorySize > getPageDataMemoryLimitInBytes()) { - if (valueChunkList.size() == 1) { - final long currentPageMemorySize = - timeChunkPageMemorySize > 0 && valueChunkPageMemorySize > 0 - ? pageMemorySize - : 0; - if (currentPageMemorySize > getPageDataMemoryLimitInBytes()) { - PipeDataNodeResourceManager.memory() - .forceResize(allocatedMemoryBlockForBatchData, currentPageMemorySize); - } - } needReturn = recordAlignedChunk(valueChunkList, marker); } } @@ -623,18 +612,21 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { return; } resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); + resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( + valueChunkList, currentValueChunkPageMemorySize); } else { chunk = firstChunk4NextSequentialValueChunks; chunkHeader = chunk.getHeader(); firstChunk4NextSequentialValueChunks = null; + currentValueChunkPageMemorySize = + calculatePageMemorySizeIfSinglePageValueChunk(chunk); resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); + resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( + valueChunkList, currentValueChunkPageMemorySize); } valueChunkSize += chunkHeader.getDataSize(); - if (isSinglePageValueChunk(chunkHeader)) { - valueChunkPageMemorySize += - SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk); - } + valueChunkPageMemorySize += currentValueChunkPageMemorySize; valueChunkList.add(chunk); currentMeasurements.add( new MeasurementSchema( @@ -798,6 +790,30 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { } } + private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( + final List<Chunk> valueChunkList, final long valueChunkPageMemorySize) { + if (!valueChunkList.isEmpty() || lastIndex < 0 || valueChunkPageMemorySize <= 0) { + return; + } + + final long timeChunkPageMemorySize = timeChunkPageMemorySizeList.get(lastIndex); + if (timeChunkPageMemorySize <= 0) { + return; + } + + final long pageMemorySize = timeChunkPageMemorySize + valueChunkPageMemorySize; + if (pageMemorySize > getPageDataMemoryLimitInBytes()) { + PipeDataNodeResourceManager.memory() + .forceResize(allocatedMemoryBlockForBatchData, pageMemorySize); + } + } + + private long calculatePageMemorySizeIfSinglePageValueChunk(final Chunk chunk) throws IOException { + return isSinglePageValueChunk(chunk.getHeader()) + ? SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk) + : 0; + } + private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) { return (chunkHeader.getChunkType() & 0x3F) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 006473ebf77..8a19b25986a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -213,7 +213,13 @@ public class TsFileInsertionEventParserTest { public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() throws Exception { final long originalPipeMaxReaderChunkSize = PipeConfig.getInstance().getPipeMaxReaderChunkSize(); + final int originalPipeDataStructureTabletSizeInBytes = + PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes(); + final int configuredBatchMemorySize = 1024 * 1024; CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0); + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletSizeInBytes(configuredBatchMemorySize); alignedTsFile = new File("single-aligned-value-chunk.tsfile"); final List<IMeasurementSchema> schemaList = new ArrayList<>(); @@ -241,11 +247,18 @@ public class TsFileInsertionEventParserTest { null, false)) { Assert.assertTrue(getAllocatedChunkMemory(parser).getMemoryUsageInBytes() > 0); + Assert.assertTrue(getAllocatedBatchDataMemory(parser).getMemoryUsageInBytes() > 0); + Assert.assertTrue( + getAllocatedBatchDataMemory(parser).getMemoryUsageInBytes() + < configuredBatchMemorySize); } } finally { CommonDescriptor.getInstance() .getConfig() .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletSizeInBytes(originalPipeDataStructureTabletSizeInBytes); } } @@ -804,6 +817,14 @@ public class TsFileInsertionEventParserTest { return (PipeMemoryBlock) field.get(parser); } + private PipeMemoryBlock getAllocatedBatchDataMemory(final TsFileInsertionEventScanParser parser) + throws NoSuchFieldException, IllegalAccessException { + final Field field = + TsFileInsertionEventScanParser.class.getDeclaredField("allocatedMemoryBlockForBatchData"); + field.setAccessible(true); + return (PipeMemoryBlock) field.get(parser); + } + private long calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(final File tsFile) throws Exception { try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
