This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0008782ae79 Pipe: Fixed the first-chunk calculation bug for scan 
parser (#17597)
0008782ae79 is described below

commit 0008782ae79ea44b77654cc2a80f74b83a22ec3d
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 7 09:35:08 2026 +0800

    Pipe: Fixed the first-chunk calculation bug for scan parser (#17597)
---
 .../scan/TsFileInsertionEventScanParser.java       | 21 ++++++---
 .../pipe/event/TsFileInsertionEventParserTest.java | 50 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index aeb7aaadf28..32823459fcf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -563,11 +563,6 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
                   final long chunkSize = timeChunkSize + valueChunkSize;
                   if (chunkSize + chunkHeader.getDataSize()
                       > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
-                    if (valueChunkList.size() == 1
-                        && chunkSize > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
-                      PipeDataNodeResourceManager.memory()
-                          .forceResize(allocatedMemoryBlockForChunk, 
chunkSize);
-                    }
                     needReturn = recordAlignedChunk(valueChunkList, marker);
                   }
                 }
@@ -577,9 +572,11 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
                 firstChunkHeader4NextSequentialValueChunks = chunkHeader;
                 return;
               }
+              
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
             } else {
               chunkHeader = firstChunkHeader4NextSequentialValueChunks;
               firstChunkHeader4NextSequentialValueChunks = null;
+              
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
             }
 
             Chunk chunk =
@@ -720,6 +717,20 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
     return false;
   }
 
+  private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(
+      final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) {
+    if (!valueChunkList.isEmpty() || lastIndex < 0) {
+      return;
+    }
+
+    final long chunkSize =
+        
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
+            + valueChunkHeader.getDataSize();
+    if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+      
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, 
chunkSize);
+    }
+  }
+
   @Override
   public void close() {
     super.close();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index a2e7c558ea0..8d02cc8a998 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -57,6 +58,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -120,6 +122,46 @@ public class TsFileInsertionEventParserTest {
     System.out.println(System.currentTimeMillis() - startTime);
   }
 
+  @Test
+  public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() 
throws Exception {
+    final long originalPipeMaxReaderChunkSize =
+        PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+    CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0);
+
+    alignedTsFile = new File("single-aligned-value-chunk.tsfile");
+    final List<IMeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+
+    final Tablet tablet = new Tablet("root.sg.d", schemaList, 2);
+    tablet.addTimestamp(0, 1);
+    tablet.addValue("s1", 0, 1L);
+    tablet.addTimestamp(1, 2);
+    tablet.addValue("s1", 1, 2L);
+
+    try {
+      try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+        writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), 
schemaList);
+        writer.writeAligned(tablet);
+      }
+
+      try (final TsFileInsertionEventScanParser parser =
+          new TsFileInsertionEventScanParser(
+              alignedTsFile,
+              new PrefixTreePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        
Assert.assertTrue(getAllocatedChunkMemory(parser).getMemoryUsageInBytes() > 0);
+      }
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+    }
+  }
+
   public void testToTabletInsertionEvents(final boolean isQuery) throws 
Exception {
     // Test empty chunk
     testMixedTsFileWithEmptyChunk(isQuery);
@@ -666,4 +708,12 @@ public class TsFileInsertionEventParserTest {
     }
     return count;
   }
+
+  private PipeMemoryBlock getAllocatedChunkMemory(final 
TsFileInsertionEventScanParser parser)
+      throws NoSuchFieldException, IllegalAccessException {
+    final Field field =
+        
TsFileInsertionEventScanParser.class.getDeclaredField("allocatedMemoryBlockForChunk");
+    field.setAccessible(true);
+    return (PipeMemoryBlock) field.get(parser);
+  }
 }

Reply via email to