This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch chunk-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5a3ea3171e5c76a225613223dfd9798ca3fc47c4
Merge: 435ea419e58 9348cb8e2fb
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 12 11:10:47 2026 +0800

    Merge remote-tracking branch 'origin/master' into chunk-fix
    
    # Conflicts:
    #       
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
    #       
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java

 .idea/icon.png                                     | Bin 6736 -> 0 bytes
 .../org/apache/iotdb/it/env/cluster/EnvUtils.java  |  17 +-
 .../iotdb/it/env/cluster/config/MppBaseConfig.java |   2 +
 .../it/env/cluster/config/MppCommonConfig.java     |   6 +
 .../env/cluster/config/MppSharedCommonConfig.java  |   7 +
 .../it/env/remote/config/RemoteCommonConfig.java   |   5 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 .../iotdb/ainode/it/AINodeModelManageIT.java       |  10 -
 .../apache/iotdb/db/it/IoTDBLoadLastCacheIT.java   |   1 -
 .../iotdb/db/it/auth/IoTDBTemplateAuthIT.java      |   2 +-
 .../manual/enhanced/IoTDBPipeClusterIT.java        |  33 ++
 .../it/db/it/IoTDBAuthenticationTableIT.java       |   4 +-
 .../relational/it/db/it/IoTDBLoadTsFileIT.java     |  73 ++++
 .../scalar/IoTDBDiffFunctionTableIT.java           |  80 ++++
 ...ableAggregationQueryWithNetworkPartitionIT.java |   3 +-
 iotdb-client/client-cpp/src/main/Session.cpp       |  16 +-
 iotdb-client/client-cpp/src/main/Session.h         |  10 +-
 .../apache/iotdb/rpc/TElasticFramedTransport.java  |  23 +-
 iotdb-core/ainode/build_binary.py                  | 237 +-----------
 .../ainode/iotdb/ainode/core/model/model_info.py   |   8 -
 .../iotdb/ainode/core/model/model_storage.py       | 210 +---------
 .../async/AsyncAINodeHeartbeatClientPool.java      |   4 +-
 .../async/AsyncConfigNodeHeartbeatClientPool.java  |   4 +-
 .../async/AsyncDataNodeHeartbeatClientPool.java    |   4 +-
 .../CnToCnInternalServiceAsyncRequestManager.java  |   8 +-
 .../client/async/CnToDnAsyncRequestType.java       |   1 +
 .../CnToDnInternalServiceAsyncRequestManager.java  |  11 +
 .../rpc/DataNodeAsyncRequestRPCHandler.java        |   1 +
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  61 +++
 .../confignode/conf/ConfigNodeDescriptor.java      |  49 +++
 .../statemachine/ConfigRegionStateMachine.java     |   5 +
 .../iotdb/confignode/manager/ConfigManager.java    |  14 +
 .../iotdb/confignode/manager/load/LoadManager.java |   8 +-
 .../confignode/manager/load/cache/LoadCache.java   |   2 +-
 .../manager/load/service/HeartbeatService.java     |   8 -
 .../manager/load/service/TopologyService.java      | 250 ++++++++----
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  32 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  36 +-
 .../scan/TsFileInsertionEventScanParser.java       |  36 +-
 ...ileInsertionEventTableParserTabletIterator.java | 143 ++++---
 .../iotdb/db/protocol/client/ConfigNodeClient.java |   8 +-
 .../protocol/client/DataNodeClientPoolFactory.java |  28 --
 .../DnToCnInternalServiceAsyncRequestManager.java  |   5 +
 ...DataNodeExternalServiceAsyncRequestManager.java |  10 +-
 .../client/dn/DataNodeIntraHeartbeatManager.java   |   5 +
 .../dn/DataNodeMPPServiceAsyncRequestManager.java  |  10 +-
 .../DnToDnInternalServiceAsyncRequestManager.java  |   5 +
 .../impl/DataNodeInternalRPCServiceImpl.java       |  61 ++-
 .../schema/source/DevicePredicateHandler.java      |   6 +-
 .../schema/source/TableDeviceFetchSource.java      |   8 +-
 .../execution/schedule/DriverScheduler.java        |   1 +
 .../iotdb/db/queryengine/plan/ClusterTopology.java | 109 +++---
 .../iotdb/db/queryengine/plan/Coordinator.java     |  12 +-
 .../db/queryengine/plan/analyze/AnalyzeUtils.java  |  32 +-
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |   1 +
 .../analyze/load/LoadTsFileTableSchemaCache.java   |  24 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  46 +--
 .../queryengine/plan/planner/TreeModelPlanner.java |   3 -
 .../plan/planner/plan/node/PlanVisitor.java        |   5 -
 .../relational/analyzer/StatementAnalyzer.java     |  52 ++-
 .../schema/CheckSchemaPredicateVisitor.java        |   6 +-
 .../ConvertSchemaPredicateToFilterVisitor.java     |  30 +-
 .../schema/ExtractPredicateColumnNameVisitor.java  |   2 +-
 .../metadata/fetcher/SchemaPredicateUtil.java      |  16 +-
 .../metadata/fetcher/TableDeviceSchemaFetcher.java |  10 +-
 .../plan/relational/planner/RelationPlanner.java   |   4 +-
 .../distribute/TableDistributedPlanGenerator.java  |  34 +-
 .../planner/node/DeviceTableScanNode.java          |   2 +-
 .../PushAggregationIntoTableScan.java              |  18 +-
 .../optimizations/PushPredicateIntoTableScan.java  |   2 +-
 .../relational/security/AccessControlImpl.java     |  13 +
 .../security/TreeAccessCheckVisitor.java           |  13 +
 .../plan/statement/crud/LoadTsFileStatement.java   |   1 +
 .../dataregion/modification/ModificationFile.java  |  22 +-
 .../dataregion/wal/buffer/IWALBuffer.java          |   6 +-
 .../dataregion/wal/buffer/WALBuffer.java           |  18 +-
 .../storageengine/dataregion/wal/node/WALNode.java |  10 +-
 .../db/storageengine/load/LoadTsFileManager.java   |  54 ++-
 .../load/memory/LoadTsFileMemoryManager.java       |  13 +-
 .../load/splitter/TsFileSplitter.java              |   9 +-
 .../pipe/event/TsFileInsertionEventParserTest.java |  50 +++
 .../plan/analyze/load/LoadTsFileAnalyzerTest.java  | 197 ++++++++++
 .../plan/relational/analyzer/AnalyzerTest.java     |  50 +++
 .../statement/crud/LoadTsFileStatementTest.java    |  79 ++++
 .../table/TsTableRenameColumnSchemaTest.java       |  60 +++
 .../compaction/utils/CompactionUtilsTest.java      |  36 ++
 .../modification/ModificationFileTest.java         |  62 +++
 .../wal/node/WALNodeWaitForRollFileTest.java       | 425 +++++++++++++++++++++
 .../db/storageengine/load/TsFileSplitterTest.java  | 157 ++++++++
 .../load/memory/LoadTsFileMemoryManagerTest.java   | 106 +++++
 .../conf/iotdb-system.properties.template          |  42 +-
 .../iotdb/commons/client/ClientPoolFactory.java    |  58 ++-
 .../client/property/ClientPoolProperty.java        |  16 +-
 .../client/property/ThriftClientProperty.java      |   5 +-
 .../client/request/AsyncRequestManager.java        |   6 +-
 ...nfigNodeInternalServiceAsyncRequestManager.java |  10 +-
 .../DataNodeInternalServiceRequestManager.java     |  10 +-
 .../DataNodeIntraHeartbeatRequestManager.java      |   9 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   3 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  15 +-
 .../iotdb/commons/conf/CommonDescriptor.java       |  36 +-
 .../planner/plan/node/ICoreQueryPlanVisitor.java   |   5 +
 .../apache/iotdb/commons/schema/table/Audit.java   |  13 +
 .../apache/iotdb/commons/schema/table/TsTable.java |  35 +-
 .../plan/relational/metadata/MetadataUtil.java     |  52 ---
 .../relational/metadata/QualifiedObjectName.java   |   4 -
 .../relational/planner/node/TableScanNode.java     |  18 +-
 .../iotdb/commons/client/ClientManagerTest.java    |  19 +-
 .../src/main/thrift/datanode.thrift                |  11 +
 109 files changed, 2642 insertions(+), 1087 deletions(-)

diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 7ed3bf5af33,32823459fcf..282e9bff91c
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@@ -596,47 -561,29 +596,31 @@@ public class TsFileInsertionEventScanPa
                    needReturn = recordAlignedChunk(valueChunkList, marker);
                  } else {
                    final long chunkSize = timeChunkSize + valueChunkSize;
 +                  final long pageMemorySize = timeChunkPageMemorySize + 
valueChunkPageMemorySize;
                    if (chunkSize + chunkHeader.getDataSize()
-                           > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()
-                       || timeChunkPageMemorySize > 0
-                           && chunkPageMemorySize > 0
-                           && pageMemorySize + chunkPageMemorySize
-                               > getPageDataMemoryLimitInBytes()) {
-                     if (valueChunkList.size() == 1) {
-                       if (chunkSize > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
-                         PipeDataNodeResourceManager.memory()
-                             .forceResize(allocatedMemoryBlockForChunk, 
chunkSize);
-                       }
-                       final long currentPageMemorySize =
-                           timeChunkPageMemorySize > 0 && 
valueChunkPageMemorySize > 0
-                               ? pageMemorySize
-                               : 0;
-                       if (currentPageMemorySize > 
getPageDataMemoryLimitInBytes()) {
-                         PipeDataNodeResourceManager.memory()
-                             .forceResize(allocatedMemoryBlockForBatchData, 
currentPageMemorySize);
-                       }
-                     }
+                       > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) 
{
                      needReturn = recordAlignedChunk(valueChunkList, marker);
                    }
                  }
                }
                lastIndex = valueIndex;
                if (needReturn) {
 -                firstChunkHeader4NextSequentialValueChunks = chunkHeader;
 +                firstChunk4NextSequentialValueChunks = chunk;
                  return;
                }
+               
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
              } else {
 -              chunkHeader = firstChunkHeader4NextSequentialValueChunks;
 -              firstChunkHeader4NextSequentialValueChunks = null;
 +              chunk = firstChunk4NextSequentialValueChunks;
 +              chunkHeader = chunk.getHeader();
 +              firstChunk4NextSequentialValueChunks = null;
+               
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
              }
  
 -            Chunk chunk =
 -                new Chunk(
 -                    chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
 -
              valueChunkSize += chunkHeader.getDataSize();
 +            if (isSinglePageValueChunk(chunkHeader)) {
 +              valueChunkPageMemorySize +=
 +                  
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk);
 +            }
              valueChunkList.add(chunk);
              currentMeasurements.add(
                  new MeasurementSchema(
@@@ -786,16 -717,20 +770,30 @@@
      return false;
    }
  
+   private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(
+       final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) {
+     if (!valueChunkList.isEmpty() || lastIndex < 0) {
+       return;
+     }
+ 
+     final long chunkSize =
+         
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
+             + valueChunkHeader.getDataSize();
+     if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+       
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, 
chunkSize);
+     }
+   }
+ 
 +  private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
 +    return (chunkHeader.getChunkType() & 0x3F) == 
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
 +  }
 +
 +  private byte toValueChunkMarker(final ChunkHeader chunkHeader) {
 +    return isSinglePageValueChunk(chunkHeader)
 +        ? MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER
 +        : MetaMarker.VALUE_CHUNK_HEADER;
 +  }
 +
    @Override
    public void close() {
      super.close();
diff --cc 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 91bfa9c5d3b,8d02cc8a998..006473ebf77
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@@ -28,9 -28,8 +28,10 @@@ import org.apache.iotdb.commons.pipe.da
  import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
  import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
  import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser;
 +import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.AlignedSinglePageWholeChunkReader;
  import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
+ import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
  import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
  import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
  import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@@ -129,84 -122,46 +131,124 @@@ public class TsFileInsertionEventParser
      System.out.println(System.currentTimeMillis() - startTime);
    }
  
 +  @Test
 +  public void 
testScanParserSplitAlignedSinglePageChunkByEstimatedPageMemory() throws 
Exception {
 +    final long originalPipeMaxReaderChunkSize =
 +        
CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
 +    final int originalPageSizeInByte =
 +        TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
 +    final int originalMaxNumberOfPointsInPage =
 +        
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
 +
 +    try {
 +      TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024);
 +      
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10000);
 +
 +      final int measurementCount = 16;
 +      final int rowCount = 64;
 +      final List<IMeasurementSchema> schemaList = new ArrayList<>();
 +      for (int i = 0; i < measurementCount; ++i) {
 +        schemaList.add(
 +            new MeasurementSchema(
 +                "s" + i, TSDataType.STRING, TSEncoding.PLAIN, 
CompressionType.LZ4));
 +      }
 +
 +      alignedTsFile = new File("aligned-single-page-high-compression.tsfile");
 +      final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount);
 +      final Binary value =
 +          new Binary(new String(new char[512]).replace('\0', 'a'), 
TSFileConfig.STRING_CHARSET);
 +      for (int row = 0; row < rowCount; ++row) {
 +        tablet.addTimestamp(row, row);
 +        for (int measurementIndex = 0; measurementIndex < measurementCount; 
++measurementIndex) {
 +          tablet.addValue("s" + measurementIndex, row, value);
 +        }
 +      }
 +
 +      try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
 +        writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), 
schemaList);
 +        writer.writeAligned(tablet);
 +      }
 +
 +      CommonDescriptor.getInstance()
 +          .getConfig()
 +          .setPipeMaxReaderChunkSize(
 +              
calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(alignedTsFile));
 +
 +      int tabletCount = 0;
 +      int maxMeasurementCount = 0;
 +      int pointCount = 0;
 +      try (final TsFileInsertionEventScanParser parser =
 +          new TsFileInsertionEventScanParser(
 +              alignedTsFile,
 +              new PrefixTreePattern("root"),
 +              Long.MIN_VALUE,
 +              Long.MAX_VALUE,
 +              null,
 +              null,
 +              false)) {
 +        for (final Pair<Tablet, Boolean> tabletWithIsAligned : 
parser.toTabletWithIsAligneds()) {
 +          Assert.assertTrue(tabletWithIsAligned.getRight());
 +          final Tablet parsedTablet = tabletWithIsAligned.getLeft();
 +          tabletCount++;
 +          maxMeasurementCount = Math.max(maxMeasurementCount, 
parsedTablet.getSchemas().size());
 +          pointCount += getNonNullSize(parsedTablet);
 +        }
 +      }
 +
 +      Assert.assertTrue(tabletCount > 1);
 +      Assert.assertTrue(maxMeasurementCount < measurementCount);
 +      Assert.assertEquals(measurementCount * rowCount, pointCount);
 +    } finally {
 +      CommonDescriptor.getInstance()
 +          .getConfig()
 +          .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
 +      
TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte);
 +      TSFileDescriptor.getInstance()
 +          .getConfig()
 +          .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage);
 +    }
 +  }
 +
+   @Test
+   public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() 
throws Exception {
+     final long originalPipeMaxReaderChunkSize =
+         PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+     CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0);
+ 
+     alignedTsFile = new File("single-aligned-value-chunk.tsfile");
+     final List<IMeasurementSchema> schemaList = new ArrayList<>();
+     schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+ 
+     final Tablet tablet = new Tablet("root.sg.d", schemaList, 2);
+     tablet.addTimestamp(0, 1);
+     tablet.addValue("s1", 0, 1L);
+     tablet.addTimestamp(1, 2);
+     tablet.addValue("s1", 1, 2L);
+ 
+     try {
+       try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+         writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), 
schemaList);
+         writer.writeAligned(tablet);
+       }
+ 
+       try (final TsFileInsertionEventScanParser parser =
+           new TsFileInsertionEventScanParser(
+               alignedTsFile,
+               new PrefixTreePattern("root"),
+               Long.MIN_VALUE,
+               Long.MAX_VALUE,
+               null,
+               null,
+               false)) {
+         
Assert.assertTrue(getAllocatedChunkMemory(parser).getMemoryUsageInBytes() > 0);
+       }
+     } finally {
+       CommonDescriptor.getInstance()
+           .getConfig()
+           .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+     }
+   }
+ 
    public void testToTabletInsertionEvents(final boolean isQuery) throws 
Exception {
      // Test empty chunk
      testMixedTsFileWithEmptyChunk(isQuery);
@@@ -754,36 -709,11 +796,44 @@@
      return count;
    }
  
+   private PipeMemoryBlock getAllocatedChunkMemory(final 
TsFileInsertionEventScanParser parser)
+       throws NoSuchFieldException, IllegalAccessException {
+     final Field field =
+         
TsFileInsertionEventScanParser.class.getDeclaredField("allocatedMemoryBlockForChunk");
+     field.setAccessible(true);
+     return (PipeMemoryBlock) field.get(parser);
+   }
++
 +  private long calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(final 
File tsFile)
 +      throws Exception {
 +    try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
 +      final IDeviceID deviceID = 
reader.getDeviceMeasurementsMap().keySet().iterator().next();
 +      final List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
 +          reader.getAlignedChunkMetadata(deviceID, true);
 +      Assert.assertEquals(1, alignedChunkMetadataList.size());
 +
 +      final AbstractAlignedChunkMetadata alignedChunkMetadata = 
alignedChunkMetadataList.get(0);
 +      final Chunk timeChunk =
 +          reader.readMemChunk((ChunkMetadata) 
alignedChunkMetadata.getTimeChunkMetadata());
 +      Assert.assertEquals(
 +          MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, 
timeChunk.getHeader().getChunkType() & 0x3F);
 +
 +      final List<Chunk> valueChunkList = new ArrayList<>();
 +      long chunkSizeLimit = 
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
 +      for (final IChunkMetadata valueChunkMetadata :
 +          alignedChunkMetadata.getValueChunkMetadataList()) {
 +        final Chunk valueChunk = reader.readMemChunk((ChunkMetadata) 
valueChunkMetadata);
 +        Assert.assertEquals(
 +            MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, 
valueChunk.getHeader().getChunkType() & 0x3F);
 +        valueChunkList.add(valueChunk);
 +        chunkSizeLimit += valueChunk.getHeader().getDataSize();
 +      }
 +
 +      final long estimatedPageMemorySize =
 +          
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
 +              timeChunk, valueChunkList);
 +      Assert.assertTrue(estimatedPageMemorySize > chunkSizeLimit);
 +      return chunkSizeLimit;
 +    }
 +  }
  }

Reply via email to