This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch chunk-fix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5a3ea3171e5c76a225613223dfd9798ca3fc47c4 Merge: 435ea419e58 9348cb8e2fb Author: Caideyipi <[email protected]> AuthorDate: Tue May 12 11:10:47 2026 +0800 Merge remote-tracking branch 'origin/master' into chunk-fix # Conflicts: # iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java # iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java .idea/icon.png | Bin 6736 -> 0 bytes .../org/apache/iotdb/it/env/cluster/EnvUtils.java | 17 +- .../iotdb/it/env/cluster/config/MppBaseConfig.java | 2 + .../it/env/cluster/config/MppCommonConfig.java | 6 + .../env/cluster/config/MppSharedCommonConfig.java | 7 + .../it/env/remote/config/RemoteCommonConfig.java | 5 + .../org/apache/iotdb/itbase/env/CommonConfig.java | 2 + .../iotdb/ainode/it/AINodeModelManageIT.java | 10 - .../apache/iotdb/db/it/IoTDBLoadLastCacheIT.java | 1 - .../iotdb/db/it/auth/IoTDBTemplateAuthIT.java | 2 +- .../manual/enhanced/IoTDBPipeClusterIT.java | 33 ++ .../it/db/it/IoTDBAuthenticationTableIT.java | 4 +- .../relational/it/db/it/IoTDBLoadTsFileIT.java | 73 ++++ .../scalar/IoTDBDiffFunctionTableIT.java | 80 ++++ ...ableAggregationQueryWithNetworkPartitionIT.java | 3 +- iotdb-client/client-cpp/src/main/Session.cpp | 16 +- iotdb-client/client-cpp/src/main/Session.h | 10 +- .../apache/iotdb/rpc/TElasticFramedTransport.java | 23 +- iotdb-core/ainode/build_binary.py | 237 +----------- .../ainode/iotdb/ainode/core/model/model_info.py | 8 - .../iotdb/ainode/core/model/model_storage.py | 210 +--------- .../async/AsyncAINodeHeartbeatClientPool.java | 4 +- .../async/AsyncConfigNodeHeartbeatClientPool.java | 4 +- .../async/AsyncDataNodeHeartbeatClientPool.java | 4 +- .../CnToCnInternalServiceAsyncRequestManager.java | 8 +- .../client/async/CnToDnAsyncRequestType.java | 1 + .../CnToDnInternalServiceAsyncRequestManager.java | 11 + .../rpc/DataNodeAsyncRequestRPCHandler.java | 1 + .../iotdb/confignode/conf/ConfigNodeConfig.java | 61 +++ .../confignode/conf/ConfigNodeDescriptor.java | 49 +++ .../statemachine/ConfigRegionStateMachine.java | 5 + .../iotdb/confignode/manager/ConfigManager.java | 14 + .../iotdb/confignode/manager/load/LoadManager.java | 8 +- .../confignode/manager/load/cache/LoadCache.java | 2 +- .../manager/load/service/HeartbeatService.java | 8 - .../manager/load/service/TopologyService.java | 250 ++++++++---- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 32 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 36 +- .../scan/TsFileInsertionEventScanParser.java | 36 +- ...ileInsertionEventTableParserTabletIterator.java | 143 ++++--- .../iotdb/db/protocol/client/ConfigNodeClient.java | 8 +- .../protocol/client/DataNodeClientPoolFactory.java | 28 -- .../DnToCnInternalServiceAsyncRequestManager.java | 5 + ...DataNodeExternalServiceAsyncRequestManager.java | 10 +- .../client/dn/DataNodeIntraHeartbeatManager.java | 5 + .../dn/DataNodeMPPServiceAsyncRequestManager.java | 10 +- .../DnToDnInternalServiceAsyncRequestManager.java | 5 + .../impl/DataNodeInternalRPCServiceImpl.java | 61 ++- .../schema/source/DevicePredicateHandler.java | 6 +- .../schema/source/TableDeviceFetchSource.java | 8 +- .../execution/schedule/DriverScheduler.java | 1 + .../iotdb/db/queryengine/plan/ClusterTopology.java | 109 +++--- .../iotdb/db/queryengine/plan/Coordinator.java | 12 +- .../db/queryengine/plan/analyze/AnalyzeUtils.java | 32 +- .../plan/analyze/load/LoadTsFileAnalyzer.java | 1 + .../analyze/load/LoadTsFileTableSchemaCache.java | 24 +- .../config/executor/ClusterConfigTaskExecutor.java | 46 +-- .../queryengine/plan/planner/TreeModelPlanner.java | 3 - .../plan/planner/plan/node/PlanVisitor.java | 5 - .../relational/analyzer/StatementAnalyzer.java | 52 ++- .../schema/CheckSchemaPredicateVisitor.java | 6 +- .../ConvertSchemaPredicateToFilterVisitor.java | 30 +- .../schema/ExtractPredicateColumnNameVisitor.java | 2 +- .../metadata/fetcher/SchemaPredicateUtil.java | 16 +- .../metadata/fetcher/TableDeviceSchemaFetcher.java | 10 +- .../plan/relational/planner/RelationPlanner.java | 4 +- .../distribute/TableDistributedPlanGenerator.java | 34 +- .../planner/node/DeviceTableScanNode.java | 2 +- .../PushAggregationIntoTableScan.java | 18 +- .../optimizations/PushPredicateIntoTableScan.java | 2 +- .../relational/security/AccessControlImpl.java | 13 + .../security/TreeAccessCheckVisitor.java | 13 + .../plan/statement/crud/LoadTsFileStatement.java | 1 + .../dataregion/modification/ModificationFile.java | 22 +- .../dataregion/wal/buffer/IWALBuffer.java | 6 +- .../dataregion/wal/buffer/WALBuffer.java | 18 +- .../storageengine/dataregion/wal/node/WALNode.java | 10 +- .../db/storageengine/load/LoadTsFileManager.java | 54 ++- .../load/memory/LoadTsFileMemoryManager.java | 13 +- .../load/splitter/TsFileSplitter.java | 9 +- .../pipe/event/TsFileInsertionEventParserTest.java | 50 +++ .../plan/analyze/load/LoadTsFileAnalyzerTest.java | 197 ++++++++++ .../plan/relational/analyzer/AnalyzerTest.java | 50 +++ .../statement/crud/LoadTsFileStatementTest.java | 79 ++++ .../table/TsTableRenameColumnSchemaTest.java | 60 +++ .../compaction/utils/CompactionUtilsTest.java | 36 ++ .../modification/ModificationFileTest.java | 62 +++ .../wal/node/WALNodeWaitForRollFileTest.java | 425 +++++++++++++++++++++ .../db/storageengine/load/TsFileSplitterTest.java | 157 ++++++++ .../load/memory/LoadTsFileMemoryManagerTest.java | 106 +++++ .../conf/iotdb-system.properties.template | 42 +- .../iotdb/commons/client/ClientPoolFactory.java | 58 ++- .../client/property/ClientPoolProperty.java | 16 +- .../client/property/ThriftClientProperty.java | 5 +- .../client/request/AsyncRequestManager.java | 6 +- ...nfigNodeInternalServiceAsyncRequestManager.java | 10 +- .../DataNodeInternalServiceRequestManager.java | 10 +- .../DataNodeIntraHeartbeatRequestManager.java | 9 +- .../iotdb/commons/concurrent/ThreadName.java | 3 +- .../apache/iotdb/commons/conf/CommonConfig.java | 15 +- .../iotdb/commons/conf/CommonDescriptor.java | 36 +- .../planner/plan/node/ICoreQueryPlanVisitor.java | 5 + .../apache/iotdb/commons/schema/table/Audit.java | 13 + .../apache/iotdb/commons/schema/table/TsTable.java | 35 +- .../plan/relational/metadata/MetadataUtil.java | 52 --- .../relational/metadata/QualifiedObjectName.java | 4 - .../relational/planner/node/TableScanNode.java | 18 +- .../iotdb/commons/client/ClientManagerTest.java | 19 +- .../src/main/thrift/datanode.thrift | 11 + 109 files changed, 2642 insertions(+), 1087 deletions(-) diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 7ed3bf5af33,32823459fcf..282e9bff91c --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@@ -596,47 -561,29 +596,31 @@@ public class TsFileInsertionEventScanPa needReturn = recordAlignedChunk(valueChunkList, marker); } else { final long chunkSize = timeChunkSize + valueChunkSize; + final long pageMemorySize = timeChunkPageMemorySize + valueChunkPageMemorySize; if (chunkSize + chunkHeader.getDataSize() - > allocatedMemoryBlockForChunk.getMemoryUsageInBytes() - || timeChunkPageMemorySize > 0 - && chunkPageMemorySize > 0 - && pageMemorySize + chunkPageMemorySize - > getPageDataMemoryLimitInBytes()) { - if (valueChunkList.size() == 1) { - if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { - PipeDataNodeResourceManager.memory() - .forceResize(allocatedMemoryBlockForChunk, chunkSize); - } - final long currentPageMemorySize = - timeChunkPageMemorySize > 0 && valueChunkPageMemorySize > 0 - ? pageMemorySize - : 0; - if (currentPageMemorySize > getPageDataMemoryLimitInBytes()) { - PipeDataNodeResourceManager.memory() - .forceResize(allocatedMemoryBlockForBatchData, currentPageMemorySize); - } - } + > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { needReturn = recordAlignedChunk(valueChunkList, marker); } } } lastIndex = valueIndex; if (needReturn) { - firstChunkHeader4NextSequentialValueChunks = chunkHeader; + firstChunk4NextSequentialValueChunks = chunk; return; } + resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); } else { - chunkHeader = firstChunkHeader4NextSequentialValueChunks; - firstChunkHeader4NextSequentialValueChunks = null; + chunk = firstChunk4NextSequentialValueChunks; + chunkHeader = chunk.getHeader(); + firstChunk4NextSequentialValueChunks = null; + resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); } - Chunk chunk = - new Chunk( - chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); - valueChunkSize += chunkHeader.getDataSize(); + if (isSinglePageValueChunk(chunkHeader)) { + valueChunkPageMemorySize += + SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk); + } valueChunkList.add(chunk); currentMeasurements.add( new MeasurementSchema( @@@ -786,16 -717,20 +770,30 @@@ return false; } + private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit( + final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) { + if (!valueChunkList.isEmpty() || lastIndex < 0) { + return; + } + + final long chunkSize = + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex)) + + valueChunkHeader.getDataSize(); + if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, chunkSize); + } + } + + private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) { + return (chunkHeader.getChunkType() & 0x3F) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER; + } + + private byte toValueChunkMarker(final ChunkHeader chunkHeader) { + return isSinglePageValueChunk(chunkHeader) + ? MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER + : MetaMarker.VALUE_CHUNK_HEADER; + } + @Override public void close() { super.close(); diff --cc iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 91bfa9c5d3b,8d02cc8a998..006473ebf77 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@@ -28,9 -28,8 +28,10 @@@ import org.apache.iotdb.commons.pipe.da import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.AlignedSinglePageWholeChunkReader; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; + import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; @@@ -129,84 -122,46 +131,124 @@@ public class TsFileInsertionEventParser System.out.println(System.currentTimeMillis() - startTime); } + @Test + public void testScanParserSplitAlignedSinglePageChunkByEstimatedPageMemory() throws Exception { + final long originalPipeMaxReaderChunkSize = + CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize(); + final int originalPageSizeInByte = + TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); + final int originalMaxNumberOfPointsInPage = + TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage(); + + try { + TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024); + TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10000); + + final int measurementCount = 16; + final int rowCount = 64; + final List<IMeasurementSchema> schemaList = new ArrayList<>(); + for (int i = 0; i < measurementCount; ++i) { + schemaList.add( + new MeasurementSchema( + "s" + i, TSDataType.STRING, TSEncoding.PLAIN, CompressionType.LZ4)); + } + + alignedTsFile = new File("aligned-single-page-high-compression.tsfile"); + final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount); + final Binary value = + new Binary(new String(new char[512]).replace('\0', 'a'), TSFileConfig.STRING_CHARSET); + for (int row = 0; row < rowCount; ++row) { + tablet.addTimestamp(row, row); + for (int measurementIndex = 0; measurementIndex < measurementCount; ++measurementIndex) { + tablet.addValue("s" + measurementIndex, row, value); + } + } + + try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) { + writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), schemaList); + writer.writeAligned(tablet); + } + + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize( + calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(alignedTsFile)); + + int tabletCount = 0; + int maxMeasurementCount = 0; + int pointCount = 0; + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + alignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + Assert.assertTrue(tabletWithIsAligned.getRight()); + final Tablet parsedTablet = tabletWithIsAligned.getLeft(); + tabletCount++; + maxMeasurementCount = Math.max(maxMeasurementCount, parsedTablet.getSchemas().size()); + pointCount += getNonNullSize(parsedTablet); + } + } + + Assert.assertTrue(tabletCount > 1); + Assert.assertTrue(maxMeasurementCount < measurementCount); + Assert.assertEquals(measurementCount * rowCount, pointCount); + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); + TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte); + TSFileDescriptor.getInstance() + .getConfig() + .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage); + } + } + + @Test + public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() throws Exception { + final long originalPipeMaxReaderChunkSize = + PipeConfig.getInstance().getPipeMaxReaderChunkSize(); + CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0); + + alignedTsFile = new File("single-aligned-value-chunk.tsfile"); + final List<IMeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + + final Tablet tablet = new Tablet("root.sg.d", schemaList, 2); + tablet.addTimestamp(0, 1); + tablet.addValue("s1", 0, 1L); + tablet.addTimestamp(1, 2); + tablet.addValue("s1", 1, 2L); + + try { + try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) { + writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), schemaList); + writer.writeAligned(tablet); + } + + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + alignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + Assert.assertTrue(getAllocatedChunkMemory(parser).getMemoryUsageInBytes() > 0); + } + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); + } + } + public void testToTabletInsertionEvents(final boolean isQuery) throws Exception { // Test empty chunk testMixedTsFileWithEmptyChunk(isQuery); @@@ -754,36 -709,11 +796,44 @@@ return count; } + private PipeMemoryBlock getAllocatedChunkMemory(final TsFileInsertionEventScanParser parser) + throws NoSuchFieldException, IllegalAccessException { + final Field field = + TsFileInsertionEventScanParser.class.getDeclaredField("allocatedMemoryBlockForChunk"); + field.setAccessible(true); + return (PipeMemoryBlock) field.get(parser); + } ++ + private long calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(final File tsFile) + throws Exception { + try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { + final IDeviceID deviceID = reader.getDeviceMeasurementsMap().keySet().iterator().next(); + final List<AbstractAlignedChunkMetadata> alignedChunkMetadataList = + reader.getAlignedChunkMetadata(deviceID, true); + Assert.assertEquals(1, alignedChunkMetadataList.size()); + + final AbstractAlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(0); + final Chunk timeChunk = + reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata()); + Assert.assertEquals( + MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, timeChunk.getHeader().getChunkType() & 0x3F); + + final List<Chunk> valueChunkList = new ArrayList<>(); + long chunkSizeLimit = PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk); + for (final IChunkMetadata valueChunkMetadata : + alignedChunkMetadata.getValueChunkMetadataList()) { + final Chunk valueChunk = reader.readMemChunk((ChunkMetadata) valueChunkMetadata); + Assert.assertEquals( + MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, valueChunk.getHeader().getChunkType() & 0x3F); + valueChunkList.add(valueChunk); + chunkSizeLimit += valueChunk.getHeader().getDataSize(); + } + + final long estimatedPageMemorySize = + AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes( + timeChunk, valueChunkList); + Assert.assertTrue(estimatedPageMemorySize > chunkSizeLimit); + return chunkSizeLimit; + } + } }
