This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch fgerst in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ee5d2c69d03c4af62c467fb22121ffb5c5c2a412 Author: Caideyipi <[email protected]> AuthorDate: Thu May 7 09:35:08 2026 +0800 Pipe: Fixed the first-chunk calculation bug for scan parser (#17597) --- .../scan/TsFileInsertionScanDataContainer.java | 21 ++++++--- .../event/TsFileInsertionDataContainerTest.java | 51 ++++++++++++++++++++++ 2 files changed, 67 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java index 8ed9bdcd662..07b91cd68af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java @@ -605,11 +605,6 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain final long chunkSize = timeChunkSize + valueChunkSize; if (chunkSize + chunkHeader.getDataSize() > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { - if (valueChunkList.size() == 1 - && chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { - PipeDataNodeResourceManager.memory() - .forceResize(allocatedMemoryBlockForChunk, chunkSize); - } needReturn = recordAlignedChunk(valueChunkList, marker); } } @@ -619,9 +614,11 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain firstChunkHeader4NextSequentialValueChunks = chunkHeader; return; } + resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); } else { chunkHeader = firstChunkHeader4NextSequentialValueChunks; firstChunkHeader4NextSequentialValueChunks = null; + resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); } Chunk chunk = @@ -690,6 +687,20 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain return false; } + private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit( + final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) { + if (!valueChunkList.isEmpty() || lastIndex < 0) { + return; + } + + final long chunkSize = + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex)) + + valueChunkHeader.getDataSize(); + if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, chunkSize); + } + } + @Override public void close() { super.close(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java index bd4e3923815..b959a77d761 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.event; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; @@ -27,6 +28,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern; import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer; import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; @@ -56,6 +58,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; @@ -119,6 +122,46 @@ public class TsFileInsertionDataContainerTest { System.out.println(System.currentTimeMillis() - startTime); } + @Test + public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() throws Exception { + final long originalPipeMaxReaderChunkSize = + PipeConfig.getInstance().getPipeMaxReaderChunkSize(); + CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0); + + alignedTsFile = new File("single-aligned-value-chunk.tsfile"); + final List<MeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + + final Tablet tablet = new Tablet("root.sg.d", schemaList, 2); + tablet.addTimestamp(0, 1); + tablet.addValue("s1", 0, 1L); + tablet.addTimestamp(1, 2); + tablet.addValue("s1", 1, 2L); + + try { + try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) { + writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), schemaList); + writer.writeAligned(tablet); + } + + try (final TsFileInsertionScanDataContainer parser = + new TsFileInsertionScanDataContainer( + alignedTsFile, + new PrefixPipePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + Assert.assertTrue(getAllocatedChunkMemory(parser).getMemoryUsageInBytes() > 0); + } + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); + } + } + public void testToTabletInsertionEvents(final boolean isQuery) throws Exception { // Test empty chunk testMixedTsFileWithEmptyChunk(isQuery); @@ -645,4 +688,12 @@ public class TsFileInsertionDataContainerTest { } return count; } + + private PipeMemoryBlock getAllocatedChunkMemory(final TsFileInsertionScanDataContainer parser) + throws NoSuchFieldException, IllegalAccessException { + final Field field = + TsFileInsertionScanDataContainer.class.getDeclaredField("allocatedMemoryBlockForChunk"); + field.setAccessible(true); + return (PipeMemoryBlock) field.get(parser); + } }
