This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a77b7ab162 Pipe: Optimize memory control when disassembling (parsing) table-model TsFiles (#14890)
2a77b7ab162 is described below

commit 2a77b7ab16261cf56a799e0c18296e62b9e560a2
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Feb 27 19:37:12 2025 +0800

    Pipe: Optimize memory control when disassembling (parsing) table-model TsFiles (#14890)
---
 .../scan/TsFileInsertionEventScanParser.java       |   3 -
 .../table/TsFileInsertionEventTableParser.java     |  95 +++--
 ...ileInsertionEventTableParserTabletIterator.java | 421 +++++++++++++++++----
 .../resource/memory/InsertNodeMemoryEstimator.java |   2 +-
 .../pipe/resource/memory/PipeMemoryWeightUtil.java |  65 ++++
 5 files changed, 472 insertions(+), 114 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index e4665c7a62f..54bdbc174a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -53,7 +53,6 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.io.File;
 import java.io.IOException;
-import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -64,8 +63,6 @@ import java.util.Objects;
 
 public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser 
{
 
-  private static final LocalDate EMPTY_DATE = LocalDate.of(1000, 1, 1);
-
   private final long startTime;
   private final long endTime;
   private final Filter filter;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 00e3cef54cb..ce59156b508 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -24,28 +24,30 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
-import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.read.TsFileSequenceReader;
-import org.apache.tsfile.read.controller.CachedChunkLoaderImpl;
-import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl;
-import org.apache.tsfile.read.query.executor.TableQueryExecutor;
 import org.apache.tsfile.write.record.Tablet;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 
 public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser {
 
-  private final TableQueryExecutor tableQueryExecutor;
+  private final long startTime;
+  private final long endTime;
+  private final TablePattern tablePattern;
 
-  private final Iterator<Map.Entry<String, TableSchema>> 
filteredTableSchemaIterator;
+  private final PipeMemoryBlock allocatedMemoryBlockForBatchData;
+  private final PipeMemoryBlock allocatedMemoryBlockForChunk;
+  private final PipeMemoryBlock allocatedMemoryBlockForChunkMeta;
+  private final PipeMemoryBlock allocatedMemoryBlockForTableSchemas;
 
   public TsFileInsertionEventTableParser(
       final File tsFile,
@@ -58,16 +60,20 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
     super(null, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
 
     try {
+      this.allocatedMemoryBlockForChunk =
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+      this.allocatedMemoryBlockForBatchData =
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+      this.allocatedMemoryBlockForChunkMeta =
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+      this.allocatedMemoryBlockForTableSchemas =
+          
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
+
+      this.startTime = startTime;
+      this.endTime = endTime;
+      this.tablePattern = pattern;
+
       tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, 
true);
-      filteredTableSchemaIterator =
-          tsFileSequenceReader.getTableSchemaMap().entrySet().stream()
-              .filter(entry -> Objects.isNull(pattern) || 
pattern.matchesTable(entry.getKey()))
-              .iterator();
-      tableQueryExecutor =
-          new TableQueryExecutor(
-              new MetadataQuerierByFileImpl(tsFileSequenceReader),
-              new CachedChunkLoaderImpl(tsFileSequenceReader),
-              TableQueryExecutor.TableQueryOrdering.DEVICE);
     } catch (final Exception e) {
       close();
       throw e;
@@ -79,29 +85,35 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
     return () ->
         new Iterator<TabletInsertionEvent>() {
 
-          private TsFileInsertionEventTableParserTabletIterator tabletIterator 
= null;
+          private TsFileInsertionEventTableParserTabletIterator tabletIterator;
 
           @Override
           public boolean hasNext() {
-            while (tabletIterator == null || !tabletIterator.hasNext()) {
-              if (!filteredTableSchemaIterator.hasNext()) {
-                close();
-                return false;
-              }
-
-              final Map.Entry<String, TableSchema> entry = filteredTableSchemaIterator.next();
-
-              try {
+            try { // any failure closes the parser and is rethrown as a PipeException (see catch)
+              if (tabletIterator == null) { // created lazily; a single iterator walks all matching tables
                 tabletIterator =
                     new TsFileInsertionEventTableParserTabletIterator(
-                        tableQueryExecutor, entry.getKey(), entry.getValue(), startTime, endTime);
-              } catch (final Exception e) {
+                        tsFileSequenceReader,
+                        entry -> // keep a table when no pattern is set or its name matches the pattern
+                            Objects.isNull(tablePattern)
+                                || tablePattern.matchesTable(entry.getKey()),
+                        allocatedMemoryBlockForTablet, // NOTE(review): not declared in the visible part of this class; presumably inherited from TsFileInsertionEventParser — confirm
+                        allocatedMemoryBlockForBatchData,
+                        allocatedMemoryBlockForChunk,
+                        allocatedMemoryBlockForChunkMeta,
+                        allocatedMemoryBlockForTableSchemas,
+                        startTime,
+                        endTime);
+              }
+              if (!tabletIterator.hasNext()) { // exhausted: release the reader and memory blocks via close()
                 close();
-                throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e);
+                return false;
               }
+              return true;
+            } catch (Exception e) {
+              close();
+              throw new PipeException("Error while parsing tsfile insertion event", e);
             }
-
-            return true;
           }
 
           @Override
@@ -174,4 +186,25 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
           }
         };
   }
+
+  /**
+   * Closes the parser through {@link TsFileInsertionEventParser#close()} and then releases the
+   * memory blocks reserved for batch data, chunks, chunk metadata and table schemas.
+   *
+   * <p>Every block is null-checked because the constructor calls this method from its failure
+   * path, possibly before all blocks have been assigned.
+   */
+  @Override
+  public void close() {
+    super.close();
+
+    final PipeMemoryBlock[] ownedMemoryBlocks = {
+      allocatedMemoryBlockForBatchData,
+      allocatedMemoryBlockForChunk,
+      allocatedMemoryBlockForChunkMeta,
+      allocatedMemoryBlockForTableSchemas
+    };
+    for (final PipeMemoryBlock memoryBlock : ownedMemoryBlocks) {
+      if (memoryBlock != null) {
+        memoryBlock.close();
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index 8aa47cfa595..e594b58e86b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -19,130 +19,393 @@
 
 package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table;
 
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.exception.read.ReadProcessException;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.tsfile.file.metadata.TableSchema;
-import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.query.executor.TableQueryExecutor;
-import org.apache.tsfile.read.reader.block.TsBlockReader;
+import org.apache.tsfile.file.metadata.TsFileMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.tsfile.read.reader.IChunkReader;
+import org.apache.tsfile.read.reader.chunk.TableChunkReader;
 import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
-
-import static org.apache.tsfile.enums.TSDataType.DATE;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 public class TsFileInsertionEventTableParserTabletIterator implements 
Iterator<Tablet> {
 
-  private final String tableName;
-
   private final long startTime;
   private final long endTime;
 
-  private final List<IMeasurementSchema> columnSchemas;
-  private final List<Tablet.ColumnCategory> columnTypes;
-  private final List<String> columnNames;
-  private final TsBlockReader tsBlockReader;
+  // File-level reader and metadata shared by every table parsed from this TsFile
+  private final TsFileSequenceReader reader;
+  private final IMetadataQuerier metadataQuerier;
+  private final TsFileMetadata fileMetadata;
+  private final Iterator<Map.Entry<String, TableSchema>> filteredTableSchemaIterator;
+
+  // Memory blocks passed in by the caller; this iterator only resizes them and never closes them
+  private final PipeMemoryBlock allocatedMemoryBlockForTablet;
+  private final PipeMemoryBlock allocatedMemoryBlockForBatchData;
+  private final PipeMemoryBlock allocatedMemoryBlockForChunk;
+  private final PipeMemoryBlock allocatedMemoryBlockForChunkMeta;
+  private final PipeMemoryBlock allocatedMemoryBlockForTableSchema;
+
+  // Reader state of the chunk and page currently being consumed
+  private IChunkReader chunkReader;
+  private BatchData batchData;
+
+  // Metadata cursors of the table currently being read
+  private Set<String> measurementNames;
+  private Iterator<Pair<IDeviceID, MetadataIndexNode>> deviceMetaIterator;
+  private Iterator<List<IChunkMetadata>> chunkMetadataList;
+
+  // Schema information of the table (and device) currently being read
+  private String tableName;
+  private IDeviceID deviceID;
+  private List<Tablet.ColumnCategory> columnTypes;
+  private List<String> measurementList;
+  private List<TSDataType> dataTypeList;
+
+  // (name, tablet column index) of every non-TAG column; value chunks are opened in this order
+  private List<Pair<String, Integer>> measurementColumIndexList;
+  // Tablet column index of every TAG column; device-ID segment i maps to entry i - 1
+  private List<Integer> measurementIdIndexList;
+
+  // True while the tablet being built belongs to the table that is currently read; reset when the
+  // next table begins, because rows of different tables must not share one tablet.
+  private boolean isSameTableName;
 
+  /**
+   * Creates an iterator over all tables of the TsFile behind {@code tsFileSequenceReader} that are
+   * accepted by {@code predicate}, reserving memory for the selected table schemas up front.
+   *
+   * @param tsFileSequenceReader reader of the TsFile to parse
+   * @param predicate selects the (table name, table schema) entries to parse
+   * @param allocatedMemoryBlockForTablet block resized for every tablet that is built
+   * @param allocatedMemoryBlockForBatchData block resized for the page currently loaded
+   * @param allocatedMemoryBlockForChunk block resized for the chunks currently loaded
+   * @param allocatedMemoryBlockForChunkMeta block resized for the chunk metadata of a device
+   * @param allocatedMemoryBlockForTableSchema block resized for the bloom filter and table schemas
+   * @param startTime inclusive lower bound of the emitted timestamps
+   * @param endTime inclusive upper bound of the emitted timestamps
+   * @throws IOException if the TsFile metadata cannot be read
+   */
   public TsFileInsertionEventTableParserTabletIterator(
-      final TableQueryExecutor tableQueryExecutor,
-      final String tableName,
-      final TableSchema tableSchema,
+      final TsFileSequenceReader tsFileSequenceReader,
+      final Predicate<Map.Entry<String, TableSchema>> predicate,
+      final PipeMemoryBlock allocatedMemoryBlockForTablet,
+      final PipeMemoryBlock allocatedMemoryBlockForBatchData,
+      final PipeMemoryBlock allocatedMemoryBlockForChunk,
+      final PipeMemoryBlock allocatedMemoryBlockForChunkMeta,
+      final PipeMemoryBlock allocatedMemoryBlockForTableSchema,
       final long startTime,
-      final long endTime) {
-    this.tableName = tableName;
+      final long endTime)
+      throws IOException {
+
     this.startTime = startTime;
     this.endTime = endTime;
 
-    columnSchemas = new ArrayList<>();
-    columnTypes = new ArrayList<>();
-    columnNames = new ArrayList<>();
-    try {
-      for (int i = 0, size = tableSchema.getColumnSchemas().size(); i < size; i++) {
-        final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i);
-        if (schema.getMeasurementName() != null && !schema.getMeasurementName().isEmpty()) {
-          columnSchemas.add(schema);
-          columnTypes.add(tableSchema.getColumnTypes().get(i));
-          columnNames.add(schema.getMeasurementName());
-        }
-      }
+    this.reader = tsFileSequenceReader;
+    this.metadataQuerier = new MetadataQuerierByFileImpl(reader);
+    fileMetadata = this.metadataQuerier.getWholeFileMetadata();
+    final List<Map.Entry<String, TableSchema>> tableSchemaList =
+        fileMetadata.getTableSchemaMap().entrySet().stream()
+            .filter(predicate)
+            .collect(Collectors.toList());
 
-      tsBlockReader = tableQueryExecutor.query(tableName, columnNames, null, null, null);
-    } catch (final ReadProcessException e) {
-      throw new PipeException("Failed to build query data set", e);
+    this.allocatedMemoryBlockForTablet = allocatedMemoryBlockForTablet;
+    this.allocatedMemoryBlockForBatchData = allocatedMemoryBlockForBatchData;
+    this.allocatedMemoryBlockForChunk = allocatedMemoryBlockForChunk;
+    this.allocatedMemoryBlockForChunkMeta = allocatedMemoryBlockForChunkMeta;
+    this.allocatedMemoryBlockForTableSchema = allocatedMemoryBlockForTableSchema;
+
+    // Reserve memory for what is kept from the file metadata: the bloom filter (optional in a
+    // TsFile, so getBloomFilter() may return null) plus the name and schema of every selected
+    // table. The block is resized once with the total instead of once per table.
+    long tableSchemaSize =
+        fileMetadata.getBloomFilter() != null
+            ? fileMetadata.getBloomFilter().getRetainedSizeInBytes()
+            : 0L;
+    for (Map.Entry<String, TableSchema> tableSchemaEntry : tableSchemaList) {
+      tableSchemaSize +=
+          tableSchemaEntry.getKey().length()
+              + PipeMemoryWeightUtil.calculateTableSchemaBytesUsed(tableSchemaEntry.getValue());
     }
+    PipeDataNodeResourceManager.memory()
+        .forceResize(this.allocatedMemoryBlockForTableSchema, tableSchemaSize);
+
+    filteredTableSchemaIterator = tableSchemaList.iterator();
   }
 
   @Override
   public boolean hasNext() {
-    return tsBlockReader.hasNext();
+    try { // any failure while reading metadata, chunks or pages is rethrown as a PipeException
+      State state = State.CHECK_DATA; // start at the finest level: rows of the current page
+      while (true) {
+        switch (state) { // cases fall through on purpose: an exhausted level tries the next coarser one
+          case CHECK_DATA: // rows left in the current page?
+            if (batchData != null && batchData.hasCurrent()) {
+              return true;
+            }
+          case INIT_DATA: // otherwise load the next page of the current chunk
+            if (chunkReader != null && chunkReader.hasNextSatisfiedPage()) {
+              batchData = chunkReader.nextPageData();
+              PipeDataNodeResourceManager.memory()
+                  .forceResize(
+                      allocatedMemoryBlockForBatchData,
+                      PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(batchData));
+              state = State.CHECK_DATA; // loop back so CHECK_DATA inspects the new page
+              break;
+            }
+          case INIT_CHUNK_READER: // otherwise open the next chunk of the current device
+            if (chunkMetadataList != null && chunkMetadataList.hasNext()) {
+              initChunkReader((AbstractAlignedChunkMetadata) chunkMetadataList.next().get(0));
+              state = State.INIT_DATA;
+              break;
+            }
+          case INIT_CHUNK_METADATA: // otherwise move to the next device of the current table
+            if (deviceMetaIterator != null && deviceMetaIterator.hasNext()) {
+              final Pair<IDeviceID, MetadataIndexNode> pair = deviceMetaIterator.next();
+
+              long size = 0; // bytes of chunk metadata kept for this device
+              List<List<IChunkMetadata>> iChunkMetadataList =
+                  reader.getIChunkMetadataList(pair.left, measurementNames, pair.right);
+
+              Iterator<List<IChunkMetadata>> chunkMetadataIterator = iChunkMetadataList.iterator();
+              while (chunkMetadataIterator.hasNext()) {
+                final List<IChunkMetadata> chunkMetadata = chunkMetadataIterator.next();
+                if (chunkMetadata == null
+                    || chunkMetadata.isEmpty()
+                    || !(chunkMetadata.get(0) instanceof AbstractAlignedChunkMetadata)) {
+                  throw new PipeException(
+                      "Table model tsfile parsing does not support this type of ChunkMeta");
+                }
+
+                final AbstractAlignedChunkMetadata alignedChunkMetadata =
+                    (AbstractAlignedChunkMetadata) chunkMetadata.get(0);
+
+                // Drop chunks entirely outside [startTime, endTime] so they are never read
+                if (alignedChunkMetadata.getEndTime() < startTime
+                    || alignedChunkMetadata.getStartTime() > endTime) {
+                  chunkMetadataIterator.remove(); // NOTE(review): needs a mutable list from getIChunkMetadataList — confirm
+                  continue;
+                }
+
+                size +=
+                    PipeMemoryWeightUtil.calculateAlignedChunkMetaBytesUsed(alignedChunkMetadata);
+                PipeDataNodeResourceManager.memory()
+                    .forceResize(allocatedMemoryBlockForChunkMeta, size); // NOTE(review): not reset if every chunk of a device is dropped
+              }
+
+              deviceID = pair.getLeft();
+              chunkMetadataList = iChunkMetadataList.iterator();
+
+              state = State.INIT_CHUNK_READER;
+              break;
+            }
+          case INIT_DEVICE_META: // otherwise move to the next table accepted by the predicate
+            if (filteredTableSchemaIterator != null && filteredTableSchemaIterator.hasNext()) {
+              final Map.Entry<String, TableSchema> entry = filteredTableSchemaIterator.next();
+              tableName = entry.getKey();
+              final TableSchema tableSchema = entry.getValue();
+              // A new table begins: the tablet being built must not receive rows from it
+              isSameTableName = false;
+
+              final MetadataIndexNode tableRoot = fileMetadata.getTableMetadataIndexNode(tableName);
+              deviceMetaIterator = metadataQuerier.deviceIterator(tableRoot, null);
+
+              final int columnSchemaSize = tableSchema.getColumnSchemas().size();
+              dataTypeList = new ArrayList<>(columnSchemaSize);
+              columnTypes = new ArrayList<>(columnSchemaSize);
+              measurementList = new ArrayList<>(columnSchemaSize);
+              measurementNames = new HashSet<>();
+
+              measurementColumIndexList = new ArrayList<>(columnSchemaSize);
+              measurementIdIndexList = new ArrayList<>(columnSchemaSize);
+
+              for (int i = 0, j = 0; i < columnSchemaSize; i++) { // i: schema index, j: tablet column index
+                final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i);
+                final Tablet.ColumnCategory columnCategory = tableSchema.getColumnTypes().get(i);
+                if (schema != null
+                    && schema.getMeasurementName() != null
+                    && !schema.getMeasurementName().isEmpty()) {
+                  final String measurementName = schema.getMeasurementName();
+                  columnTypes.add(columnCategory);
+                  measurementList.add(measurementName);
+                  dataTypeList.add(schema.getType());
+                  if (!Tablet.ColumnCategory.TAG.equals(columnCategory)) {
+                    measurementNames.add(measurementName); // non-TAG columns are read from value chunks
+                    measurementColumIndexList.add(new Pair<>(measurementName, j));
+                  } else {
+                    measurementIdIndexList.add(j); // TAG columns are filled from device ID segments
+                  }
+                  j++;
+                }
+              }
+              state = State.INIT_CHUNK_METADATA;
+              break;
+            }
+            return false; // no table left: the iterator is exhausted
+        }
+      }
+    } catch (Exception e) {
+      throw new PipeException(e.getMessage(), e);
+    }
+  }
+
+  private enum State { // reading levels, from page rows (finest) to tables (coarsest)
+    CHECK_DATA,
+    INIT_DATA,
+    INIT_CHUNK_READER,
+    INIT_CHUNK_METADATA,
+    INIT_DEVICE_META
   }
 
   @Override
   public Tablet next() {
-    if (!hasNext()) {
-      throw new NoSuchElementException();
+    return buildNextTablet();
+  }
+
+  private Tablet buildNextTablet() {
+    Tablet tablet = null;
+
+    boolean isFirstRow = true;
+    while (hasNext() && (isFirstRow || isSameTableName)) {
+      if (batchData.currentTime() >= startTime && batchData.currentTime() <= 
endTime) {
+        if (isFirstRow) {
+          // Record the name of the table when the tablet is started. 
Different table data cannot be
+          // in the same tablet.
+          isSameTableName = true;
+
+          // Calculate row count and memory size of the tablet based on the 
first row
+          final Pair<Integer, Integer> rowCountAndMemorySize =
+              PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData);
+          PipeDataNodeResourceManager.memory()
+              .forceResize(allocatedMemoryBlockForTablet, 
rowCountAndMemorySize.getLeft());
+
+          tablet =
+              new Tablet(
+                  tableName,
+                  measurementList,
+                  dataTypeList,
+                  columnTypes,
+                  rowCountAndMemorySize.getLeft());
+          tablet.initBitMaps();
+          isFirstRow = false;
+        }
+        final int rowIndex = tablet.getRowSize();
+        if (rowIndex >= tablet.getMaxRowNumber()) {
+          break;
+        }
+
+        tablet.addTimestamp(rowIndex, batchData.currentTime());
+        fillMeasurementValueColumns(batchData, tablet, rowIndex);
+        fillDeviceIdColumns(deviceID, tablet, rowIndex);
+      }
+
+      if (batchData != null) {
+        batchData.next();
+      }
     }
 
-    try {
-      return buildNextTablet();
-    } catch (final IOException e) {
-      throw new PipeException("Failed to build tablet", e);
+    if (isFirstRow) {
+      
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunkMeta,
 0);
+      tablet = new Tablet(tableName, measurementList, dataTypeList, 
columnTypes, 0);
+      tablet.initBitMaps();
     }
+
+    return tablet;
   }
 
-  // TODO: memory control
-  private Tablet buildNextTablet() throws IOException {
-    final TsBlock tsBlock = tsBlockReader.next();
-
-    List<String> measurementList = new ArrayList<>(columnSchemas.size());
-    List<TSDataType> dataTypeList = new ArrayList<>(columnSchemas.size());
-    columnSchemas.forEach(
-        columnSchema -> {
-          measurementList.add(columnSchema.getMeasurementName());
-          dataTypeList.add(columnSchema.getType());
-        });
-    final Tablet tablet =
-        new Tablet(
-            tableName, measurementList, dataTypeList, columnTypes, 
tsBlock.getPositionCount());
-    tablet.initBitMaps();
-
-    boolean isAllNull = true;
-
-    final TsBlock.TsBlockRowIterator rowIterator = 
tsBlock.getTsBlockRowIterator();
-    while (rowIterator.hasNext()) {
-      final Object[] row = rowIterator.next();
-
-      final long timestamp = (Long) row[row.length - 1];
-      if (timestamp < startTime || timestamp > endTime) {
-        continue;
+  private void initChunkReader(AbstractAlignedChunkMetadata alignedChunkMetadata)
+      throws IOException {
+    final Chunk timeChunk =
+        reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata());
+    long size = PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk); // time chunk plus value chunks read so far
+    PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, size);
+
+    final List<Chunk> valueChunkList =
+        new ArrayList<>(alignedChunkMetadata.getValueChunkMetadataList().size());
+    final Map<String, ChunkMetadata> metadataMap = new HashMap<>(); // measurement name -> value chunk metadata
+    for (IChunkMetadata metadata : alignedChunkMetadata.getValueChunkMetadataList()) {
+      if (metadata != null) {
+        metadataMap.put(metadata.getMeasurementUid(), (ChunkMetadata) metadata);
       }
+    }
 
-      final int rowIndex = tablet.getRowSize();
-      tablet.addTimestamp(rowIndex, timestamp);
-      for (int i = 0, fieldSize = row.length - 1; i < fieldSize; i++) {
-        final Object value =
-            columnSchemas.get(i).getType() != DATE || row[i] == null
-                ? row[i]
-                : DateUtils.parseIntToLocalDate((Integer) row[i]);
-        tablet.addValue(columnNames.get(i), rowIndex, value);
-        if (value != null && columnTypes.get(i) == Tablet.ColumnCategory.FIELD) {
-          isAllNull = false;
-        }
+    // Value chunks are read in the table's column order (measurementColumIndexList), not file order.
+    // The value chunk metadata are not guaranteed to be contiguous in the file anyway, so this
+    // reordering costs little chunk-reading efficiency.
+    for (Pair<String, Integer> m : measurementColumIndexList) {
+      final ChunkMetadata metadata = metadataMap.get(m.getLeft());
+      if (metadata != null) {
+        final Chunk chunk = reader.readMemChunk(metadata);
+
+        size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
+        PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, size);
+
+        valueChunkList.add(chunk);
+        continue;
       }
+      valueChunkList.add(null); // column absent here: keep positions aligned with measurementColumIndexList
     }
 
-    if (isAllNull) {
-      tablet.setRowSize(0);
+    this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null); // NOTE(review): third argument presumably a filter (none); rows are time-filtered per row in buildNextTablet
+  }
+
+  /**
+   * Copies the current row of {@code data} into the non-TAG columns of row {@code rowIndex} of
+   * {@code tablet}.
+   *
+   * <p>Entry {@code i} of the page vector corresponds to entry {@code i} of {@code
+   * measurementColumIndexList} (the value chunks are opened in that order), whose right element is
+   * the tablet column index. Nothing is written for null entries.
+   *
+   * @throws UnSupportedDataTypeException if a column has a data type that cannot be copied
+   */
+  private void fillMeasurementValueColumns(
+      final BatchData data, final Tablet tablet, final int rowIndex) {
+    final TsPrimitiveType[] primitiveTypes = data.getVector();
+    final List<IMeasurementSchema> measurementSchemas = tablet.getSchemas();
+
+    for (int i = 0, size = measurementColumIndexList.size(); i < size; i++) {
+      final TsPrimitiveType primitiveType = primitiveTypes[i];
+      if (primitiveType == null) {
+        continue;
+      }
+
+      final int index = measurementColumIndexList.get(i).getRight();
+      final TSDataType dataType = measurementSchemas.get(index).getType();
+      switch (dataType) {
+        case BOOLEAN:
+          tablet.addValue(rowIndex, index, primitiveType.getBoolean());
+          break;
+        case INT32:
+          tablet.addValue(rowIndex, index, primitiveType.getInt());
+          break;
+        case DATE:
+          tablet.addValue(rowIndex, index, DateUtils.parseIntToLocalDate(primitiveType.getInt()));
+          break;
+        case INT64:
+        case TIMESTAMP:
+          tablet.addValue(rowIndex, index, primitiveType.getLong());
+          break;
+        case FLOAT:
+          tablet.addValue(rowIndex, index, primitiveType.getFloat());
+          break;
+        case DOUBLE:
+          tablet.addValue(rowIndex, index, primitiveType.getDouble());
+          break;
+        case TEXT:
+        case BLOB:
+        case STRING:
+          tablet.addValue(rowIndex, index, primitiveType.getBinary().getValues());
+          break;
+        default:
+          // Report the column's declared type (the one switched on) and its name.
+          throw new UnSupportedDataTypeException(
+              String.format(
+                  "Unsupported data type %s of column %s",
+                  dataType, measurementColumIndexList.get(i).getLeft()));
+      }
     }
+  }
 
-    return tablet;
+  /**
+   * Fills the TAG columns of row {@code rowIndex} of {@code tablet} from the segments of {@code
+   * deviceID}.
+   *
+   * <p>Segment 0 is skipped (presumably the table name); segment {@code i} is written to the
+   * column at {@code measurementIdIndexList.get(i - 1)}. Null segments are skipped.
+   *
+   * @throws PipeException if the device ID has more tag segments than the table has TAG columns
+   */
+  private void fillDeviceIdColumns(
+      final IDeviceID deviceID, final Tablet tablet, final int rowIndex) {
+    // Read the segments as Object[] rather than casting the array to String[], which only works
+    // when the IDeviceID implementation happens to back its segments with a String[].
+    final Object[] deviceIdSegments = deviceID.getSegments();
+    if (deviceIdSegments.length - 1 > measurementIdIndexList.size()) {
+      throw new PipeException(
+          String.format(
+              "Device %s has %d tag segments, but table %s only has %d TAG columns",
+              deviceID, deviceIdSegments.length - 1, tableName, measurementIdIndexList.size()));
+    }
+    for (int i = 1; i < deviceIdSegments.length; i++) {
+      final Object segment = deviceIdSegments[i];
+      if (segment == null) {
+        continue;
+      }
+      tablet.addValue(rowIndex, measurementIdIndexList.get(i - 1), segment.toString());
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
index 46c0ff4e121..3d87d8eb7b9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java
@@ -476,7 +476,7 @@ public class InsertNodeMemoryEstimator {
     return size;
   }
 
-  private static long sizeOfMeasurementSchema(final MeasurementSchema 
measurementSchema) {
+  public static long sizeOfMeasurementSchema(final MeasurementSchema 
measurementSchema) {
     if (measurementSchema == null) {
       return 0L;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index c016c703f8a..5f28fedf14a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -24,8 +24,13 @@ import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
 import org.apache.iotdb.db.utils.MemUtils;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.read.common.Field;
 import org.apache.tsfile.read.common.RowRecord;
 import org.apache.tsfile.utils.Binary;
@@ -34,10 +39,15 @@ import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
+import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+import static org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize;
+
 public class PipeMemoryWeightUtil {
 
   /** Estimates memory usage of a {@link Map}<{@link IDeviceID}, {@link 
Boolean}>. */
@@ -252,6 +262,34 @@ public class PipeMemoryWeightUtil {
     return totalSizeInBytes;
   }
 
+  /**
+   * Estimates the heap bytes retained by a {@link TableSchema}: its table name, its column schema
+   * list (including every {@link MeasurementSchema} in it) and its column category list.
+   *
+   * <p>This is an approximation used for pipe memory control, not an exact measurement.
+   *
+   * @param tableSchema the schema to estimate; {@code null} yields 0
+   * @return the estimated size in bytes
+   */
+  public static long calculateTableSchemaBytesUsed(TableSchema tableSchema) {
+    if (tableSchema == null) {
+      return 0L;
+    }
+
+    long totalSizeInBytes = 0;
+
+    final String tableName = tableSchema.getTableName();
+    if (tableName != null) {
+      totalSizeInBytes += tableName.length();
+    }
+
+    final List<IMeasurementSchema> measurementSchemas = tableSchema.getColumnSchemas();
+    if (measurementSchemas != null) {
+      totalSizeInBytes +=
+          alignObjectSize(
+              (long) NUM_BYTES_ARRAY_HEADER
+                  + (long) NUM_BYTES_OBJECT_REF * measurementSchemas.size());
+      for (IMeasurementSchema measurementSchema : measurementSchemas) {
+        // Add each column schema's own size. The estimator only accepts MeasurementSchema, so
+        // other implementations are skipped rather than failing with a ClassCastException.
+        if (measurementSchema instanceof MeasurementSchema) {
+          totalSizeInBytes +=
+              InsertNodeMemoryEstimator.sizeOfMeasurementSchema(
+                  (MeasurementSchema) measurementSchema);
+        }
+      }
+    }
+
+    final List<Tablet.ColumnCategory> categories = tableSchema.getColumnTypes();
+    if (categories != null) {
+      totalSizeInBytes +=
+          alignObjectSize(
+              (long) NUM_BYTES_ARRAY_HEADER
+                  + (long) NUM_BYTES_OBJECT_REF * (long) categories.size());
+    }
+
+    return totalSizeInBytes;
+  }
+
   public static int calculateBatchDataRamBytesUsed(BatchData batchData) {
     int totalSizeInBytes = 0;
 
@@ -285,6 +323,33 @@ public class PipeMemoryWeightUtil {
     return batchData.length() * totalSizeInBytes;
   }
 
+  /** Returns the retained heap size of {@code chunk} in bytes, or 0 when it is {@code null}. */
+  public static long calculateChunkRamBytesUsed(Chunk chunk) {
+    if (chunk == null) {
+      return 0L;
+    }
+    return chunk.getRetainedSizeInBytes();
+  }
+
+  /**
+   * Sums the retained heap size of the time chunk metadata and of every non-null value chunk
+   * metadata held by {@code alignedChunkMetadata}; returns 0 for {@code null}.
+   */
+  public static long calculateAlignedChunkMetaBytesUsed(
+      AbstractAlignedChunkMetadata alignedChunkMetadata) {
+    if (alignedChunkMetadata == null) {
+      return 0L;
+    }
+
+    final ChunkMetadata timeMeta = (ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata();
+    final List<IChunkMetadata> valueMetas = alignedChunkMetadata.getValueChunkMetadataList();
+
+    long totalBytes = 0L;
+    if (timeMeta != null) {
+      totalBytes += timeMeta.getRetainedSizeInBytes();
+    }
+    if (valueMetas != null) {
+      // An empty list simply contributes nothing.
+      for (final IChunkMetadata valueMeta : valueMetas) {
+        if (valueMeta != null) {
+          totalBytes += ((ChunkMetadata) valueMeta).getRetainedSizeInBytes();
+        }
+      }
+    }
+    return totalBytes;
+  }
+
   /**
    * Rounds up the given integer num to the nearest multiple of n.
    *

Reply via email to