This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this 
push:
     new 64ef149aacb finish memory estimation
64ef149aacb is described below

commit 64ef149aacbef5e3c9e8bfb336b8ab6aff71351b
Author: DESKTOP-L0L5GPJ\jt <[email protected]>
AuthorDate: Wed Jun 19 15:25:41 2024 +0800

    finish memory estimation
---
 .../planner/plan/node/write/InsertTabletNode.java  |  46 ++-
 .../db/storageengine/dataregion/DataRegion.java    | 311 +++++++++------------
 .../dataregion/memtable/AbstractMemTable.java      |   4 +-
 .../memtable/AlignedWritableMemChunk.java          |   2 +-
 .../dataregion/memtable/IMemTable.java             |   4 +-
 .../dataregion/memtable/TsFileProcessor.java       | 227 ++++++++++-----
 .../wal/recover/file/TsFilePlanRedoer.java         |   2 +-
 .../java/org/apache/iotdb/db/utils/MemUtils.java   |   7 +-
 .../db/utils/datastructure/AlignedTVList.java      |  19 +-
 .../dataregion/memtable/PrimitiveMemTableTest.java |   2 +-
 10 files changed, 348 insertions(+), 276 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index f0b0725af6e..d0c9f9c5aa6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
 import java.util.Map.Entry;
-import java.util.function.Function;
 import java.util.function.IntFunction;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -228,19 +227,19 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     return splitByPartition(analysis, this::getTableDeviceID);
   }
 
-  private Map<IDeviceID, SplitInfo> collectSplitRanges(IntFunction<IDeviceID> 
rowNumDeviceIdMapper) {
+  private Map<IDeviceID, PartitionSplitInfo> 
collectSplitRanges(IntFunction<IDeviceID> rowNumDeviceIdMapper) {
     long upperBoundOfTimePartition = 
TimePartitionUtils.getTimePartitionUpperBound(times[0]);
     TTimePartitionSlot timePartitionSlot = 
TimePartitionUtils.getTimePartitionSlot(times[0]);
     int startLoc = 0; // included
     IDeviceID currDeviceId = rowNumDeviceIdMapper.apply(0);
 
-    Map<IDeviceID, SplitInfo> deviceIDSplitInfoMap = new HashMap<>();
+    Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap = new HashMap<>();
 
     for (int i = 1; i < times.length; i++) { // times are sorted in session 
API.
       IDeviceID nextDeviceId = rowNumDeviceIdMapper.apply(i);
       if (times[i] >= upperBoundOfTimePartition || 
!currDeviceId.equals(nextDeviceId)) {
-        final SplitInfo splitInfo = 
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
-            deviceID1 -> new SplitInfo());
+        final PartitionSplitInfo splitInfo = 
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
+            deviceID1 -> new PartitionSplitInfo());
         // a new range.
         splitInfo.ranges.add(startLoc); // included
         splitInfo.ranges.add(i); // excluded
@@ -253,8 +252,8 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
       }
     }
 
-    SplitInfo splitInfo = deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
-        deviceID1 -> new SplitInfo());
+    PartitionSplitInfo splitInfo = 
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
+        deviceID1 -> new PartitionSplitInfo());
     // the final range
     splitInfo.ranges.add(startLoc); // included
     splitInfo.ranges.add(times.length); // excluded
@@ -263,12 +262,12 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     return deviceIDSplitInfoMap;
   }
 
-  public  Map<TRegionReplicaSet, List<Integer>> 
splitByReplicaSet(Map<IDeviceID, SplitInfo> deviceIDSplitInfoMap, IAnalysis 
analysis) {
+  public  Map<TRegionReplicaSet, List<Integer>> 
splitByReplicaSet(Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap, 
IAnalysis analysis) {
     Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
 
-    for (Entry<IDeviceID, SplitInfo> entry : deviceIDSplitInfoMap.entrySet()) {
+    for (Entry<IDeviceID, PartitionSplitInfo> entry : 
deviceIDSplitInfoMap.entrySet()) {
       final IDeviceID deviceID = entry.getKey();
-      final SplitInfo splitInfo = entry.getValue();
+      final PartitionSplitInfo splitInfo = entry.getValue();
       final List<TRegionReplicaSet> replicaSets = analysis
           .getDataPartitionInfo()
           .getDataRegionReplicaSetForWriting(
@@ -369,7 +368,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
       return Collections.emptyList();
     }
 
-    final Map<IDeviceID, SplitInfo> deviceIDSplitInfoMap = 
collectSplitRanges(rowNumDeviceIdMapper);
+    final Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap = 
collectSplitRanges(rowNumDeviceIdMapper);
     final Map<TRegionReplicaSet, List<Integer>> splitMap = splitByReplicaSet(
         deviceIDSplitInfoMap, analysis);
 
@@ -1205,10 +1204,33 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     return deviceIDs[rowIdx];
   }
 
-  private class SplitInfo {
+  private static class PartitionSplitInfo {
     // for each List in split, they are range1.start, range1.end, 
range2.start, range2.end, ...
     private List<Integer> ranges = new ArrayList<>();
     private List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
     private List<TRegionReplicaSet> replicaSets;
   }
+
+  /**
+   * Splits rows [start, end) of the tablet into runs of consecutive rows with the same table deviceID.
+   * @param start first row of the range, inclusive
+   * @param end last row of the range, exclusive
+   * @return one pair per run: the run's deviceID and the exclusive end offset of that run
+   */
+  public List<Pair<IDeviceID, Integer>> splitByDevice(int start, int end) {
+    List<Pair<IDeviceID, Integer>> result = new ArrayList<>();
+    IDeviceID prevDeviceId = getTableDeviceID(start);
+
+    for (int i = start + 1; i < end; i++) {
+      IDeviceID currentDeviceId = getTableDeviceID(i);
+      if (!currentDeviceId.equals(prevDeviceId)) {
+        // row i belongs to another device, so the previous run ends (exclusively) at i
+        result.add(new Pair<>(prevDeviceId, i));
+        prevDeviceId = currentDeviceId;
+      }
+    }
+    // the last run always reaches the end of the requested range
+    result.add(new Pair<>(prevDeviceId, end));
+    return result;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 9bab11f9503..688cf838a50 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -343,10 +343,10 @@ public class DataRegion implements IDataRegionForQuery {
   /**
    * Construct a database processor.
    *
-   * @param systemDir system dir path
-   * @param dataRegionId data region id e.g. 1
+   * @param systemDir       system dir path
+   * @param dataRegionId    data region id e.g. 1
    * @param fileFlushPolicy file flush policy
-   * @param databaseName database name e.g. root.sg1
+   * @param databaseName    database name e.g. root.sg1
    */
   public DataRegion(
       String systemDir, String dataRegionId, TsFileFlushPolicy 
fileFlushPolicy, String databaseName)
@@ -945,18 +945,7 @@ public class DataRegion implements IDataRegionForQuery {
       }
       // init map
       long timePartitionId = 
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
-
-      if (config.isEnableSeparateData()
-          && 
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
-        TimePartitionManager.getInstance()
-            .registerTimePartitionInfo(
-                new TimePartitionInfo(
-                    new DataRegionId(Integer.parseInt(dataRegionId)),
-                    timePartitionId,
-                    true,
-                    Long.MAX_VALUE,
-                    0));
-      }
+      initFlushTimeMap(timePartitionId);
 
       boolean isSequence =
           config.isEnableSeparateData()
@@ -976,28 +965,100 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  public void insertTreeTablet(InsertTabletNode insertTabletNode,
-      IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction 
rowLastFlushTimeGetter)
+  public void insertTreeTablet(InsertTabletNode insertTabletNode)
       throws BatchProcessException, WriteProcessException {
     final IDeviceID deviceID = insertTabletNode.getDeviceID();
-
     insertTablet(insertTabletNode, i -> deviceID, i ->
         config.isEnableSeparateData()
             ? lastFlushTimeMap.getFlushedTime(
             
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[i]),
             insertTabletNode.getDeviceID())
-            : Long.MAX_VALUE
+            : Long.MAX_VALUE,
+        false
+    );
+  }
+
+  public void insertTableTablet(InsertTabletNode insertTabletNode)
+      throws BatchProcessException, WriteProcessException {
+    insertTablet(insertTabletNode, insertTabletNode::getTableDeviceID, i ->
+            config.isEnableSeparateData()
+                ? lastFlushTimeMap.getFlushedTime(
+                
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[i]),
+                insertTabletNode.getTableDeviceID(i))
+                : Long.MAX_VALUE,
+        true
     );
   }
 
+  /**
+   * Inserts rows [loc, rowCount) of the tablet, cutting them into ranges at every time-partition
+   * change and where rows of a partition first exceed their last flush time (unsequence -> sequence).
+   * @return false if inserting any range failed, true otherwise
+   */
+  private boolean splitAndInsert(InsertTabletNode insertTabletNode,
+      IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction 
rowLastFlushTimeGetter, int loc,
+      TSStatus[] results)
+      throws BatchProcessException, WriteProcessException {
+    boolean noFailure = true;
+
+    int before = loc; // inclusive start of the range that has not been inserted yet
+    long beforeTime = insertTabletNode.getTimes()[before];
+    // time partition of the pending range [before, loc)
+    long beforeTimePartition =
+        TimePartitionUtils.getTimePartitionId(beforeTime);
+    initFlushTimeMap(beforeTimePartition); // NOTE(review): only done for the first partition - confirm
+
+    // whether the pending range is written as sequence data; the first range starts as unsequence
+    boolean isSequence = false;
+    while (loc < insertTabletNode.getRowCount()) {
+      long lastFlushTime = rowLastFlushTimeGetter.applyAsLong(loc);
+      long time = insertTabletNode.getTimes()[loc];
+      final long timePartitionId = TimePartitionUtils.getTimePartitionId(time);
+      // decide whether row loc still belongs to the pending range
+      if (timePartitionId != beforeTimePartition) {
+        // a new partition: insert the rows remaining from the previous partition
+        noFailure =
+            insertTabletToTsFileProcessor(
+                insertTabletNode, before, loc, isSequence, results,
+                beforeTimePartition, rowDeviceIdGetter, noFailure)
+                && noFailure;
+        before = loc;
+        beforeTimePartition = timePartitionId;
+        isSequence = time > lastFlushTime;
+      } else if (!isSequence && time > lastFlushTime) {
+        // same partition, first sequence row: insert the preceding rows as unsequence
+        noFailure =
+            insertTabletToTsFileProcessor(
+                insertTabletNode, before, loc, isSequence, results,
+                beforeTimePartition, rowDeviceIdGetter, noFailure)
+                && noFailure;
+        before = loc;
+        isSequence = true;
+      }
+      // else: same partition and isSequence unchanged, just move the cursor forward
+      loc++;
+    }
+
+    // insert the last pending range
+    if (before < loc) {
+      noFailure =
+          insertTabletToTsFileProcessor(
+              insertTabletNode, before, loc, isSequence, results, 
beforeTimePartition, rowDeviceIdGetter, noFailure)
+              && noFailure;
+    }
+
+    return noFailure;
+  }
+
   /**
-   * Insert a tablet (rows belonging to the same devices) into this database.
+   * Insert a tablet into this database.
    *
    * @throws BatchProcessException if some of the rows failed to be inserted
    */
   @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
-  public void insertTablet(InsertTabletNode insertTabletNode,
-      IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction 
rowLastFlushTimeGetter)
+  private void insertTablet(InsertTabletNode insertTabletNode,
+      IntFunction<IDeviceID> rowDeviceIdGetter, IntToLongFunction 
rowLastFlushTimeGetter,
+      boolean checkAllRowTtl)
       throws BatchProcessException, WriteProcessException {
     StorageEngine.blockInsertionIfReject(null);
     long startTime = System.nanoTime();
@@ -1012,57 +1073,11 @@ public class DataRegion implements IDataRegionForQuery {
       boolean noFailure;
 
       int loc = checkTTL(insertTabletNode, results, i -> 
DataNodeTTLCache.getInstance()
-          .getTTL(rowDeviceIdGetter.apply(i)));
+          .getTTL(rowDeviceIdGetter.apply(i)), !checkAllRowTtl);
       noFailure = loc != 0;
 
-      // before is first start point
-      int before = loc;
-      long before
-      // before time partition
-      long beforeTimePartition =
-          
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[before]);
-      // init map
+      noFailure = noFailure & splitAndInsert(insertTabletNode, 
rowDeviceIdGetter, rowLastFlushTimeGetter, loc, results);
 
-      if (config.isEnableSeparateData()
-          && 
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
-        TimePartitionManager.getInstance()
-            .registerTimePartitionInfo(
-                new TimePartitionInfo(
-                    new DataRegionId(Integer.parseInt(dataRegionId)),
-                    beforeTimePartition,
-                    true,
-                    Long.MAX_VALUE,
-                    0));
-      }
-
-      // if is sequence
-      boolean isSequence = false;
-      while (loc < insertTabletNode.getRowCount()) {
-        long lastFlushTime = rowLastFlushTimeGetter.applyAsLong(loc);
-        long time = insertTabletNode.getTimes()[loc];
-        final long timePartitionId = 
TimePartitionUtils.getTimePartitionId(time);
-        // always in some time partition
-        // judge if we should insert sequence
-        if (!isSequence && time > lastFlushTime) {
-          // insert into unsequence and then start sequence
-          noFailure =
-              insertTabletToTsFileProcessor(
-                  insertTabletNode, before, loc, false, results,
-                  timePartitionId)
-                  && noFailure;
-          before = loc;
-          isSequence = true;
-        }
-        loc++;
-      }
-
-      // do not forget last part
-      if (before < loc) {
-        noFailure =
-            insertTabletToTsFileProcessor(
-                insertTabletNode, before, loc, isSequence, results, time)
-                && noFailure;
-      }
       startTime = System.nanoTime();
       tryToUpdateInsertTabletLastCache(insertTabletNode);
       
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
 - startTime);
@@ -1075,8 +1090,22 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
+  /**
+   * Registers {@code timePartitionId} with TimePartitionManager when data separation is enabled and
+   * lastFlushTimeMap.checkAndCreateFlushedTimePartition returns false for it.
+   */
+  private void initFlushTimeMap(long timePartitionId) {
+    if (!config.isEnableSeparateData()
+        || lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) {
+      return;
+    }
+    // NOTE(review): presumably false means the partition was just created - confirm
+    TimePartitionManager.getInstance().registerTimePartitionInfo(new TimePartitionInfo(
+        new DataRegionId(Integer.parseInt(dataRegionId)), timePartitionId, true, Long.MAX_VALUE, 0));
+  }
+
   private int checkTTL(InsertTabletNode insertTabletNode, TSStatus[] results,
-      IntToLongFunction rowTTLGetter)
+      IntToLongFunction rowTTLGetter, boolean breakOnFirstAlive)
       throws OutOfTTLException {
 
     /*
@@ -1084,6 +1113,7 @@ public class DataRegion implements IDataRegionForQuery {
      */
     int loc = 0;
     long ttl = 0;
+    int firstAliveLoc = -1;
     while (loc < insertTabletNode.getRowCount()) {
       ttl = rowTTLGetter.applyAsLong(loc);
       long currTime = insertTabletNode.getTimes()[loc];
@@ -1097,102 +1127,24 @@ public class DataRegion implements IDataRegionForQuery 
{
                     DateTimeUtils.convertLongToDate(currTime),
                     DateTimeUtils.convertLongToDate(
                         CommonDateTimeUtils.currentTime() - ttl)));
-        loc++;
       } else {
-        break;
+        if (firstAliveLoc == -1) {
+          firstAliveLoc = loc;
+        }
+        if (breakOnFirstAlive) {
+          break;
+        }
       }
+      loc++;
     }
-    // loc pointing at first legal position
-    if (loc == insertTabletNode.getRowCount()) {
+
+    if (firstAliveLoc == -1) {
+      // no alive data
       throw new OutOfTTLException(
           insertTabletNode.getTimes()[insertTabletNode.getTimes().length - 1],
           (CommonDateTimeUtils.currentTime() - ttl));
     }
-    return loc;
-  }
-
-  /**
-   * Insert a tablet (rows belonging to the same devices) into this database.
-   *
-   * @throws BatchProcessException if some of the rows failed to be inserted
-   */
-  @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
-  public void insertTableTablet(InsertTabletNode insertTabletNode)
-      throws BatchProcessException, WriteProcessException {
-    StorageEngine.blockInsertionIfReject(null);
-    long startTime = System.nanoTime();
-    writeLock("insertTablet");
-    PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - 
startTime);
-    try {
-      if (deleted) {
-        return;
-      }
-      TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
-      Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
-      boolean noFailure = true;
-      int loc = checkTTL(insertTabletNode, results,
-          i -> 
DataNodeTTLCache.getInstance().getTTL(insertTabletNode.getTableDeviceID(i)));
-
-      // before is first start point
-      int before = loc;
-      // before time partition
-      long beforeTimePartition =
-          
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[before]);
-      // init map
-
-      if (config.isEnableSeparateData()
-          && 
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) {
-        TimePartitionManager.getInstance()
-            .registerTimePartitionInfo(
-                new TimePartitionInfo(
-                    new DataRegionId(Integer.parseInt(dataRegionId)),
-                    beforeTimePartition,
-                    true,
-                    Long.MAX_VALUE,
-                    0));
-      }
-
-      // if is sequence
-      boolean isSequence = false;
-      while (loc < insertTabletNode.getRowCount()) {
-        long lastFlushTime =
-            config.isEnableSeparateData()
-                ? lastFlushTimeMap.getFlushedTime(beforeTimePartition,
-                insertTabletNode.getTableDeviceID(loc))
-                : Long.MAX_VALUE;
-
-        long time = insertTabletNode.getTimes()[loc];
-        // always in some time partition
-        // judge if we should insert sequence
-        if (!isSequence && time > lastFlushTime) {
-          // insert into unsequence and then start sequence
-          noFailure =
-              insertTabletToTsFileProcessor(
-                  insertTabletNode, before, loc, false, results, 
beforeTimePartition)
-                  && noFailure;
-          before = loc;
-          isSequence = true;
-        }
-        loc++;
-      }
-
-      // do not forget last part
-      if (before < loc) {
-        noFailure =
-            insertTabletToTsFileProcessor(
-                insertTabletNode, before, loc, isSequence, results, 
beforeTimePartition)
-                && noFailure;
-      }
-      startTime = System.nanoTime();
-      tryToUpdateInsertTabletLastCache(insertTabletNode);
-      
PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime()
 - startTime);
-
-      if (!noFailure) {
-        throw new BatchProcessException(results);
-      }
-    } finally {
-      writeUnlock();
-    }
+    return firstAliveLoc;
   }
 
   /**
@@ -1203,7 +1155,11 @@ public class DataRegion implements IDataRegionForQuery {
   @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
   public void insertTablet(InsertTabletNode insertTabletNode)
       throws BatchProcessException, WriteProcessException {
-
+      if (insertTabletNode.isWriteToTable()) {
+        insertTableTablet(insertTabletNode);
+      } else {
+        insertTreeTablet(insertTabletNode);
+      }
   }
 
   /**
@@ -1221,11 +1177,11 @@ public class DataRegion implements IDataRegionForQuery {
    * subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, 
null, 5}
    *
    * @param insertTabletNode insert a tablet of a device
-   * @param sequence whether is sequence
-   * @param start start index of rows to be inserted in insertTabletPlan
-   * @param end end index of rows to be inserted in insertTabletPlan
-   * @param results result array
-   * @param timePartitionId time partition id
+   * @param sequence         whether is sequence
+   * @param start            start index of rows to be inserted in 
insertTabletPlan
+   * @param end              end index of rows to be inserted in 
insertTabletPlan
+   * @param results          result array
+   * @param timePartitionId  time partition id
    * @return false if any failure occurs when inserting the tablet, true 
otherwise
    */
   private boolean insertTabletToTsFileProcessor(
@@ -1234,7 +1190,9 @@ public class DataRegion implements IDataRegionForQuery {
       int end,
       boolean sequence,
       TSStatus[] results,
-      long timePartitionId) {
+      long timePartitionId,
+      IntFunction<IDeviceID> rowDeviceIdGetter,
+      boolean noFailure) {
     // return when start >= end or all measurement failed
     if (start >= end || insertTabletNode.allMeasurementFailed()) {
       return true;
@@ -1252,7 +1210,7 @@ public class DataRegion implements IDataRegionForQuery {
     }
 
     try {
-      tsFileProcessor.insertTablet(insertTabletNode, start, end, results);
+      tsFileProcessor.insertTablet(insertTabletNode, start, end, results, 
rowDeviceIdGetter, noFailure);
     } catch (WriteProcessRejectException e) {
       logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
       return false;
@@ -1552,9 +1510,9 @@ public class DataRegion implements IDataRegionForQuery {
   /**
    * get processor from hashmap, flush oldest processor if necessary
    *
-   * @param timeRangeId time partition range
+   * @param timeRangeId            time partition range
    * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
-   * @param sequence whether is sequence or not
+   * @param sequence               whether is sequence or not
    */
   private TsFileProcessor getOrCreateTsFileProcessorIntern(
       long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap, 
boolean sequence)
@@ -1638,7 +1596,7 @@ public class DataRegion implements IDataRegionForQuery {
   /**
    * close one tsfile processor
    *
-   * @param sequence whether this tsfile processor is sequence or not
+   * @param sequence        whether this tsfile processor is sequence or not
    * @param tsFileProcessor tsfile processor
    */
   public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor 
tsFileProcessor) {
@@ -1670,7 +1628,7 @@ public class DataRegion implements IDataRegionForQuery {
   /**
    * close one tsfile processor, thread-safety should be ensured by caller
    *
-   * @param sequence whether this tsfile processor is sequence or not
+   * @param sequence        whether this tsfile processor is sequence or not
    * @param tsFileProcessor tsfile processor
    */
   public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, 
TsFileProcessor tsFileProcessor) {
@@ -2884,7 +2842,7 @@ public class DataRegion implements IDataRegionForQuery {
    * <p>Then, update the latestTimeForEachDevice and 
partitionLatestFlushedTimeForEachDevice.
    *
    * @param newTsFileResource tsfile resource @UsedBy load external tsfile 
module
-   * @param deleteOriginFile whether to delete origin tsfile
+   * @param deleteOriginFile  whether to delete origin tsfile
    * @param isGeneratedByPipe whether the load tsfile request is generated by 
pipe
    */
   public void loadNewTsFile(
@@ -3071,6 +3029,7 @@ public class DataRegion implements IDataRegionForQuery {
 
   /**
    * Update latest time in latestTimeForEachDevice and 
partitionLatestFlushedTimeForEachDevice.
+   *
    * @UsedBy sync module, load external tsfile module.
    */
   protected void updateLastFlushTime(TsFileResource newTsFileResource) {
@@ -3089,8 +3048,8 @@ public class DataRegion implements IDataRegionForQuery {
   /**
    * Execute the loading process by the type.
    *
-   * @param tsFileResource tsfile resource to be loaded
-   * @param filePartitionId the partition id of the new file
+   * @param tsFileResource   tsfile resource to be loaded
+   * @param filePartitionId  the partition id of the new file
    * @param deleteOriginFile whether to delete the original file
    * @return load the file successfully @UsedBy sync module, load external 
tsfile module.
    */
@@ -3638,7 +3597,7 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   /**
-   * @param folder the folder's path
+   * @param folder   the folder's path
    * @param diskSize the disk space occupied by this folder, unit is MB
    */
   private void countFolderDiskSize(String folder, AtomicLong diskSize) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index a1d1047251e..263edad395b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.AlignedFullPath;
@@ -349,7 +350,8 @@ public abstract class AbstractMemTable implements IMemTable 
{
   }
 
   @Override
-  public void insertAlignedTablet(InsertTabletNode insertTabletNode, int 
start, int end)
+  public void insertAlignedTablet(InsertTabletNode insertTabletNode, int 
start, int end,
+      TSStatus[] results)
       throws WriteProcessException {
     try {
       writeAlignedTablet(insertTabletNode, start, end);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index b93474fdb92..a219df17908 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -245,7 +245,7 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
     return (long) list.rowCount() * measurementIndexMap.size();
   }
 
-  public long alignedListSize() {
+  public int alignedListSize() {
     return list.rowCount();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index f998a76cf1a..4fbfb451302 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.IFullPath;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -115,7 +116,8 @@ public interface IMemTable extends WALEntryValue {
   void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
       throws WriteProcessException;
 
-  void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int 
end)
+  void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int 
end,
+      TSStatus[] results)
       throws WriteProcessException;
 
   ReadOnlyMemChunk query(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index c43078a24e5..08c0431132f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import java.util.function.IntFunction;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -26,6 +27,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.IFullPath;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
@@ -116,6 +118,7 @@ public class TsFileProcessor {
 
   /** Logger fot this class. */
   private static final Logger logger = 
LoggerFactory.getLogger(TsFileProcessor.class);
+  private static final int NUM_MEM_TO_ESTIMATE = 3;
 
   /** Storage group name of this tsfile. */
   private final String storageGroupName;
@@ -413,6 +416,79 @@ public class TsFileProcessor {
     walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath());
   }
 
+  /**
+   * Estimates the memory increments of rows [start, end): table-model tablets use checkTableMemCost,
+   * others checkTreeMemCost. On rejection the rows are marked WRITE_PROCESS_REJECT and it rethrows.
+   */
+  private long[] checkMemCost(InsertTabletNode insertTabletNode, int start, int end,
+      TSStatus[] results, boolean noFailure) throws WriteProcessException {
+    try {
+      long startTime = System.nanoTime();
+      long[] memIncrements = insertTabletNode.isWriteToTable()
+          ? checkTableMemCost(insertTabletNode, start, end, noFailure, results)
+          : checkTreeMemCost(insertTabletNode, start, end, noFailure, results);
+      PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() - startTime);
+      return memIncrements;
+    } catch (WriteProcessException e) {
+      for (int i = start; i < end; i++) {
+        results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, e.getMessage());
+      }
+      throw new WriteProcessException(e);
+    }
+  }
+
+  /**
+   * Estimates the memory increments for rows [start, end) of a tablet that is
+   * addressed by the single device ID of the node, using the aligned or the
+   * non-aligned estimator depending on insertTabletNode.isAligned().
+   *
+   * @param noFailure only used by the aligned estimator
+   * @param results only used by the aligned estimator
+   */
+  private long[] checkTreeMemCost(
+      InsertTabletNode insertTabletNode,
+      int start,
+      int end,
+      boolean noFailure,
+      TSStatus[] results)
+      throws WriteProcessException {
+    if (!insertTabletNode.isAligned()) {
+      return checkMemCostAndAddToTspInfoForTablet(
+          insertTabletNode.getDeviceID(),
+          insertTabletNode.getMeasurements(),
+          insertTabletNode.getDataTypes(),
+          insertTabletNode.getColumns(),
+          start,
+          end);
+    }
+    return checkAlignedMemCostAndAddToTspForTablet(
+        insertTabletNode.getDeviceID(),
+        insertTabletNode.getMeasurements(),
+        insertTabletNode.getDataTypes(),
+        insertTabletNode.getColumns(),
+        insertTabletNode.getColumnCategories(),
+        start,
+        end,
+        noFailure,
+        results);
+  }
+
+  /**
+   * Estimates the memory increments for rows [start, end) of a tablet whose
+   * rows are split by device: each split is estimated with the aligned
+   * estimator and the results are summed component-wise.
+   *
+   * <p>The right value of every pair returned by splitByDevice is used as the
+   * exclusive end row of that device's split; the next split starts there.
+   */
+  private long[] checkTableMemCost(
+      InsertTabletNode insertTabletNode,
+      int start,
+      int end,
+      boolean noFailure,
+      TSStatus[] results)
+      throws WriteProcessException {
+    List<Pair<IDeviceID, Integer>> splits =
+        insertTabletNode.splitByDevice(start, end);
+    long[] total = new long[NUM_MEM_TO_ESTIMATE];
+    int rangeStart = start;
+    for (Pair<IDeviceID, Integer> deviceAndEnd : splits) {
+      int rangeEnd = deviceAndEnd.getRight();
+      long[] part =
+          checkAlignedMemCostAndAddToTspForTablet(
+              deviceAndEnd.getLeft(),
+              insertTabletNode.getMeasurements(),
+              insertTabletNode.getDataTypes(),
+              insertTabletNode.getColumns(),
+              insertTabletNode.getColumnCategories(),
+              rangeStart,
+              rangeEnd,
+              noFailure,
+              results);
+      for (int k = 0; k < NUM_MEM_TO_ESTIMATE; k++) {
+        total[k] += part[k];
+      }
+      rangeStart = rangeEnd;
+    }
+    return total;
+  }
+
   /**
    * Insert batch data of insertTabletPlan into the workingMemtable. The rows 
to be inserted are in
    * the range [start, end). Null value in each column values will be replaced 
by the subsequent
@@ -424,7 +500,9 @@ public class TsFileProcessor {
    * @param results result array
    */
   public void insertTablet(
-      InsertTabletNode insertTabletNode, int start, int end, TSStatus[] 
results)
+      InsertTabletNode insertTabletNode, int start, int end, TSStatus[] 
results,
+      IntFunction<IDeviceID> rowDeviceIdGetter,
+      boolean noFailure)
       throws WriteProcessException {
 
     if (workMemTable == null) {
@@ -435,35 +513,7 @@ public class TsFileProcessor {
           
.recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
     }
 
-    long[] memIncrements;
-    try {
-      long startTime = System.nanoTime();
-      if (insertTabletNode.isAligned()) {
-        memIncrements =
-            checkAlignedMemCostAndAddToTspForTablet(
-                insertTabletNode.getDeviceID(),
-                insertTabletNode.getMeasurements(),
-                insertTabletNode.getDataTypes(),
-                insertTabletNode.getColumns(),
-                start,
-                end);
-      } else {
-        memIncrements =
-            checkMemCostAndAddToTspInfoForTablet(
-                insertTabletNode.getDeviceID(),
-                insertTabletNode.getMeasurements(),
-                insertTabletNode.getDataTypes(),
-                insertTabletNode.getColumns(),
-                start,
-                end);
-      }
-      
PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() - 
startTime);
-    } catch (WriteProcessException e) {
-      for (int i = start; i < end; i++) {
-        results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, 
e.getMessage());
-      }
-      throw new WriteProcessException(e);
-    }
+    long[] memIncrements = checkMemCost(insertTabletNode, start, end, results, 
noFailure);
 
     long startTime = System.nanoTime();
     WALFlushListener walFlushListener;
@@ -497,7 +547,7 @@ public class TsFileProcessor {
 
     try {
       if (insertTabletNode.isAligned()) {
-        workMemTable.insertAlignedTablet(insertTabletNode, start, end);
+        workMemTable.insertAlignedTablet(insertTabletNode, start, end, 
noFailure ? null : results);
       } else {
         workMemTable.insertTablet(insertTabletNode, start, end);
       }
@@ -626,7 +676,7 @@ public class TsFileProcessor {
       chunkMetadataIncrement +=
           ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR)
               * dataTypes.length;
-      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
+      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes, 
null);
       for (int i = 0; i < dataTypes.length; i++) {
         // Skip failed Measurements
         if (dataTypes[i] == null || measurements[i] == null) {
@@ -692,7 +742,7 @@ public class TsFileProcessor {
         chunkMetadataIncrement +=
             ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR)
                 * dataTypes.length;
-        memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypes);
+        memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypes, null);
         for (int i = 0; i < dataTypes.length; i++) {
           // Skip failed Measurements
           if (dataTypes[i] == null || measurements[i] == null) {
@@ -787,15 +837,15 @@ public class TsFileProcessor {
       String[] measurements,
       TSDataType[] dataTypes,
       Object[] columns,
-      int start,
-      int end)
+      TsTableColumnCategory[] columnCategories, int start,
+      int end, boolean noFailure, TSStatus[] results)
       throws WriteProcessException {
     if (start >= end) {
       return new long[] {0, 0, 0};
     }
     long[] memIncrements = new long[3]; // memTable, text, chunk metadata
 
-    updateAlignedMemCost(dataTypes, deviceId, measurements, start, end, 
memIncrements, columns);
+    updateAlignedMemCost(dataTypes, deviceId, measurements, start, end, 
memIncrements, columns, columnCategories, noFailure, results);
     long memTableIncrement = memIncrements[0];
     long textDataIncrement = memIncrements[1];
     long chunkMetadataIncrement = memIncrements[2];
@@ -837,7 +887,7 @@ public class TsFileProcessor {
     // TEXT data size
     if (dataType.isBinary()) {
       Binary[] binColumn = (Binary[]) column;
-      memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end);
+      memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end, 
null);
     }
   }
 
@@ -848,72 +898,99 @@ public class TsFileProcessor {
       int start,
       int end,
       long[] memIncrements,
-      Object[] columns) {
+      Object[] columns, TsTableColumnCategory[] columnCategories, boolean 
noFailure,
+      TSStatus[] results) {
+    // Number of rows of [start, end) that will actually be written. A row
+    // whose result slot is already filled is treated as failed. Only slots
+    // inside the range are inspected: the results array covers the whole
+    // tablet, so scanning all of it would also subtract failures that belong
+    // to rows outside this range.
+    int incomingPointNum = end - start;
+    if (!noFailure && results != null) {
+      for (int i = start; i < end; i++) {
+        if (results[i] != null) {
+          incomingPointNum--;
+        }
+      }
+    }
+
+    int measurementColumnNum = 0;
+    if (columnCategories == null) {
+      measurementColumnNum = dataTypes.length;
+    } else {
+      for (TsTableColumnCategory columnCategory : columnCategories) {
+        if (columnCategory == TsTableColumnCategory.MEASUREMENT) {
+          measurementColumnNum++;
+        }
+      }
+    }
+
     // memIncrements = [memTable, text, chunk metadata] respectively
     if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)) {
-      // ChunkMetadataIncrement
+      // new devices introduce new ChunkMetadata
+      // ChunkMetadata memory Increment
       memIncrements[2] +=
-          dataTypes.length
+          measurementColumnNum
               * ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR);
-      memIncrements[0] +=
-          ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
-              * AlignedTVList.alignedTvListArrayMemCost(dataTypes);
-      for (int i = 0; i < dataTypes.length; i++) {
-        TSDataType dataType = dataTypes[i];
-        String measurement = measurementIds[i];
-        Object column = columns[i];
-        if (dataType == null || column == null || measurement == null) {
-          continue;
-        }
-        // TEXT data size
-        if (dataType.isBinary()) {
-          Binary[] binColumn = (Binary[]) columns[i];
-          memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, 
end);
-        }
-      }
+      // TVList memory
 
+      // ceil(incomingPointNum / ARRAY_SIZE): a partially filled array still
+      // has to be allocated. The parentheses are required, otherwise the
+      // sum is compared with 0 first and the result is only ever 0 or 1.
+      int numArraysToAdd =
+          incomingPointNum / PrimitiveArrayManager.ARRAY_SIZE
+              + (incomingPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0);
+      memIncrements[0] +=
+          numArraysToAdd
+              * AlignedTVList.alignedTvListArrayMemCost(dataTypes, columnCategories);
     } else {
       AlignedWritableMemChunk alignedMemChunk =
           ((AlignedWritableMemChunkGroup) 
workMemTable.getMemTableMap().get(deviceId))
               .getAlignedMemChunk();
       List<TSDataType> dataTypesInTVList = new ArrayList<>();
+      int currentPointNum = alignedMemChunk.alignedListSize();
+      int newPointNum = currentPointNum + incomingPointNum;
       for (int i = 0; i < dataTypes.length; i++) {
         TSDataType dataType = dataTypes[i];
         String measurement = measurementIds[i];
         Object column = columns[i];
-        if (dataType == null || column == null || measurement == null) {
+        if (dataType == null || column == null || measurement == null ||
+            (columnCategories != null && columnCategories[i] != 
TsTableColumnCategory.MEASUREMENT)) {
           continue;
         }
-        // Extending the column of aligned mem chunk
+
         if (!alignedMemChunk.containsMeasurement(measurementIds[i])) {
+          // add a new column in the TVList, the new column should be as long 
as existing ones
           memIncrements[0] +=
-              (alignedMemChunk.alignedListSize() / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
+              (currentPointNum / PrimitiveArrayManager.ARRAY_SIZE + 1)
                   * AlignedTVList.valueListArrayMemCost(dataType);
           dataTypesInTVList.add(dataType);
         }
-        // TEXT data size
-        if (dataType.isBinary()) {
-          Binary[] binColumn = (Binary[]) columns[i];
-          memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, 
end);
-        }
-      }
-      long acquireArray;
-      if (alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE 
== 0) {
-        acquireArray = (end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1L;
-      } else {
-        acquireArray =
-            (end
-                    - start
-                    - 1
-                    + (alignedMemChunk.alignedListSize() % 
PrimitiveArrayManager.ARRAY_SIZE))
-                / PrimitiveArrayManager.ARRAY_SIZE;
       }
+
+      // calculate how many new arrays will be added after this insertion:
+      // ceil(points / ARRAY_SIZE) after the insertion minus the same value
+      // before it. The remainder test is parenthesized so that it is not
+      // swallowed by the comparison (which would yield only 0 or 1).
+      int currentArrayCnt =
+          currentPointNum / PrimitiveArrayManager.ARRAY_SIZE
+              + (currentPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0);
+      int newArrayCnt =
+          newPointNum / PrimitiveArrayManager.ARRAY_SIZE
+              + (newPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0);
+      long acquireArray = newArrayCnt - currentArrayCnt;
+
       if (acquireArray != 0) {
+        // memory of extending the TVList
         dataTypesInTVList.addAll(((AlignedTVList) 
alignedMemChunk.getTVList()).getTsDataTypes());
         memIncrements[0] +=
             acquireArray * 
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
       }
     }
+
+    // flexible-length data size
+    for (int i = 0; i < dataTypes.length; i++) {
+      TSDataType dataType = dataTypes[i];
+      String measurement = measurementIds[i];
+      Object column = columns[i];
+      if (dataType == null || column == null || measurement == null ||
+          (columnCategories != null && columnCategories[i] != 
TsTableColumnCategory.MEASUREMENT)) {
+        continue;
+      }
+
+      if (dataType.isBinary()) {
+        Binary[] binColumn = (Binary[]) columns[i];
+        memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, 
end, results);
+      }
+    }
   }
 
   private void updateMemoryInfo(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
index 162b1ac02b2..1340554af99 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
@@ -100,7 +100,7 @@ public class TsFilePlanRedoer {
     } else {
       if (node.isAligned()) {
         recoveryMemTable.insertAlignedTablet(
-            (InsertTabletNode) node, 0, ((InsertTabletNode) 
node).getRowCount());
+            (InsertTabletNode) node, 0, ((InsertTabletNode) 
node).getRowCount(), null);
       } else {
         recoveryMemTable.insertTablet(
             (InsertTabletNode) node, 0, ((InsertTabletNode) 
node).getRowCount());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 89af1e78427..2665efa8e68 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 
@@ -95,11 +96,13 @@ public class MemUtils {
     return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 
RamUsageEstimator.sizeOf(value.getValues());
   }
 
-  public static long getBinaryColumnSize(Binary[] column, int start, int end) {
+  public static long getBinaryColumnSize(Binary[] column, int start, int end, 
TSStatus[] results) {
     long memSize = 0;
     memSize += (long) (end - start) * 
RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
     for (int i = start; i < end; i++) {
-      memSize += RamUsageEstimator.sizeOf(column[i].getValues());
+      if (results == null || results[i] == null) {
+        memSize += RamUsageEstimator.sizeOf(column[i].getValues());
+      }
     }
     return memSize;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 3aa1f204c14..8372468e3cd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.utils.datastructure;
 
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
@@ -851,18 +852,24 @@ public abstract class AlignedTVList extends TVList {
     return TSDataType.VECTOR;
   }
 
+
+
   /**
    * Get the single alignedTVList array mem cost by give types.
    *
    * @param types the types in the vector
    * @return AlignedTvListArrayMemSize
    */
-  public static long alignedTvListArrayMemCost(TSDataType[] types) {
+  public static long alignedTvListArrayMemCost(TSDataType[] types, 
TsTableColumnCategory[] columnCategories) {
+
+    int measurementColumnNum = 0;
     long size = 0;
     // value array mem size
-    for (TSDataType type : types) {
-      if (type != null) {
-        size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) 
type.getDataTypeSize();
+    for (int i = 0; i < types.length; i++) {
+      TSDataType type = types[i];
+      // A column contributes a value array only when its type is known and,
+      // if column categories are supplied, when it is a MEASUREMENT column.
+      // columnCategories may be null (callers pass null when no categories
+      // apply), in which case every non-null type is counted.
+      if (type != null
+          && (columnCategories == null
+              || columnCategories[i] == TsTableColumnCategory.MEASUREMENT)) {
+        size += (long) ARRAY_SIZE * (long) type.getDataTypeSize();
+        measurementColumnNum++;
       }
     }
     // size is 0 when all types are null
@@ -874,9 +881,9 @@ public abstract class AlignedTVList extends TVList {
     // index array mem size
     size += PrimitiveArrayManager.ARRAY_SIZE * 4L;
     // array headers mem size
-    size += (long) NUM_BYTES_ARRAY_HEADER * (2 + types.length);
+    size += (long) NUM_BYTES_ARRAY_HEADER * (2 + measurementColumnNum);
     // Object references size in ArrayList
-    size += (long) NUM_BYTES_OBJECT_REF * (2 + types.length);
+    size += (long) NUM_BYTES_OBJECT_REF * (2 + measurementColumnNum);
     return size;
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
index 5dece7fd940..451a0ad364e 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
@@ -376,7 +376,7 @@ public class PrimitiveMemTableTest {
 
   private void writeVector(IMemTable memTable)
       throws IOException, QueryProcessException, MetadataException, 
WriteProcessException {
-    memTable.insertAlignedTablet(genInsertTableNode(), 0, 100);
+    memTable.insertAlignedTablet(genInsertTableNode(), 0, 100, null);
 
     IDeviceID tmpDeviceId = 
IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.device5");
 

Reply via email to