This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch chunk-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f6db62691aa0090cba34c3f4bdfde5cbe222faa9
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 12 12:23:20 2026 +0800

    Fix
---
 .../scan/TsFileInsertionEventScanParser.java       | 56 ++++++++++++++--------
 .../pipe/event/TsFileInsertionEventParserTest.java | 21 ++++++++
 2 files changed, 57 insertions(+), 20 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index afd4b7ca83d..faf96e538e7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -556,6 +556,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
         case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
           {
             Chunk chunk;
+            long currentValueChunkPageMemorySize = 0;
             if (Objects.isNull(firstChunk4NextSequentialValueChunks)) {
               final long currentChunkHeaderOffset = 
tsFileSequenceReader.position() - 1;
               chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
@@ -579,6 +580,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
               chunk =
                   new Chunk(
                       chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
+              currentValueChunkPageMemorySize =
+                  calculatePageMemorySizeIfSinglePageValueChunk(chunk);
               boolean needReturn = false;
               final long timeChunkSize =
                   lastIndex >= 0
@@ -587,10 +590,6 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
                       : 0;
               final long timeChunkPageMemorySize =
                   lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex) 
: 0;
-              final long chunkPageMemorySize =
-                  isSinglePageValueChunk(chunkHeader)
-                      ? 
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk)
-                      : 0;
               if (lastIndex >= 0) {
                 if (valueIndex != lastIndex) {
                   needReturn = recordAlignedChunk(valueChunkList, marker);
@@ -600,19 +599,9 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
                   if (chunkSize + chunkHeader.getDataSize()
                           > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()
                       || timeChunkPageMemorySize > 0
-                          && chunkPageMemorySize > 0
-                          && pageMemorySize + chunkPageMemorySize
+                          && currentValueChunkPageMemorySize > 0
+                          && pageMemorySize + currentValueChunkPageMemorySize
                               > getPageDataMemoryLimitInBytes()) {
-                    if (valueChunkList.size() == 1) {
-                      final long currentPageMemorySize =
-                          timeChunkPageMemorySize > 0 && 
valueChunkPageMemorySize > 0
-                              ? pageMemorySize
-                              : 0;
-                      if (currentPageMemorySize > 
getPageDataMemoryLimitInBytes()) {
-                        PipeDataNodeResourceManager.memory()
-                            .forceResize(allocatedMemoryBlockForBatchData, 
currentPageMemorySize);
-                      }
-                    }
                     needReturn = recordAlignedChunk(valueChunkList, marker);
                   }
                 }
@@ -623,18 +612,21 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
                 return;
               }
               
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
+              resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
+                  valueChunkList, currentValueChunkPageMemorySize);
             } else {
               chunk = firstChunk4NextSequentialValueChunks;
               chunkHeader = chunk.getHeader();
               firstChunk4NextSequentialValueChunks = null;
+              currentValueChunkPageMemorySize =
+                  calculatePageMemorySizeIfSinglePageValueChunk(chunk);
               
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
+              resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
+                  valueChunkList, currentValueChunkPageMemorySize);
             }
 
             valueChunkSize += chunkHeader.getDataSize();
-            if (isSinglePageValueChunk(chunkHeader)) {
-              valueChunkPageMemorySize +=
-                  
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk);
-            }
+            valueChunkPageMemorySize += currentValueChunkPageMemorySize;
             valueChunkList.add(chunk);
             currentMeasurements.add(
                 new MeasurementSchema(
@@ -798,6 +790,30 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     }
   }
 
+  private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
+      final List<Chunk> valueChunkList, final long valueChunkPageMemorySize) {
+    if (!valueChunkList.isEmpty() || lastIndex < 0 || valueChunkPageMemorySize 
<= 0) {
+      return;
+    }
+
+    final long timeChunkPageMemorySize = 
timeChunkPageMemorySizeList.get(lastIndex);
+    if (timeChunkPageMemorySize <= 0) {
+      return;
+    }
+
+    final long pageMemorySize = timeChunkPageMemorySize + 
valueChunkPageMemorySize;
+    if (pageMemorySize > getPageDataMemoryLimitInBytes()) {
+      PipeDataNodeResourceManager.memory()
+          .forceResize(allocatedMemoryBlockForBatchData, pageMemorySize);
+    }
+  }
+
+  private long calculatePageMemorySizeIfSinglePageValueChunk(final Chunk 
chunk) throws IOException {
+    return isSinglePageValueChunk(chunk.getHeader())
+        ? 
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk)
+        : 0;
+  }
+
   private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
     return (chunkHeader.getChunkType() & 0x3F) == 
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 006473ebf77..8a19b25986a 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -213,7 +213,13 @@ public class TsFileInsertionEventParserTest {
   public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() 
throws Exception {
     final long originalPipeMaxReaderChunkSize =
         PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+    final int originalPipeDataStructureTabletSizeInBytes =
+        PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
+    final int configuredBatchMemorySize = 1024 * 1024;
     CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0);
+    CommonDescriptor.getInstance()
+        .getConfig()
+        .setPipeDataStructureTabletSizeInBytes(configuredBatchMemorySize);
 
     alignedTsFile = new File("single-aligned-value-chunk.tsfile");
     final List<IMeasurementSchema> schemaList = new ArrayList<>();
@@ -241,11 +247,18 @@ public class TsFileInsertionEventParserTest {
               null,
               false)) {
         
Assert.assertTrue(getAllocatedChunkMemory(parser).getMemoryUsageInBytes() > 0);
+        
Assert.assertTrue(getAllocatedBatchDataMemory(parser).getMemoryUsageInBytes() > 
0);
+        Assert.assertTrue(
+            getAllocatedBatchDataMemory(parser).getMemoryUsageInBytes()
+                < configuredBatchMemorySize);
       }
     } finally {
       CommonDescriptor.getInstance()
           .getConfig()
           .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          
.setPipeDataStructureTabletSizeInBytes(originalPipeDataStructureTabletSizeInBytes);
     }
   }
 
@@ -804,6 +817,14 @@ public class TsFileInsertionEventParserTest {
     return (PipeMemoryBlock) field.get(parser);
   }
 
+  private PipeMemoryBlock getAllocatedBatchDataMemory(final 
TsFileInsertionEventScanParser parser)
+      throws NoSuchFieldException, IllegalAccessException {
+    final Field field =
+        
TsFileInsertionEventScanParser.class.getDeclaredField("allocatedMemoryBlockForBatchData");
+    field.setAccessible(true);
+    return (PipeMemoryBlock) field.get(parser);
+  }
+
   private long calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(final 
File tsFile)
       throws Exception {
     try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {

Reply via email to