This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new e2ea8719da6 [To dev/1.3] Pipe: Fixed the OOM bug of parser for large
aligned pages (#17639) (#17646)
e2ea8719da6 is described below
commit e2ea8719da6a8a8ae1b259cf574b9afc3bebdec7
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 13 15:29:13 2026 +0800
[To dev/1.3] Pipe: Fixed the OOM bug of parser for large aligned pages
(#17639) (#17646)
* Pipe: Fixed the OOM bug of parser for large aligned pages (#17639)
* chunk
* shop
* Update TsFileInsertionEventScanParser.java
* Fix
* cp-ger
* Update TsFileInsertionDataContainerTest.java
* Update TsFileInsertionDataContainerTest.java
---
.../scan/AlignedSinglePageWholeChunkReader.java | 53 +++++++-
.../container/scan/EstimatedMemoryChunkReader.java | 25 ++++
.../container/scan/SinglePageWholeChunkReader.java | 27 +++-
.../scan/TsFileInsertionScanDataContainer.java | 124 +++++++++++++++---
.../event/TsFileInsertionDataContainerTest.java | 144 +++++++++++++++++++++
5 files changed, 348 insertions(+), 25 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
index 740a1523d27..a9de04cbe64 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.header.ChunkHeader;
@@ -29,6 +30,7 @@ import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.tsfile.read.reader.chunk.ChunkReader;
import org.apache.tsfile.read.reader.page.AlignedPageReader;
+import org.apache.tsfile.read.reader.page.LazyLoadPageData;
import java.io.IOException;
import java.io.Serializable;
@@ -40,7 +42,8 @@ import java.util.List;
* The {@link AlignedSinglePageWholeChunkReader} is used to read a whole
single page aligned chunk
* with need to pass in the statistics.
*/
-public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
+public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader
+ implements EstimatedMemoryChunkReader {
// chunk header of the time column
private final ChunkHeader timeChunkHeader;
@@ -53,12 +56,15 @@ public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
private final List<ByteBuffer> valueChunkDataBufferList = new ArrayList<>();
// deleted intervals of all the sub sensors
private final List<List<TimeRange>> valueDeleteIntervalsList = new
ArrayList<>();
+ private final long pageEstimatedMemoryUsageInBytes;
public AlignedSinglePageWholeChunkReader(Chunk timeChunk, List<Chunk>
valueChunkList)
throws IOException {
super(Long.MIN_VALUE, null);
this.timeChunkHeader = timeChunk.getHeader();
this.timeChunkDataBuffer = timeChunk.getData();
+ this.pageEstimatedMemoryUsageInBytes =
+ calculatePageEstimatedMemoryUsageInBytes(timeChunk, valueChunkList);
valueChunkList.forEach(
chunk -> {
@@ -124,7 +130,7 @@ public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
ChunkReader.deserializePageData(timePageHeader, timeChunkDataBuffer,
timeChunkHeader);
List<PageHeader> valuePageHeaderList = new ArrayList<>();
- List<ByteBuffer> valuePageDataList = new ArrayList<>();
+ LazyLoadPageData[] valuePageDataArray = new
LazyLoadPageData[rawValuePageHeaderList.size()];
List<TSDataType> valueDataTypeList = new ArrayList<>();
List<Decoder> valueDecoderList = new ArrayList<>();
@@ -135,15 +141,21 @@ public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
if (valuePageHeader == null || valuePageHeader.getUncompressedSize() ==
0) {
// Empty Page
valuePageHeaderList.add(null);
- valuePageDataList.add(null);
+ valuePageDataArray[i] = null;
valueDataTypeList.add(null);
valueDecoderList.add(null);
} else {
ChunkHeader valueChunkHeader = valueChunkHeaderList.get(i);
+ int currentPagePosition = valueChunkDataBufferList.get(i).position();
+ valueChunkDataBufferList
+ .get(i)
+ .position(currentPagePosition +
valuePageHeader.getCompressedSize());
valuePageHeaderList.add(valuePageHeader);
- valuePageDataList.add(
- ChunkReader.deserializePageData(
- valuePageHeader, valueChunkDataBufferList.get(i),
valueChunkHeader));
+ valuePageDataArray[i] =
+ new LazyLoadPageData(
+ valueChunkDataBufferList.get(i).array(),
+ currentPagePosition,
+
IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()));
valueDataTypeList.add(valueChunkHeader.getDataType());
valueDecoderList.add(
Decoder.getDecoderByType(
@@ -160,11 +172,38 @@ public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader {
timePageData,
defaultTimeDecoder,
valuePageHeaderList,
- valuePageDataList,
+ valuePageDataArray,
valueDataTypeList,
valueDecoderList,
queryFilter);
alignedPageReader.setDeleteIntervalList(valueDeleteIntervalsList);
return alignedPageReader;
}
+
+ @Override
+ public long getCurrentPageEstimatedMemoryUsageInBytes() {
+ return pageEstimatedMemoryUsageInBytes;
+ }
+
+ public static long calculatePageEstimatedMemoryUsageInBytes(
+ final Chunk timeChunk, final List<Chunk> valueChunkList) throws
IOException {
+ final ByteBuffer timeChunkDataBuffer = timeChunk.getData().duplicate();
+ long estimatedMemoryUsageInBytes =
+ PageHeader.deserializeFrom(timeChunkDataBuffer, (Statistics<? extends
Serializable>) null)
+ .getUncompressedSize();
+
+ for (final Chunk valueChunk : valueChunkList) {
+ if (valueChunk == null) {
+ continue;
+ }
+
+ final ByteBuffer valueChunkDataBuffer = valueChunk.getData().duplicate();
+ estimatedMemoryUsageInBytes +=
+ PageHeader.deserializeFrom(
+ valueChunkDataBuffer, (Statistics<? extends Serializable>)
null)
+ .getUncompressedSize();
+ }
+
+ return estimatedMemoryUsageInBytes;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/EstimatedMemoryChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/EstimatedMemoryChunkReader.java
new file mode 100644
index 00000000000..dc1f2501bdf
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/EstimatedMemoryChunkReader.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+
+interface EstimatedMemoryChunkReader {
+
+ long getCurrentPageEstimatedMemoryUsageInBytes();
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
index 3f1aadbba0c..ade50012903 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
@@ -26,22 +26,25 @@ import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.tsfile.read.reader.page.LazyLoadPageData;
import org.apache.tsfile.read.reader.page.PageReader;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
-public class SinglePageWholeChunkReader extends AbstractChunkReader {
+public class SinglePageWholeChunkReader extends AbstractChunkReader
+ implements EstimatedMemoryChunkReader {
private final ChunkHeader chunkHeader;
private final ByteBuffer chunkDataBuffer;
+ private final long pageEstimatedMemoryUsageInBytes;
public SinglePageWholeChunkReader(Chunk chunk) throws IOException {
super(Long.MIN_VALUE, null);
this.chunkHeader = chunk.getHeader();
this.chunkDataBuffer = chunk.getData();
-
+ this.pageEstimatedMemoryUsageInBytes =
calculatePageEstimatedMemoryUsageInBytes(chunk);
initAllPageReaders();
}
@@ -56,15 +59,33 @@ public class SinglePageWholeChunkReader extends AbstractChunkReader {
}
private PageReader constructPageReader(PageHeader pageHeader) throws
IOException {
+ final int currentPagePosition = chunkDataBuffer.position();
+ chunkDataBuffer.position(currentPagePosition +
pageHeader.getCompressedSize());
return new PageReader(
pageHeader,
- deserializePageData(pageHeader, chunkDataBuffer, chunkHeader),
+ new LazyLoadPageData(
+ chunkDataBuffer.array(),
+ currentPagePosition,
+ IUnCompressor.getUnCompressor(chunkHeader.getCompressionType())),
chunkHeader.getDataType(),
Decoder.getDecoderByType(chunkHeader.getEncodingType(),
chunkHeader.getDataType()),
defaultTimeDecoder,
null);
}
+ @Override
+ public long getCurrentPageEstimatedMemoryUsageInBytes() {
+ return pageEstimatedMemoryUsageInBytes;
+ }
+
+ public static long calculatePageEstimatedMemoryUsageInBytes(final Chunk
chunk)
+ throws IOException {
+ final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+ final PageHeader pageHeader =
+ PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends
Serializable>) null);
+ return pageHeader.getUncompressedSize();
+ }
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// util methods
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 07b91cd68af..9366d0f62df 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -98,10 +98,11 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
@@ -98,10 +98,11 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
// Cached time chunk
private final List<Chunk> timeChunkList = new ArrayList<>();
private final List<Boolean> isMultiPageList = new ArrayList<>();
+ private final List<Long> timeChunkPageMemorySizeList = new ArrayList<>();
private final Map<String, Integer> measurementIndexMap = new HashMap<>();
private int lastIndex = -1;
- private ChunkHeader firstChunkHeader4NextSequentialValueChunks;
+ private Chunk firstChunk4NextSequentialValueChunks;
private byte lastMarker = Byte.MIN_VALUE;
@@ -343,6 +344,7 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
}
do {
+ resizePageDataMemoryForCurrentPageIfNeeded();
data = chunkReader.nextPageData();
long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data);
if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) {
@@ -352,6 +354,23 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
} while (!data.hasCurrent());
}
+ private void resizePageDataMemoryForCurrentPageIfNeeded() {
+ if (!(chunkReader instanceof EstimatedMemoryChunkReader)) {
+ return;
+ }
+
+ final long estimatedMemoryUsageInBytes =
+ ((EstimatedMemoryChunkReader)
chunkReader).getCurrentPageEstimatedMemoryUsageInBytes();
+ resizePageDataMemoryIfNeeded(estimatedMemoryUsageInBytes);
+ }
+
+ private void resizePageDataMemoryIfNeeded(final long
estimatedMemoryUsageInBytes) {
+ if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() <
estimatedMemoryUsageInBytes) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForBatchData,
estimatedMemoryUsageInBytes);
+ }
+ }
+
private boolean putValueToColumns(final BatchData data, final Tablet tablet,
final int rowIndex) {
final Object[] columns = tablet.values;
boolean isNeedFillTime = false;
@@ -439,6 +458,7 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
private void moveToNextChunkReader() throws IOException,
IllegalStateException {
ChunkHeader chunkHeader;
long valueChunkSize = 0;
+ long valueChunkPageMemorySize = 0;
final List<Chunk> valueChunkList = new ArrayList<>();
currentMeasurements.clear();
modsInfos.clear();
@@ -449,7 +469,12 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
}
byte marker;
- while ((marker = lastMarker != Byte.MIN_VALUE ? lastMarker :
tsFileSequenceReader.readMarker())
+ while ((marker =
+ lastMarker != Byte.MIN_VALUE
+ ? lastMarker
+ : Objects.nonNull(firstChunk4NextSequentialValueChunks)
+ ?
toValueChunkMarker(firstChunk4NextSequentialValueChunks.getHeader())
+ : tsFileSequenceReader.readMarker())
!= MetaMarker.SEPARATOR) {
lastMarker = Byte.MIN_VALUE;
switch (marker) {
@@ -473,10 +498,17 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
== TsFileConstant.TIME_COLUMN_MASK) {
- timeChunkList.add(
+ final Chunk timeChunk =
new Chunk(
- chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize())));
- isMultiPageList.add(marker == MetaMarker.TIME_CHUNK_HEADER);
+ chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
+ final boolean isMultiPage = marker ==
MetaMarker.TIME_CHUNK_HEADER;
+ timeChunkList.add(timeChunk);
+ isMultiPageList.add(isMultiPage);
+ timeChunkPageMemorySizeList.add(
+ isMultiPage
+ ? 0
+ :
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
+ timeChunk));
break;
}
@@ -540,8 +572,10 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
case MetaMarker.VALUE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
{
- if (Objects.isNull(firstChunkHeader4NextSequentialValueChunks)) {
- long currentChunkHeaderOffset = tsFileSequenceReader.position()
- 1;
+ Chunk chunk;
+ long currentValueChunkPageMemorySize = 0;
+ if (Objects.isNull(firstChunk4NextSequentialValueChunks)) {
+ final long currentChunkHeaderOffset =
tsFileSequenceReader.position() - 1;
chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
final long nextMarkerOffset =
@@ -592,40 +626,56 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
if (chunkHeader.getDataSize() == 0) {
break;
}
+ chunk =
+ new Chunk(
+ chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
+ currentValueChunkPageMemorySize =
+ calculatePageMemorySizeIfSinglePageValueChunk(chunk);
boolean needReturn = false;
final long timeChunkSize =
lastIndex >= 0
? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
timeChunkList.get(lastIndex))
: 0;
+ final long timeChunkPageMemorySize =
+ lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex)
: 0;
if (lastIndex >= 0) {
if (valueIndex != lastIndex) {
needReturn = recordAlignedChunk(valueChunkList, marker);
} else {
final long chunkSize = timeChunkSize + valueChunkSize;
+ final long pageMemorySize = timeChunkPageMemorySize +
valueChunkPageMemorySize;
if (chunkSize + chunkHeader.getDataSize()
- > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+ >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()
+ || timeChunkPageMemorySize > 0
+ && currentValueChunkPageMemorySize > 0
+ && pageMemorySize + currentValueChunkPageMemorySize
+ > getPageDataMemoryLimitInBytes()) {
needReturn = recordAlignedChunk(valueChunkList, marker);
}
}
}
lastIndex = valueIndex;
if (needReturn) {
- firstChunkHeader4NextSequentialValueChunks = chunkHeader;
+ firstChunk4NextSequentialValueChunks = chunk;
return;
}
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList,
chunkHeader);
+ resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
+ valueChunkList, currentValueChunkPageMemorySize);
} else {
- chunkHeader = firstChunkHeader4NextSequentialValueChunks;
- firstChunkHeader4NextSequentialValueChunks = null;
+ chunk = firstChunk4NextSequentialValueChunks;
+ chunkHeader = chunk.getHeader();
+ firstChunk4NextSequentialValueChunks = null;
+ currentValueChunkPageMemorySize =
+ calculatePageMemorySizeIfSinglePageValueChunk(chunk);
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList,
chunkHeader);
+ resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
+ valueChunkList, currentValueChunkPageMemorySize);
}
- Chunk chunk =
- new Chunk(
- chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
-
valueChunkSize += chunkHeader.getDataSize();
+ valueChunkPageMemorySize += currentValueChunkPageMemorySize;
valueChunkList.add(chunk);
currentMeasurements.add(
new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
@@ -649,6 +699,7 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
lastIndex = -1;
timeChunkList.clear();
isMultiPageList.clear();
+ timeChunkPageMemorySizeList.clear();
measurementIndexMap.clear();
currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID
: null;
@@ -670,12 +721,21 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
}
}
+ private long getPageDataMemoryLimitInBytes() {
+ return PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+ }
+
private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final
byte marker)
throws IOException {
if (!valueChunkList.isEmpty()) {
final Chunk timeChunk = timeChunkList.get(lastIndex);
timeChunk.getData().rewind();
currentIsMultiPage = isMultiPageList.get(lastIndex);
+ if (!currentIsMultiPage) {
+ resizePageDataMemoryIfNeeded(
+
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
+ timeChunk, valueChunkList));
+ }
chunkReader =
currentIsMultiPage
? new AlignedChunkReader(timeChunk, valueChunkList, filter)
@@ -701,6 +761,40 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
}
}
+ private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
+ final List<Chunk> valueChunkList, final long valueChunkPageMemorySize) {
+ if (!valueChunkList.isEmpty() || lastIndex < 0 || valueChunkPageMemorySize
<= 0) {
+ return;
+ }
+
+ final long timeChunkPageMemorySize =
timeChunkPageMemorySizeList.get(lastIndex);
+ if (timeChunkPageMemorySize <= 0) {
+ return;
+ }
+
+ final long pageMemorySize = timeChunkPageMemorySize +
valueChunkPageMemorySize;
+ if (pageMemorySize > getPageDataMemoryLimitInBytes()) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForBatchData, pageMemorySize);
+ }
+ }
+
+ private long calculatePageMemorySizeIfSinglePageValueChunk(final Chunk
chunk) throws IOException {
+ return isSinglePageValueChunk(chunk.getHeader())
+ ?
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk)
+ : 0;
+ }
+
+ private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
+ return (chunkHeader.getChunkType() & 0x3F) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
+ }
+
+ private byte toValueChunkMarker(final ChunkHeader chunkHeader) {
+ return isSinglePageValueChunk(chunkHeader)
+ ? MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER
+ : MetaMarker.VALUE_CHUNK_HEADER;
+ }
+
@Override
public void close() {
super.close();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 51fc1f3a556..7fe514b277e 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -26,20 +26,29 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.AlignedSinglePageWholeChunkReader;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.MetaMarker;
+import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Binary;
@@ -121,11 +130,96 @@ public class TsFileInsertionDataContainerTest {
System.out.println(System.currentTimeMillis() - startTime);
}
+ @Test
+ public void testScanParserSplitAlignedSinglePageChunkByEstimatedPageMemory()
throws Exception {
+ final long originalPipeMaxReaderChunkSize =
+ CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+ final int originalPageSizeInByte =
+ TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ final int originalMaxNumberOfPointsInPage =
+
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
+ try {
+ TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024);
+
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10000);
+
+ final int measurementCount = 16;
+ final int rowCount = 64;
+ final List<MeasurementSchema> schemaList = new ArrayList<>();
+ for (int i = 0; i < measurementCount; ++i) {
+ schemaList.add(
+ new MeasurementSchema(
+ "s" + i, TSDataType.STRING, TSEncoding.PLAIN,
CompressionType.LZ4));
+ }
+
+ alignedTsFile = new File("aligned-single-page-high-compression.tsfile");
+ final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount);
+ final Binary value =
+ new Binary(new String(new char[512]).replace('\0', 'a'),
TSFileConfig.STRING_CHARSET);
+ for (int row = 0; row < rowCount; ++row) {
+ tablet.addTimestamp(row, row);
+ for (int measurementIndex = 0; measurementIndex < measurementCount;
++measurementIndex) {
+ tablet.addValue("s" + measurementIndex, row, value);
+ }
+ }
+ tablet.rowSize = rowCount;
+
+ try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+ writer.registerAlignedTimeseries(new Path("root.sg.d"), schemaList);
+ writer.writeAligned(tablet);
+ }
+
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(
+
calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(alignedTsFile));
+
+ int tabletCount = 0;
+ int maxMeasurementCount = 0;
+ int pointCount = 0;
+ try (final TsFileInsertionScanDataContainer parser =
+ new TsFileInsertionScanDataContainer(
+ alignedTsFile,
+ new PrefixPipePattern("root"),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ false)) {
+ for (final Pair<Tablet, Boolean> tabletWithIsAligned :
parser.toTabletWithIsAligneds()) {
+ Assert.assertTrue(tabletWithIsAligned.getRight());
+ final Tablet parsedTablet = tabletWithIsAligned.getLeft();
+ tabletCount++;
+ maxMeasurementCount = Math.max(maxMeasurementCount,
parsedTablet.getSchemas().size());
+ pointCount += getNonNullSize(parsedTablet);
+ }
+ }
+
+ Assert.assertTrue(tabletCount > 1);
+ Assert.assertTrue(maxMeasurementCount < measurementCount);
+ Assert.assertEquals(measurementCount * rowCount, pointCount);
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+
TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte);
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage);
+ }
+ }
+
@Test
public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk()
throws Exception {
final long originalPipeMaxReaderChunkSize =
PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+ final int originalPipeDataStructureTabletSizeInBytes =
+ PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
+ final int configuredBatchMemorySize = 1024 * 1024;
CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0);
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeDataStructureTabletSizeInBytes(configuredBatchMemorySize);
alignedTsFile = new File("single-aligned-value-chunk.tsfile");
final List<MeasurementSchema> schemaList = new ArrayList<>();
@@ -154,11 +248,18 @@ public class TsFileInsertionDataContainerTest {
null,
false)) {
Assert.assertTrue(getAllocatedChunkMemory(parser).getMemoryUsageInBytes() > 0);
+
Assert.assertTrue(getAllocatedBatchDataMemory(parser).getMemoryUsageInBytes() >
0);
+ Assert.assertTrue(
+ getAllocatedBatchDataMemory(parser).getMemoryUsageInBytes()
+ < configuredBatchMemorySize);
}
} finally {
CommonDescriptor.getInstance()
.getConfig()
.setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+ CommonDescriptor.getInstance()
+ .getConfig()
+
.setPipeDataStructureTabletSizeInBytes(originalPipeDataStructureTabletSizeInBytes);
}
}
@@ -696,4 +797,47 @@ public class TsFileInsertionDataContainerTest {
field.setAccessible(true);
return (PipeMemoryBlock) field.get(parser);
}
+
+ private PipeMemoryBlock getAllocatedBatchDataMemory(final
TsFileInsertionScanDataContainer parser)
+ throws NoSuchFieldException, IllegalAccessException {
+ final Field field =
+
TsFileInsertionScanDataContainer.class.getDeclaredField("allocatedMemoryBlockForBatchData");
+ field.setAccessible(true);
+ return (PipeMemoryBlock) field.get(parser);
+ }
+
+ private long calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(final
File tsFile)
+ throws Exception {
+ try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+ final List<IDeviceID> deviceIDList = reader.getAllDevices();
+ Assert.assertEquals(1, deviceIDList.size());
+ final IDeviceID deviceID = deviceIDList.get(0);
+ final List<AlignedChunkMetadata> alignedChunkMetadataList =
+ reader.getAlignedChunkMetadata(deviceID);
+ Assert.assertEquals(1, alignedChunkMetadataList.size());
+
+ final AlignedChunkMetadata alignedChunkMetadata =
alignedChunkMetadataList.get(0);
+ final Chunk timeChunk =
+ reader.readMemChunk((ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
+ Assert.assertEquals(
+ MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER,
timeChunk.getHeader().getChunkType() & 0x3F);
+
+ final List<Chunk> valueChunkList = new ArrayList<>();
+ long chunkSizeLimit =
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
+ for (final IChunkMetadata valueChunkMetadata :
+ alignedChunkMetadata.getValueChunkMetadataList()) {
+ final Chunk valueChunk = reader.readMemChunk((ChunkMetadata)
valueChunkMetadata);
+ Assert.assertEquals(
+ MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER,
valueChunk.getHeader().getChunkType() & 0x3F);
+ valueChunkList.add(valueChunk);
+ chunkSizeLimit += valueChunk.getHeader().getDataSize();
+ }
+
+ final long estimatedPageMemorySize =
+
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
+ timeChunk, valueChunkList);
+ Assert.assertTrue(estimatedPageMemorySize > chunkSizeLimit);
+ return chunkSizeLimit;
+ }
+ }
}