This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 59b79efb17e Pipe: account page decode memory in scan parser (#17807)
59b79efb17e is described below

commit 59b79efb17efcbef4733cab107a15c58f18968ac
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 3 15:13:46 2026 +0800

    Pipe: account page decode memory in scan parser (#17807)
    
    * Pipe: account page decode memory in scan parser
    
    * Fix pipe scan parser single page row count
    
    * Fix pipe scan parser page memory test
---
 .../scan/AlignedSinglePageWholeChunkReader.java    |  55 +++++-
 .../parser/scan/MemoryControlledChunkReader.java   |  76 ++++++++
 .../parser/scan/SinglePageWholeChunkReader.java    | 193 +++++++++++++++++-
 .../scan/TsFileInsertionEventScanParser.java       |  53 +++--
 .../pipe/event/TsFileInsertionEventParserTest.java | 215 +++++++++++++++++++++
 5 files changed, 568 insertions(+), 24 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java
index ec6ed5f2da9..f0135f84cb7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java
@@ -39,6 +39,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.LongConsumer;
 
 /**
@@ -71,7 +72,7 @@ public class AlignedSinglePageWholeChunkReader extends 
AbstractChunkReader
     this.timeChunkDataBuffer = timeChunk.getData();
     this.encryptParam = timeChunk.getEncryptParam();
     this.pageEstimatedMemoryUsageInBytes =
-        calculatePageEstimatedMemoryUsageInBytes(timeChunk, valueChunkList);
+        calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk, 
valueChunkList);
 
     valueChunkList.forEach(
         chunk -> {
@@ -216,4 +217,56 @@ public class AlignedSinglePageWholeChunkReader extends 
AbstractChunkReader
 
     return estimatedMemoryUsageInBytes;
   }
+
+  public static long calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(
+      final Chunk timeChunk, final List<Chunk> valueChunkList) throws 
IOException {
+    final List<Long> pageEstimatedMemoryUsageInBytesList =
+        calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(timeChunk, 
valueChunkList);
+    return pageEstimatedMemoryUsageInBytesList.isEmpty()
+        ? 0
+        : pageEstimatedMemoryUsageInBytesList.get(0);
+  }
+
+  public static List<Long> 
calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
+      final Chunk timeChunk, final List<Chunk> valueChunkList) throws 
IOException {
+    final ByteBuffer timeChunkDataBuffer = timeChunk.getData().duplicate();
+    final List<ByteBuffer> valueChunkDataBufferList = new 
ArrayList<>(valueChunkList.size());
+    for (final Chunk valueChunk : valueChunkList) {
+      valueChunkDataBufferList.add(
+          Objects.isNull(valueChunk) ? null : 
valueChunk.getData().duplicate());
+    }
+
+    final List<Long> pageEstimatedMemoryUsageInBytesList = new ArrayList<>();
+    while (timeChunkDataBuffer.remaining() > 0) {
+      long pageUncompressedSizeInBytes = 0;
+      final PageHeader timePageHeader =
+          SinglePageWholeChunkReader.deserializePageHeader(
+              timeChunkDataBuffer, timeChunk.getHeader());
+      pageUncompressedSizeInBytes += timePageHeader.getUncompressedSize();
+      SinglePageWholeChunkReader.skipCompressedPageData(timeChunkDataBuffer, 
timePageHeader);
+
+      final List<TSDataType> valueDataTypeList = new 
ArrayList<>(valueChunkList.size());
+      for (int i = 0; i < valueChunkList.size(); ++i) {
+        final Chunk valueChunk = valueChunkList.get(i);
+        final ByteBuffer valueChunkDataBuffer = 
valueChunkDataBufferList.get(i);
+        if (Objects.isNull(valueChunk) || 
Objects.isNull(valueChunkDataBuffer)) {
+          valueDataTypeList.add(null);
+          continue;
+        }
+
+        final PageHeader valuePageHeader =
+            SinglePageWholeChunkReader.deserializePageHeader(
+                valueChunkDataBuffer, valueChunk.getHeader());
+        pageUncompressedSizeInBytes += valuePageHeader.getUncompressedSize();
+        valueDataTypeList.add(valueChunk.getHeader().getDataType());
+        
SinglePageWholeChunkReader.skipCompressedPageData(valueChunkDataBuffer, 
valuePageHeader);
+      }
+      pageEstimatedMemoryUsageInBytesList.add(
+          
SinglePageWholeChunkReader.estimatePageMemoryUsageInBytesWithBatchData(
+              pageUncompressedSizeInBytes,
+              SinglePageWholeChunkReader.getPageRowCount(timePageHeader, 
timeChunk),
+              valueDataTypeList));
+    }
+    return 
SinglePageWholeChunkReader.toSuffixMaxList(pageEstimatedMemoryUsageInBytesList);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/MemoryControlledChunkReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/MemoryControlledChunkReader.java
new file mode 100644
index 00000000000..105bd5e8e33
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/MemoryControlledChunkReader.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan;
+
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.reader.IChunkReader;
+import org.apache.tsfile.read.reader.IPageReader;
+
+import java.io.IOException;
+import java.util.List;
+
+class MemoryControlledChunkReader implements IChunkReader, 
EstimatedMemoryChunkReader {
+
+  private final IChunkReader delegate;
+  private final List<Long> pageEstimatedMemoryUsageInBytesList;
+  private int pageIndex;
+
+  MemoryControlledChunkReader(
+      final IChunkReader delegate, final List<Long> 
pageEstimatedMemoryUsageInBytesList) {
+    this.delegate = delegate;
+    this.pageEstimatedMemoryUsageInBytesList = 
pageEstimatedMemoryUsageInBytesList;
+  }
+
+  @Override
+  public long getCurrentPageEstimatedMemoryUsageInBytes() {
+    return pageIndex < pageEstimatedMemoryUsageInBytesList.size()
+        ? pageEstimatedMemoryUsageInBytesList.get(pageIndex)
+        : 0;
+  }
+
+  @Override
+  public boolean hasNextSatisfiedPage() throws IOException {
+    return delegate.hasNextSatisfiedPage();
+  }
+
+  @Override
+  public BatchData nextPageData() throws IOException {
+    try {
+      return delegate.nextPageData();
+    } finally {
+      ++pageIndex;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
+
+  @Override
+  public void markDataTypeModifiedAndCannotUseStatistics() {
+    delegate.markDataTypeModifiedAndCannotUseStatistics();
+  }
+
+  @Override
+  public List<IPageReader> loadPageReaderList() throws IOException {
+    return delegate.loadPageReaderList();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
index 2bfa70f1dd0..4d8b35bac4a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
@@ -25,6 +25,8 @@ import org.apache.tsfile.compress.IUnCompressor;
 import org.apache.tsfile.encoding.decoder.Decoder;
 import org.apache.tsfile.encrypt.EncryptParameter;
 import org.apache.tsfile.encrypt.IDecryptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.MetaMarker;
 import org.apache.tsfile.file.header.ChunkHeader;
 import org.apache.tsfile.file.header.PageHeader;
 import org.apache.tsfile.file.metadata.enums.EncryptionType;
@@ -33,10 +35,15 @@ import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.tsfile.read.reader.page.LazyLoadPageData;
 import org.apache.tsfile.read.reader.page.PageReader;
+import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
 
 import static 
org.apache.tsfile.file.metadata.enums.CompressionType.UNCOMPRESSED;
 
@@ -53,7 +60,8 @@ public class SinglePageWholeChunkReader extends 
AbstractChunkReader
     this.chunkHeader = chunk.getHeader();
     this.chunkDataBuffer = chunk.getData();
     this.encryptParam = chunk.getEncryptParam();
-    this.pageEstimatedMemoryUsageInBytes = 
calculatePageEstimatedMemoryUsageInBytes(chunk);
+    this.pageEstimatedMemoryUsageInBytes =
+        calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(chunk);
     initAllPageReaders();
   }
 
@@ -91,11 +99,190 @@ public class SinglePageWholeChunkReader extends 
AbstractChunkReader
   public static long calculatePageEstimatedMemoryUsageInBytes(final Chunk 
chunk)
       throws IOException {
     final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
-    final PageHeader pageHeader =
-        PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends 
Serializable>) null);
+    final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer, 
chunk.getHeader());
     return pageHeader.getUncompressedSize();
   }
 
+  public static long calculateMaxPageEstimatedMemoryUsageInBytes(final Chunk 
chunk)
+      throws IOException {
+    final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+    long maxPageEstimatedMemoryUsageInBytes = 0;
+    while (chunkDataBuffer.remaining() > 0) {
+      final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer, 
chunk.getHeader());
+      maxPageEstimatedMemoryUsageInBytes =
+          Math.max(maxPageEstimatedMemoryUsageInBytes, 
pageHeader.getUncompressedSize());
+      skipCompressedPageData(chunkDataBuffer, pageHeader);
+    }
+    return maxPageEstimatedMemoryUsageInBytes;
+  }
+
+  public static long 
calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(final Chunk chunk)
+      throws IOException {
+    final List<Long> pageEstimatedMemoryUsageInBytesList =
+        calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(chunk);
+    return pageEstimatedMemoryUsageInBytesList.isEmpty()
+        ? 0
+        : pageEstimatedMemoryUsageInBytesList.get(0);
+  }
+
+  public static List<Long> 
calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
+      final Chunk chunk) throws IOException {
+    final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+    final List<Long> pageEstimatedMemoryUsageInBytesList = new ArrayList<>();
+    while (chunkDataBuffer.remaining() > 0) {
+      final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer, 
chunk.getHeader());
+      pageEstimatedMemoryUsageInBytesList.add(
+          estimatePageMemoryUsageInBytesWithBatchData(
+              pageHeader, chunk, 
Collections.singletonList(chunk.getHeader().getDataType())));
+      skipCompressedPageData(chunkDataBuffer, pageHeader);
+    }
+    return toSuffixMaxList(pageEstimatedMemoryUsageInBytesList);
+  }
+
+  static PageHeader deserializePageHeader(
+      final ByteBuffer chunkDataBuffer, final ChunkHeader chunkHeader) throws 
IOException {
+    return isSinglePageChunk(chunkHeader)
+        ? PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends 
Serializable>) null)
+        : PageHeader.deserializeFrom(chunkDataBuffer, 
chunkHeader.getDataType());
+  }
+
+  static boolean isSinglePageChunk(final ChunkHeader chunkHeader) {
+    return (chunkHeader.getChunkType() & 0x3F) == 
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
+  }
+
+  static void skipCompressedPageData(
+      final ByteBuffer chunkDataBuffer, final PageHeader pageHeader) {
+    chunkDataBuffer.position(chunkDataBuffer.position() + 
pageHeader.getCompressedSize());
+  }
+
+  static List<Long> toSuffixMaxList(final List<Long> 
pageEstimatedMemoryUsageInBytesList) {
+    long suffixMaxPageEstimatedMemoryUsageInBytes = 0;
+    for (int i = pageEstimatedMemoryUsageInBytesList.size() - 1; i >= 0; --i) {
+      suffixMaxPageEstimatedMemoryUsageInBytes =
+          Math.max(
+              suffixMaxPageEstimatedMemoryUsageInBytes, 
pageEstimatedMemoryUsageInBytesList.get(i));
+      pageEstimatedMemoryUsageInBytesList.set(i, 
suffixMaxPageEstimatedMemoryUsageInBytes);
+    }
+    return pageEstimatedMemoryUsageInBytesList;
+  }
+
+  static long estimatePageMemoryUsageInBytesWithBatchData(
+      final PageHeader timePageHeader,
+      final Chunk timeChunk,
+      final List<TSDataType> valueDataTypeList) {
+    return estimatePageMemoryUsageInBytesWithBatchData(
+        timePageHeader.getUncompressedSize(),
+        getPageRowCount(timePageHeader, timeChunk),
+        valueDataTypeList);
+  }
+
+  static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) {
+    if (isSinglePageChunk(chunk.getHeader())) {
+      return Objects.isNull(chunk.getChunkStatistic())
+          ? 0
+          : saturateToInt(chunk.getChunkStatistic().getCount());
+    }
+    return saturateToInt(pageHeader.getNumOfValues());
+  }
+
+  private static int saturateToInt(final long value) {
+    return (int) Math.min(Integer.MAX_VALUE, value);
+  }
+
+  static long estimatePageMemoryUsageInBytesWithBatchData(
+      final long pageUncompressedSizeInBytes,
+      final int rowCount,
+      final List<TSDataType> valueDataTypeList) {
+    return pageUncompressedSizeInBytes
+        + estimateBatchDataMemoryUsageInBytes(rowCount, valueDataTypeList);
+  }
+
+  private static long estimateBatchDataMemoryUsageInBytes(
+      final int rowCount, final List<TSDataType> valueDataTypeList) {
+    final int valueCount = valueDataTypeList.size();
+    final long segmentCount = Math.max(1, (rowCount + 15L) / 16);
+    long estimatedMemoryUsageInBytes = RamUsageEstimator.sizeOfLongArray(16) * 
segmentCount;
+
+    if (valueCount == 1) {
+      estimatedMemoryUsageInBytes +=
+          estimateSingleValueArrayMemoryUsageInBytes(rowCount, 
valueDataTypeList.get(0));
+    } else if (valueCount > 1) {
+      estimatedMemoryUsageInBytes += RamUsageEstimator.sizeOfObjectArray(16) * 
segmentCount;
+      estimatedMemoryUsageInBytes +=
+          (long) rowCount
+              * (RamUsageEstimator.sizeOfObjectArray(valueCount)
+                  + estimateVectorValueMemoryUsageInBytes(valueDataTypeList));
+    }
+
+    return estimatedMemoryUsageInBytes;
+  }
+
+  private static long estimateSingleValueArrayMemoryUsageInBytes(
+      final int rowCount, final TSDataType dataType) {
+    final long segmentCount = Math.max(1, (rowCount + 15L) / 16);
+    if (Objects.isNull(dataType)) {
+      return 0;
+    }
+
+    switch (dataType) {
+      case BOOLEAN:
+        return RamUsageEstimator.sizeOfBooleanArray(16) * segmentCount;
+      case INT32:
+      case DATE:
+        return RamUsageEstimator.sizeOfIntArray(16) * segmentCount;
+      case INT64:
+      case TIMESTAMP:
+        return RamUsageEstimator.sizeOfLongArray(16) * segmentCount;
+      case FLOAT:
+        return RamUsageEstimator.sizeOfFloatArray(16) * segmentCount;
+      case DOUBLE:
+        return RamUsageEstimator.sizeOfDoubleArray(16) * segmentCount;
+      case TEXT:
+      case BLOB:
+      case STRING:
+        return RamUsageEstimator.sizeOfObjectArray(16) * segmentCount;
+      default:
+        return 0;
+    }
+  }
+
+  private static long estimateVectorValueMemoryUsageInBytes(
+      final List<TSDataType> valueDataTypeList) {
+    long estimatedMemoryUsageInBytes = 0;
+    for (final TSDataType dataType : valueDataTypeList) {
+      if (Objects.isNull(dataType)) {
+        continue;
+      }
+
+      estimatedMemoryUsageInBytes +=
+          RamUsageEstimator.alignObjectSize(
+              RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+                  + estimateTsPrimitiveTypeValueMemoryUsageInBytes(dataType));
+    }
+    return estimatedMemoryUsageInBytes;
+  }
+
+  private static long estimateTsPrimitiveTypeValueMemoryUsageInBytes(final 
TSDataType dataType) {
+    switch (dataType) {
+      case BOOLEAN:
+        return 1;
+      case INT32:
+      case DATE:
+      case FLOAT:
+        return Integer.BYTES;
+      case INT64:
+      case TIMESTAMP:
+      case DOUBLE:
+        return Long.BYTES;
+      case TEXT:
+      case BLOB:
+      case STRING:
+        return RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+      default:
+        return 0;
+    }
+  }
+
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   // util methods
   
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index b18bf6255ab..d843ab38ab4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -340,7 +340,7 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
 
         data.next();
         while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage()) {
-          data = chunkReader.nextPageData();
+          data = nextPageData();
         }
 
         if (tablet != null && tablet.getRowSize() == tablet.getMaxRowNumber()) 
{
@@ -376,16 +376,18 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
       }
 
       do {
-        resizePageDataMemoryForCurrentPageIfNeeded();
-        data = chunkReader.nextPageData();
-        long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data);
-        if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) {
-          
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForBatchData,
 size);
-        }
+        data = nextPageData();
       } while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage());
     } while (!data.hasCurrent());
   }
 
+  private BatchData nextPageData() throws IOException {
+    resizePageDataMemoryForCurrentPageIfNeeded();
+    final BatchData nextData = chunkReader.nextPageData();
+    
resizePageDataMemoryIfNeeded(PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(nextData));
+    return nextData;
+  }
+
   private void resizePageDataMemoryForCurrentPageIfNeeded() {
     if (!(chunkReader instanceof EstimatedMemoryChunkReader)) {
       return;
@@ -587,10 +589,14 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
             Chunk chunk =
                 new Chunk(
                     chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
+            final List<Long> pageEstimatedMemoryUsageInBytesList =
+                SinglePageWholeChunkReader
+                    
.calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(chunk);
 
             chunkReader =
                 currentIsMultiPage
-                    ? new ChunkReader(chunk, filter)
+                    ? new MemoryControlledChunkReader(
+                        new ChunkReader(chunk, filter), 
pageEstimatedMemoryUsageInBytesList)
                     : new SinglePageWholeChunkReader(chunk);
             currentIsAligned = false;
             final String measurementID =
@@ -636,8 +642,7 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
               chunk =
                   new Chunk(
                       chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
-              currentValueChunkPageMemorySize =
-                  calculatePageMemorySizeIfSinglePageValueChunk(chunk);
+              currentValueChunkPageMemorySize = 
calculateMaxPageMemorySize(chunk);
               boolean needReturn = false;
               final long timeChunkSize =
                   lastIndex >= 0
@@ -674,8 +679,7 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
               chunk = firstChunk4NextSequentialValueChunks;
               chunkHeader = chunk.getHeader();
               firstChunk4NextSequentialValueChunks = null;
-              currentValueChunkPageMemorySize =
-                  calculatePageMemorySizeIfSinglePageValueChunk(chunk);
+              currentValueChunkPageMemorySize = 
calculateMaxPageMemorySize(chunk);
               
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
               resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
                   valueChunkList, currentValueChunkPageMemorySize);
@@ -759,9 +763,7 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
         final boolean isMultiPage = marker == MetaMarker.TIME_CHUNK_HEADER;
         isMultiPageList.add(isMultiPage);
         timeChunkPageMemorySizeList.add(
-            isMultiPage
-                ? 0
-                : 
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(timeChunk));
+            
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(timeChunk));
         return true;
       }
     }
@@ -825,9 +827,22 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
             
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
                 timeChunk, valueChunkList));
       }
+      final List<Long> pageEstimatedMemoryUsageInBytesList =
+          currentIsMultiPage
+              ? AlignedSinglePageWholeChunkReader
+                  .calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
+                      timeChunk, valueChunkList)
+              : Collections.emptyList();
+      final long maxPageEstimatedMemoryUsageInBytes =
+          pageEstimatedMemoryUsageInBytesList.isEmpty()
+              ? 0
+              : pageEstimatedMemoryUsageInBytesList.get(0);
+      resizePageDataMemoryIfNeeded(maxPageEstimatedMemoryUsageInBytes);
       chunkReader =
           currentIsMultiPage
-              ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
+              ? new MemoryControlledChunkReader(
+                  new AlignedChunkReader(timeChunk, valueChunkList, filter),
+                  pageEstimatedMemoryUsageInBytesList)
               : new AlignedSinglePageWholeChunkReader(timeChunk, 
valueChunkList, null);
       currentIsAligned = true;
       lastMarker = marker;
@@ -868,10 +883,8 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
     }
   }
 
-  private long calculatePageMemorySizeIfSinglePageValueChunk(final Chunk 
chunk) throws IOException {
-    return isSinglePageValueChunk(chunk.getHeader())
-        ? 
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk)
-        : 0;
+  private long calculateMaxPageMemorySize(final Chunk chunk) throws 
IOException {
+    return 
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(chunk);
   }
 
   private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 825a5bf6e66..84569bf586c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.AlignedSinglePageWholeChunkReader;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.SinglePageWholeChunkReader;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -148,6 +149,84 @@ public class TsFileInsertionEventParserTest {
     System.out.println(System.currentTimeMillis() - startTime);
   }
 
+  @Test
+  public void 
testScanParserSplitNonAlignedSinglePageChunkByEstimatedPageMemory() throws 
Exception {
+    final long originalPipeMaxReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+    final int originalPageSizeInByte =
+        TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    final int originalMaxNumberOfPointsInPage =
+        
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
+    try {
+      TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024);
+      
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10000);
+
+      final int measurementCount = 16;
+      final int rowCount = 64;
+      final List<IMeasurementSchema> schemaList = new ArrayList<>();
+      for (int i = 0; i < measurementCount; ++i) {
+        schemaList.add(
+            new MeasurementSchema(
+                "s" + i, TSDataType.STRING, TSEncoding.PLAIN, 
CompressionType.LZ4));
+      }
+
+      nonalignedTsFile = new 
File("nonaligned-single-page-high-compression.tsfile");
+      final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount);
+      final Binary value =
+          new Binary(new String(new char[512]).replace('\0', 'a'), 
TSFileConfig.STRING_CHARSET);
+      for (int row = 0; row < rowCount; ++row) {
+        tablet.addTimestamp(row, row);
+        for (int measurementIndex = 0; measurementIndex < measurementCount; 
++measurementIndex) {
+          tablet.addValue("s" + measurementIndex, row, value);
+        }
+      }
+
+      try (final TsFileWriter writer = new TsFileWriter(nonalignedTsFile)) {
+        writer.registerTimeseries(new PartialPath("root.sg.d"), schemaList);
+        writer.writeTree(tablet);
+      }
+
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(
+              
calculatePipeMaxReaderChunkSizeForSinglePageNonAlignedChunk(nonalignedTsFile));
+
+      int tabletCount = 0;
+      int maxMeasurementCount = 0;
+      int pointCount = 0;
+      try (final TsFileInsertionEventScanParser parser =
+          new TsFileInsertionEventScanParser(
+              nonalignedTsFile,
+              new PrefixTreePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        for (final Pair<Tablet, Boolean> tabletWithIsAligned : 
parser.toTabletWithIsAligneds()) {
+          Assert.assertFalse(tabletWithIsAligned.getRight());
+          final Tablet parsedTablet = tabletWithIsAligned.getLeft();
+          tabletCount++;
+          maxMeasurementCount = Math.max(maxMeasurementCount, 
parsedTablet.getSchemas().size());
+          pointCount += getNonNullSize(parsedTablet);
+        }
+      }
+
+      Assert.assertTrue(tabletCount > 1);
+      Assert.assertTrue(maxMeasurementCount < measurementCount);
+      Assert.assertEquals(measurementCount * rowCount, pointCount);
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+      
TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte);
+      TSFileDescriptor.getInstance()
+          .getConfig()
+          .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage);
+    }
+  }
+
   @Test
   public void testScanParserSplitAlignedSinglePageChunkByEstimatedPageMemory() 
throws Exception {
     final long originalPipeMaxReaderChunkSize =
@@ -226,6 +305,84 @@ public class TsFileInsertionEventParserTest {
     }
   }
 
+  @Test
+  public void testScanParserSplitAlignedMultiPageChunkByEstimatedPageMemory() 
throws Exception {
+    final long originalPipeMaxReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+    final int originalPageSizeInByte =
+        TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    final int originalMaxNumberOfPointsInPage =
+        
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
+    try {
+      TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024);
+      
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(32);
+
+      final int measurementCount = 16;
+      final int rowCount = 64;
+      final List<IMeasurementSchema> schemaList = new ArrayList<>();
+      for (int i = 0; i < measurementCount; ++i) {
+        schemaList.add(
+            new MeasurementSchema(
+                "s" + i, TSDataType.STRING, TSEncoding.PLAIN, 
CompressionType.LZ4));
+      }
+
+      alignedTsFile = new File("aligned-multi-page-high-compression.tsfile");
+      final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount);
+      final Binary value =
+          new Binary(new String(new char[512]).replace('\0', 'a'), 
TSFileConfig.STRING_CHARSET);
+      for (int row = 0; row < rowCount; ++row) {
+        tablet.addTimestamp(row, row);
+        for (int measurementIndex = 0; measurementIndex < measurementCount; 
++measurementIndex) {
+          tablet.addValue("s" + measurementIndex, row, value);
+        }
+      }
+
+      try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+        writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), 
schemaList);
+        writer.writeAligned(tablet);
+      }
+
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(
+              
calculatePipeMaxReaderChunkSizeForMultiPageAlignedChunk(alignedTsFile));
+
+      int tabletCount = 0;
+      int maxMeasurementCount = 0;
+      int pointCount = 0;
+      try (final TsFileInsertionEventScanParser parser =
+          new TsFileInsertionEventScanParser(
+              alignedTsFile,
+              new PrefixTreePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        for (final Pair<Tablet, Boolean> tabletWithIsAligned : 
parser.toTabletWithIsAligneds()) {
+          Assert.assertTrue(tabletWithIsAligned.getRight());
+          final Tablet parsedTablet = tabletWithIsAligned.getLeft();
+          tabletCount++;
+          maxMeasurementCount = Math.max(maxMeasurementCount, 
parsedTablet.getSchemas().size());
+          pointCount += getNonNullSize(parsedTablet);
+        }
+      }
+
+      Assert.assertTrue(tabletCount > 1);
+      Assert.assertTrue(maxMeasurementCount < measurementCount);
+      Assert.assertEquals(measurementCount * rowCount, pointCount);
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+      
TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte);
+      TSFileDescriptor.getInstance()
+          .getConfig()
+          .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage);
+    }
+  }
+
   @Test
   public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() 
throws Exception {
     final long originalPipeMaxReaderChunkSize =
@@ -1584,6 +1741,64 @@ public class TsFileInsertionEventParserTest {
     }
   }
 
+  private long 
calculatePipeMaxReaderChunkSizeForSinglePageNonAlignedChunk(final File tsFile)
+      throws Exception { // Returns a chunk-size limit for a non-aligned TsFile whose chunks each hold a single page.
+    try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+      final IDeviceID deviceID = 
reader.getDeviceMeasurementsMap().keySet().iterator().next(); // only the first device in the file is inspected
+      final List<String> measurements = 
reader.getDeviceMeasurementsMap().get(deviceID); // NOTE(review): second getDeviceMeasurementsMap() call; could reuse a local if it is costly — TODO confirm
+      Assert.assertFalse(measurements.isEmpty());
+
+      long chunkSizeLimit = 0; // sum of every chunk's header data size; this is the returned limit
+      long estimatedPageMemorySize = 0; // sum of each chunk's estimated max page memory (batch-data variant)
+      for (final String measurement : measurements) {
+        final List<ChunkMetadata> chunkMetadataList =
+            reader.getChunkMetadataList(new Path(deviceID, measurement, 
false));
+        Assert.assertEquals(1, chunkMetadataList.size()); // expects exactly one chunk per measurement
+
+        final Chunk chunk = reader.readMemChunk(chunkMetadataList.get(0));
+        Assert.assertEquals(
+            MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, 
chunk.getHeader().getChunkType() & 0x3F); // compares only the low 6 bits of the type; presumably masks off extra marker bits — confirm
+        chunkSizeLimit += chunk.getHeader().getDataSize();
+        estimatedPageMemorySize +=
+            
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(
+                chunk);
+      }
+
+      Assert.assertTrue(estimatedPageMemorySize > chunkSizeLimit); // limit must be below the estimated page decode memory, presumably so the parser's page-memory accounting is exercised
+      return chunkSizeLimit;
+    }
+  }
+
+  private long calculatePipeMaxReaderChunkSizeForMultiPageAlignedChunk(final 
File tsFile)
+      throws Exception { // Returns a chunk-size limit for a TsFile holding one aligned chunk group whose chunks are multi-page.
+    try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+      final IDeviceID deviceID = 
reader.getDeviceMeasurementsMap().keySet().iterator().next(); // only the first device in the file is inspected
+      final List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
+          reader.getAlignedChunkMetadata(deviceID, true);
+      Assert.assertEquals(1, alignedChunkMetadataList.size()); // expects a single aligned chunk for the device
+
+      final AbstractAlignedChunkMetadata alignedChunkMetadata = 
alignedChunkMetadataList.get(0);
+      final Chunk timeChunk =
+          reader.readMemChunk((ChunkMetadata) 
alignedChunkMetadata.getTimeChunkMetadata());
+      Assert.assertEquals(MetaMarker.CHUNK_HEADER, 
timeChunk.getHeader().getChunkType() & 0x3F); // time chunk must carry CHUNK_HEADER (not the single-page marker) in its low 6 bits
+
+      long chunkSizeLimit = 
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk); // NOTE(review): time chunk counted by RAM bytes used, value chunks below only by header data size — confirm this asymmetry is intended
+      long estimatedMaxPageMemorySize =
+          
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(timeChunk); // running sum of estimated max page memory over time + value chunks
+      for (final IChunkMetadata valueChunkMetadata :
+          alignedChunkMetadata.getValueChunkMetadataList()) {
+        final Chunk valueChunk = reader.readMemChunk((ChunkMetadata) 
valueChunkMetadata);
+        Assert.assertEquals(MetaMarker.CHUNK_HEADER, 
valueChunk.getHeader().getChunkType() & 0x3F); // each value chunk must also be multi-page
+        chunkSizeLimit += valueChunk.getHeader().getDataSize();
+        estimatedMaxPageMemorySize +=
+            
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(valueChunk);
+      }
+
+      Assert.assertTrue(estimatedMaxPageMemorySize > chunkSizeLimit); // limit must be below the estimated page decode memory, presumably so the parser's page-memory accounting is exercised
+      return chunkSizeLimit;
+    }
+  }
+
   private static class ParserPerformanceStats {
 
     private long pointCount;


Reply via email to