This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new e2ea8719da6 [To dev/1.3] Pipe: Fixed the OOM bug of parser for large aligned pages (#17639) (#17646)
e2ea8719da6 is described below

commit e2ea8719da6a8a8ae1b259cf574b9afc3bebdec7
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 13 15:29:13 2026 +0800

    [To dev/1.3] Pipe: Fixed the OOM bug of parser for large aligned pages (#17639) (#17646)
    
    * Pipe: Fixed the OOM bug of parser for large aligned pages (#17639)
    
    * chunk
    
    * shop
    
    * Update TsFileInsertionEventScanParser.java
    
    * Fix
    
    * cp-ger
    
    * Update TsFileInsertionDataContainerTest.java
    
    * Update TsFileInsertionDataContainerTest.java
---
 .../scan/AlignedSinglePageWholeChunkReader.java    |  53 +++++++-
 .../container/scan/EstimatedMemoryChunkReader.java |  25 ++++
 .../container/scan/SinglePageWholeChunkReader.java |  27 +++-
 .../scan/TsFileInsertionScanDataContainer.java     | 124 +++++++++++++++---
 .../event/TsFileInsertionDataContainerTest.java    | 144 +++++++++++++++++++++
 5 files changed, 348 insertions(+), 25 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
index 740a1523d27..a9de04cbe64 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
 
+import org.apache.tsfile.compress.IUnCompressor;
 import org.apache.tsfile.encoding.decoder.Decoder;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.header.ChunkHeader;
@@ -29,6 +30,7 @@ import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.tsfile.read.reader.page.AlignedPageReader;
+import org.apache.tsfile.read.reader.page.LazyLoadPageData;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -40,7 +42,8 @@ import java.util.List;
  * The {@link AlignedSinglePageWholeChunkReader} is used to read a whole 
single page aligned chunk
  * with need to pass in the statistics.
  */
-public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
+public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader
+    implements EstimatedMemoryChunkReader {
 
   // chunk header of the time column
   private final ChunkHeader timeChunkHeader;
@@ -53,12 +56,15 @@ public class AlignedSinglePageWholeChunkReader extends 
AbstractChunkReader {
   private final List<ByteBuffer> valueChunkDataBufferList = new ArrayList<>();
   // deleted intervals of all the sub sensors
   private final List<List<TimeRange>> valueDeleteIntervalsList = new 
ArrayList<>();
+  private final long pageEstimatedMemoryUsageInBytes;
 
   public AlignedSinglePageWholeChunkReader(Chunk timeChunk, List<Chunk> 
valueChunkList)
       throws IOException {
     super(Long.MIN_VALUE, null);
     this.timeChunkHeader = timeChunk.getHeader();
     this.timeChunkDataBuffer = timeChunk.getData();
+    this.pageEstimatedMemoryUsageInBytes =
+        calculatePageEstimatedMemoryUsageInBytes(timeChunk, valueChunkList);
 
     valueChunkList.forEach(
         chunk -> {
@@ -124,7 +130,7 @@ public class AlignedSinglePageWholeChunkReader extends 
AbstractChunkReader {
         ChunkReader.deserializePageData(timePageHeader, timeChunkDataBuffer, 
timeChunkHeader);
 
     List<PageHeader> valuePageHeaderList = new ArrayList<>();
-    List<ByteBuffer> valuePageDataList = new ArrayList<>();
+    LazyLoadPageData[] valuePageDataArray = new 
LazyLoadPageData[rawValuePageHeaderList.size()];
     List<TSDataType> valueDataTypeList = new ArrayList<>();
     List<Decoder> valueDecoderList = new ArrayList<>();
 
@@ -135,15 +141,21 @@ public class AlignedSinglePageWholeChunkReader extends 
AbstractChunkReader {
       if (valuePageHeader == null || valuePageHeader.getUncompressedSize() == 
0) {
         // Empty Page
         valuePageHeaderList.add(null);
-        valuePageDataList.add(null);
+        valuePageDataArray[i] = null;
         valueDataTypeList.add(null);
         valueDecoderList.add(null);
       } else {
         ChunkHeader valueChunkHeader = valueChunkHeaderList.get(i);
+        int currentPagePosition = valueChunkDataBufferList.get(i).position();
+        valueChunkDataBufferList
+            .get(i)
+            .position(currentPagePosition + 
valuePageHeader.getCompressedSize());
         valuePageHeaderList.add(valuePageHeader);
-        valuePageDataList.add(
-            ChunkReader.deserializePageData(
-                valuePageHeader, valueChunkDataBufferList.get(i), 
valueChunkHeader));
+        valuePageDataArray[i] =
+            new LazyLoadPageData(
+                valueChunkDataBufferList.get(i).array(),
+                currentPagePosition,
+                
IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()));
         valueDataTypeList.add(valueChunkHeader.getDataType());
         valueDecoderList.add(
             Decoder.getDecoderByType(
@@ -160,11 +172,38 @@ public class AlignedSinglePageWholeChunkReader extends 
AbstractChunkReader {
             timePageData,
             defaultTimeDecoder,
             valuePageHeaderList,
-            valuePageDataList,
+            valuePageDataArray,
             valueDataTypeList,
             valueDecoderList,
             queryFilter);
     alignedPageReader.setDeleteIntervalList(valueDeleteIntervalsList);
     return alignedPageReader;
   }
+
+  @Override
+  public long getCurrentPageEstimatedMemoryUsageInBytes() {
+    return pageEstimatedMemoryUsageInBytes;
+  }
+
+  public static long calculatePageEstimatedMemoryUsageInBytes(
+      final Chunk timeChunk, final List<Chunk> valueChunkList) throws 
IOException {
+    final ByteBuffer timeChunkDataBuffer = timeChunk.getData().duplicate();
+    long estimatedMemoryUsageInBytes =
+        PageHeader.deserializeFrom(timeChunkDataBuffer, (Statistics<? extends 
Serializable>) null)
+            .getUncompressedSize();
+
+    for (final Chunk valueChunk : valueChunkList) {
+      if (valueChunk == null) {
+        continue;
+      }
+
+      final ByteBuffer valueChunkDataBuffer = valueChunk.getData().duplicate();
+      estimatedMemoryUsageInBytes +=
+          PageHeader.deserializeFrom(
+                  valueChunkDataBuffer, (Statistics<? extends Serializable>) 
null)
+              .getUncompressedSize();
+    }
+
+    return estimatedMemoryUsageInBytes;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/EstimatedMemoryChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/EstimatedMemoryChunkReader.java
new file mode 100644
index 00000000000..dc1f2501bdf
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/EstimatedMemoryChunkReader.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+
+interface EstimatedMemoryChunkReader {
+
+  long getCurrentPageEstimatedMemoryUsageInBytes();
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
index 3f1aadbba0c..ade50012903 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
@@ -26,22 +26,25 @@ import org.apache.tsfile.file.header.PageHeader;
 import org.apache.tsfile.file.metadata.statistics.Statistics;
 import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.tsfile.read.reader.page.LazyLoadPageData;
 import org.apache.tsfile.read.reader.page.PageReader;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 
-public class SinglePageWholeChunkReader extends AbstractChunkReader {
+public class SinglePageWholeChunkReader extends AbstractChunkReader
+    implements EstimatedMemoryChunkReader {
   private final ChunkHeader chunkHeader;
   private final ByteBuffer chunkDataBuffer;
+  private final long pageEstimatedMemoryUsageInBytes;
 
   public SinglePageWholeChunkReader(Chunk chunk) throws IOException {
     super(Long.MIN_VALUE, null);
 
     this.chunkHeader = chunk.getHeader();
     this.chunkDataBuffer = chunk.getData();
-
+    this.pageEstimatedMemoryUsageInBytes = 
calculatePageEstimatedMemoryUsageInBytes(chunk);
     initAllPageReaders();
   }
 
@@ -56,15 +59,33 @@ public class SinglePageWholeChunkReader extends 
AbstractChunkReader {
   }
 
   private PageReader constructPageReader(PageHeader pageHeader) throws 
IOException {
+    final int currentPagePosition = chunkDataBuffer.position();
+    chunkDataBuffer.position(currentPagePosition + 
pageHeader.getCompressedSize());
     return new PageReader(
         pageHeader,
-        deserializePageData(pageHeader, chunkDataBuffer, chunkHeader),
+        new LazyLoadPageData(
+            chunkDataBuffer.array(),
+            currentPagePosition,
+            IUnCompressor.getUnCompressor(chunkHeader.getCompressionType())),
         chunkHeader.getDataType(),
         Decoder.getDecoderByType(chunkHeader.getEncodingType(), 
chunkHeader.getDataType()),
         defaultTimeDecoder,
         null);
   }
 
+  @Override
+  public long getCurrentPageEstimatedMemoryUsageInBytes() {
+    return pageEstimatedMemoryUsageInBytes;
+  }
+
+  public static long calculatePageEstimatedMemoryUsageInBytes(final Chunk 
chunk)
+      throws IOException {
+    final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+    final PageHeader pageHeader =
+        PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends 
Serializable>) null);
+    return pageHeader.getUncompressedSize();
+  }
+
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   // util methods
   
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 07b91cd68af..9366d0f62df 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -98,10 +98,11 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
   // Cached time chunk
   private final List<Chunk> timeChunkList = new ArrayList<>();
   private final List<Boolean> isMultiPageList = new ArrayList<>();
+  private final List<Long> timeChunkPageMemorySizeList = new ArrayList<>();
 
   private final Map<String, Integer> measurementIndexMap = new HashMap<>();
   private int lastIndex = -1;
-  private ChunkHeader firstChunkHeader4NextSequentialValueChunks;
+  private Chunk firstChunk4NextSequentialValueChunks;
 
   private byte lastMarker = Byte.MIN_VALUE;
 
@@ -343,6 +344,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
       }
 
       do {
+        resizePageDataMemoryForCurrentPageIfNeeded();
         data = chunkReader.nextPageData();
         long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data);
         if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) {
@@ -352,6 +354,23 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     } while (!data.hasCurrent());
   }
 
+  private void resizePageDataMemoryForCurrentPageIfNeeded() {
+    if (!(chunkReader instanceof EstimatedMemoryChunkReader)) {
+      return;
+    }
+
+    final long estimatedMemoryUsageInBytes =
+        ((EstimatedMemoryChunkReader) 
chunkReader).getCurrentPageEstimatedMemoryUsageInBytes();
+    resizePageDataMemoryIfNeeded(estimatedMemoryUsageInBytes);
+  }
+
+  private void resizePageDataMemoryIfNeeded(final long 
estimatedMemoryUsageInBytes) {
+    if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < 
estimatedMemoryUsageInBytes) {
+      PipeDataNodeResourceManager.memory()
+          .forceResize(allocatedMemoryBlockForBatchData, 
estimatedMemoryUsageInBytes);
+    }
+  }
+
   private boolean putValueToColumns(final BatchData data, final Tablet tablet, 
final int rowIndex) {
     final Object[] columns = tablet.values;
     boolean isNeedFillTime = false;
@@ -439,6 +458,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
   private void moveToNextChunkReader() throws IOException, 
IllegalStateException {
     ChunkHeader chunkHeader;
     long valueChunkSize = 0;
+    long valueChunkPageMemorySize = 0;
     final List<Chunk> valueChunkList = new ArrayList<>();
     currentMeasurements.clear();
     modsInfos.clear();
@@ -449,7 +469,12 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     }
 
     byte marker;
-    while ((marker = lastMarker != Byte.MIN_VALUE ? lastMarker : 
tsFileSequenceReader.readMarker())
+    while ((marker =
+            lastMarker != Byte.MIN_VALUE
+                ? lastMarker
+                : Objects.nonNull(firstChunk4NextSequentialValueChunks)
+                    ? 
toValueChunkMarker(firstChunk4NextSequentialValueChunks.getHeader())
+                    : tsFileSequenceReader.readMarker())
         != MetaMarker.SEPARATOR) {
       lastMarker = Byte.MIN_VALUE;
       switch (marker) {
@@ -473,10 +498,17 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
             if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
                 == TsFileConstant.TIME_COLUMN_MASK) {
-              timeChunkList.add(
+              final Chunk timeChunk =
                   new Chunk(
-                      chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize())));
-              isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER);
+                      chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
+              final boolean isMultiPage = marker == 
MetaMarker.TIME_CHUNK_HEADER;
+              timeChunkList.add(timeChunk);
+              isMultiPageList.add(isMultiPage);
+              timeChunkPageMemorySizeList.add(
+                  isMultiPage
+                      ? 0
+                      : 
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
+                          timeChunk));
               break;
             }
 
@@ -540,8 +572,10 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
         case MetaMarker.VALUE_CHUNK_HEADER:
         case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
           {
-            if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) {
-              long currentChunkHeaderOffset = tsFileSequenceReader.position() 
- 1;
+            Chunk chunk;
+            long currentValueChunkPageMemorySize = 0;
+            if (Objects.isNull(firstChunk4NextSequentialValueChunks)) {
+              final long currentChunkHeaderOffset = 
tsFileSequenceReader.position() - 1;
               chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
 
               final long nextMarkerOffset =
@@ -592,40 +626,56 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
               if (chunkHeader.getDataSize() == 0) {
                 break;
               }
+              chunk =
+                  new Chunk(
+                      chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
+              currentValueChunkPageMemorySize =
+                  calculatePageMemorySizeIfSinglePageValueChunk(chunk);
               boolean needReturn = false;
               final long timeChunkSize =
                   lastIndex >= 0
                       ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
                           timeChunkList.get(lastIndex))
                       : 0;
+              final long timeChunkPageMemorySize =
+                  lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex) 
: 0;
               if (lastIndex >= 0) {
                 if (valueIndex != lastIndex) {
                   needReturn = recordAlignedChunk(valueChunkList, marker);
                 } else {
                   final long chunkSize = timeChunkSize + valueChunkSize;
+                  final long pageMemorySize = timeChunkPageMemorySize + 
valueChunkPageMemorySize;
                   if (chunkSize + chunkHeader.getDataSize()
-                      > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+                          > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()
+                      || timeChunkPageMemorySize > 0
+                          && currentValueChunkPageMemorySize > 0
+                          && pageMemorySize + currentValueChunkPageMemorySize
+                              > getPageDataMemoryLimitInBytes()) {
                     needReturn = recordAlignedChunk(valueChunkList, marker);
                   }
                 }
               }
               lastIndex = valueIndex;
               if (needReturn) {
-                firstChunkHeader4NextSequentialValueChunks = chunkHeader;
+                firstChunk4NextSequentialValueChunks = chunk;
                 return;
               }
               
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
+              resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
+                  valueChunkList, currentValueChunkPageMemorySize);
             } else {
-              chunkHeader = firstChunkHeader4NextSequentialValueChunks;
-              firstChunkHeader4NextSequentialValueChunks = null;
+              chunk = firstChunk4NextSequentialValueChunks;
+              chunkHeader = chunk.getHeader();
+              firstChunk4NextSequentialValueChunks = null;
+              currentValueChunkPageMemorySize =
+                  calculatePageMemorySizeIfSinglePageValueChunk(chunk);
               
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
+              resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
+                  valueChunkList, currentValueChunkPageMemorySize);
             }
 
-            Chunk chunk =
-                new Chunk(
-                    chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
-
             valueChunkSize += chunkHeader.getDataSize();
+            valueChunkPageMemorySize += currentValueChunkPageMemorySize;
             valueChunkList.add(chunk);
             currentMeasurements.add(
                 new MeasurementSchema(chunkHeader.getMeasurementID(), 
chunkHeader.getDataType()));
@@ -649,6 +699,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
             lastIndex = -1;
             timeChunkList.clear();
             isMultiPageList.clear();
+            timeChunkPageMemorySizeList.clear();
             measurementIndexMap.clear();
 
             currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID 
: null;
@@ -670,12 +721,21 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     }
   }
 
+  private long getPageDataMemoryLimitInBytes() {
+    return PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+  }
+
   private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final 
byte marker)
       throws IOException {
     if (!valueChunkList.isEmpty()) {
       final Chunk timeChunk = timeChunkList.get(lastIndex);
       timeChunk.getData().rewind();
       currentIsMultiPage = isMultiPageList.get(lastIndex);
+      if (!currentIsMultiPage) {
+        resizePageDataMemoryIfNeeded(
+            
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
+                timeChunk, valueChunkList));
+      }
       chunkReader =
           currentIsMultiPage
               ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
@@ -701,6 +761,40 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     }
   }
 
+  private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
+      final List<Chunk> valueChunkList, final long valueChunkPageMemorySize) {
+    if (!valueChunkList.isEmpty() || lastIndex < 0 || valueChunkPageMemorySize 
<= 0) {
+      return;
+    }
+
+    final long timeChunkPageMemorySize = 
timeChunkPageMemorySizeList.get(lastIndex);
+    if (timeChunkPageMemorySize <= 0) {
+      return;
+    }
+
+    final long pageMemorySize = timeChunkPageMemorySize + 
valueChunkPageMemorySize;
+    if (pageMemorySize > getPageDataMemoryLimitInBytes()) {
+      PipeDataNodeResourceManager.memory()
+          .forceResize(allocatedMemoryBlockForBatchData, pageMemorySize);
+    }
+  }
+
+  private long calculatePageMemorySizeIfSinglePageValueChunk(final Chunk 
chunk) throws IOException {
+    return isSinglePageValueChunk(chunk.getHeader())
+        ? 
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk)
+        : 0;
+  }
+
+  private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
+    return (chunkHeader.getChunkType() & 0x3F) == 
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
+  }
+
+  private byte toValueChunkMarker(final ChunkHeader chunkHeader) {
+    return isSinglePageValueChunk(chunkHeader)
+        ? MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER
+        : MetaMarker.VALUE_CHUNK_HEADER;
+  }
+
   @Override
   public void close() {
     super.close();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 51fc1f3a556..7fe514b277e 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -26,20 +26,29 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.AlignedSinglePageWholeChunkReader;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.pipe.api.access.Row;
 
 import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.MetaMarker;
+import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.utils.Binary;
@@ -121,11 +130,96 @@ public class TsFileInsertionDataContainerTest {
     System.out.println(System.currentTimeMillis() - startTime);
   }
 
+  @Test
+  public void testScanParserSplitAlignedSinglePageChunkByEstimatedPageMemory() 
throws Exception {
+    final long originalPipeMaxReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+    final int originalPageSizeInByte =
+        TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    final int originalMaxNumberOfPointsInPage =
+        
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
+    try {
+      TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024);
+      
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10000);
+
+      final int measurementCount = 16;
+      final int rowCount = 64;
+      final List<MeasurementSchema> schemaList = new ArrayList<>();
+      for (int i = 0; i < measurementCount; ++i) {
+        schemaList.add(
+            new MeasurementSchema(
+                "s" + i, TSDataType.STRING, TSEncoding.PLAIN, 
CompressionType.LZ4));
+      }
+
+      alignedTsFile = new File("aligned-single-page-high-compression.tsfile");
+      final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount);
+      final Binary value =
+          new Binary(new String(new char[512]).replace('\0', 'a'), 
TSFileConfig.STRING_CHARSET);
+      for (int row = 0; row < rowCount; ++row) {
+        tablet.addTimestamp(row, row);
+        for (int measurementIndex = 0; measurementIndex < measurementCount; 
++measurementIndex) {
+          tablet.addValue("s" + measurementIndex, row, value);
+        }
+      }
+      tablet.rowSize = rowCount;
+
+      try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+        writer.registerAlignedTimeseries(new Path("root.sg.d"), schemaList);
+        writer.writeAligned(tablet);
+      }
+
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(
+              
calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(alignedTsFile));
+
+      int tabletCount = 0;
+      int maxMeasurementCount = 0;
+      int pointCount = 0;
+      try (final TsFileInsertionScanDataContainer parser =
+          new TsFileInsertionScanDataContainer(
+              alignedTsFile,
+              new PrefixPipePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        for (final Pair<Tablet, Boolean> tabletWithIsAligned : 
parser.toTabletWithIsAligneds()) {
+          Assert.assertTrue(tabletWithIsAligned.getRight());
+          final Tablet parsedTablet = tabletWithIsAligned.getLeft();
+          tabletCount++;
+          maxMeasurementCount = Math.max(maxMeasurementCount, 
parsedTablet.getSchemas().size());
+          pointCount += getNonNullSize(parsedTablet);
+        }
+      }
+
+      Assert.assertTrue(tabletCount > 1);
+      Assert.assertTrue(maxMeasurementCount < measurementCount);
+      Assert.assertEquals(measurementCount * rowCount, pointCount);
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+      
TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte);
+      TSFileDescriptor.getInstance()
+          .getConfig()
+          .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage);
+    }
+  }
+
   @Test
   public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() 
throws Exception {
     final long originalPipeMaxReaderChunkSize =
         PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+    final int originalPipeDataStructureTabletSizeInBytes =
+        PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
+    final int configuredBatchMemorySize = 1024 * 1024;
     CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0);
+    CommonDescriptor.getInstance()
+        .getConfig()
+        .setPipeDataStructureTabletSizeInBytes(configuredBatchMemorySize);
 
     alignedTsFile = new File("single-aligned-value-chunk.tsfile");
     final List<MeasurementSchema> schemaList = new ArrayList<>();
@@ -154,11 +248,18 @@ public class TsFileInsertionDataContainerTest {
               null,
               false)) {
         
Assert.assertTrue(getAllocatedChunkMemory(parser).getMemoryUsageInBytes() > 0);
+        
Assert.assertTrue(getAllocatedBatchDataMemory(parser).getMemoryUsageInBytes() > 
0);
+        Assert.assertTrue(
+            getAllocatedBatchDataMemory(parser).getMemoryUsageInBytes()
+                < configuredBatchMemorySize);
       }
     } finally {
       CommonDescriptor.getInstance()
           .getConfig()
           .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          
.setPipeDataStructureTabletSizeInBytes(originalPipeDataStructureTabletSizeInBytes);
     }
   }
 
@@ -696,4 +797,47 @@ public class TsFileInsertionDataContainerTest {
     field.setAccessible(true);
     return (PipeMemoryBlock) field.get(parser);
   }
+
+  private PipeMemoryBlock getAllocatedBatchDataMemory(final 
TsFileInsertionScanDataContainer parser)
+      throws NoSuchFieldException, IllegalAccessException {
+    final Field field =
+        
TsFileInsertionScanDataContainer.class.getDeclaredField("allocatedMemoryBlockForBatchData");
+    field.setAccessible(true);
+    return (PipeMemoryBlock) field.get(parser);
+  }
+
+  private long calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(final 
File tsFile)
+      throws Exception {
+    try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+      final List<IDeviceID> deviceIDList = reader.getAllDevices();
+      Assert.assertEquals(1, deviceIDList.size());
+      final IDeviceID deviceID = deviceIDList.get(0);
+      final List<AlignedChunkMetadata> alignedChunkMetadataList =
+          reader.getAlignedChunkMetadata(deviceID);
+      Assert.assertEquals(1, alignedChunkMetadataList.size());
+
+      final AlignedChunkMetadata alignedChunkMetadata = 
alignedChunkMetadataList.get(0);
+      final Chunk timeChunk =
+          reader.readMemChunk((ChunkMetadata) 
alignedChunkMetadata.getTimeChunkMetadata());
+      Assert.assertEquals(
+          MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, 
timeChunk.getHeader().getChunkType() & 0x3F);
+
+      final List<Chunk> valueChunkList = new ArrayList<>();
+      long chunkSizeLimit = 
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
+      for (final IChunkMetadata valueChunkMetadata :
+          alignedChunkMetadata.getValueChunkMetadataList()) {
+        final Chunk valueChunk = reader.readMemChunk((ChunkMetadata) 
valueChunkMetadata);
+        Assert.assertEquals(
+            MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, 
valueChunk.getHeader().getChunkType() & 0x3F);
+        valueChunkList.add(valueChunk);
+        chunkSizeLimit += valueChunk.getHeader().getDataSize();
+      }
+
+      final long estimatedPageMemorySize =
+          
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
+              timeChunk, valueChunkList);
+      Assert.assertTrue(estimatedPageMemorySize > chunkSizeLimit);
+      return chunkSizeLimit;
+    }
+  }
 }

Reply via email to