This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this 
push:
     new 3402f6f7d37 add memory calculation for insertRow
3402f6f7d37 is described below

commit 3402f6f7d37c7561745e2b32bea723c1e90882f8
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jul 8 14:32:48 2024 +0800

    add memory calculation for insertRow
---
 .../db/storageengine/dataregion/DataRegion.java    | 277 ++++++++++++++-------
 .../dataregion/memtable/AbstractMemTable.java      |   2 +-
 .../dataregion/memtable/IMemTable.java             |   2 +-
 .../dataregion/memtable/TsFileProcessor.java       | 205 +++++++++------
 4 files changed, 316 insertions(+), 170 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index cf751e63f44..08034b5022e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.path.IFullPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
@@ -214,13 +215,19 @@ public class DataRegion implements IDataRegionForQuery {
    */
   private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
 
-  /** Condition to safely delete data region. */
+  /**
+   * Condition to safely delete data region.
+   */
   private final Condition deletedCondition = 
insertLock.writeLock().newCondition();
 
-  /** Data region has been deleted or not. */
+  /**
+   * Data region has been deleted or not.
+   */
   private volatile boolean deleted = false;
 
-  /** closeStorageGroupCondition is used to wait for all currently closing 
TsFiles to be done. */
+  /**
+   * closeStorageGroupCondition is used to wait for all currently closing 
TsFiles to be done.
+   */
   private final Object closeStorageGroupCondition = new Object();
 
   /**
@@ -228,32 +235,50 @@ public class DataRegion implements IDataRegionForQuery {
    */
   private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
 
-  /** time partition id in the database -> {@link TsFileProcessor} for this 
time partition. */
+  /**
+   * time partition id in the database -> {@link TsFileProcessor} for this 
time partition.
+   */
   private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = 
new TreeMap<>();
 
-  /** time partition id in the database -> {@link TsFileProcessor} for this 
time partition. */
+  /**
+   * time partition id in the database -> {@link TsFileProcessor} for this 
time partition.
+   */
   private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors 
= new TreeMap<>();
 
-  /** sequence {@link TsFileProcessor}s which are closing. */
+  /**
+   * sequence {@link TsFileProcessor}s which are closing.
+   */
   private final Set<TsFileProcessor> closingSequenceTsFileProcessor = 
ConcurrentHashMap.newKeySet();
 
-  /** unsequence {@link TsFileProcessor}s which are closing. */
+  /**
+   * unsequence {@link TsFileProcessor}s which are closing.
+   */
   private final Set<TsFileProcessor> closingUnSequenceTsFileProcessor =
       ConcurrentHashMap.newKeySet();
 
-  /** data region id. */
+  /**
+   * data region id.
+   */
   private final String dataRegionId;
 
-  /** database name. */
+  /**
+   * database name.
+   */
   private final String databaseName;
 
-  /** database system directory. */
+  /**
+   * database system directory.
+   */
   private File storageGroupSysDir;
 
-  /** manage seqFileList and unSeqFileList. */
+  /**
+   * manage seqFileList and unSeqFileList.
+   */
   private final TsFileManager tsFileManager;
 
-  /** manage tsFileResource degrade. */
+  /**
+   * manage tsFileResource degrade.
+   */
   private final TsFileResourceManager tsFileResourceManager = 
TsFileResourceManager.getInstance();
 
   /**
@@ -264,10 +289,14 @@ public class DataRegion implements IDataRegionForQuery {
   private final HashMap<Long, VersionController> 
timePartitionIdVersionControllerMap =
       new HashMap<>();
 
-  /** file system factory (local or hdfs). */
+  /**
+   * file system factory (local or hdfs).
+   */
   private final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
 
-  /** File flush policy. */
+  /**
+   * File flush policy.
+   */
   private TsFileFlushPolicy fileFlushPolicy;
 
   /**
@@ -278,16 +307,24 @@ public class DataRegion implements IDataRegionForQuery {
    */
   private Map<Long, Long> partitionMaxFileVersions = new ConcurrentHashMap<>();
 
-  /** database info for mem control. */
+  /**
+   * database info for mem control.
+   */
   private final DataRegionInfo dataRegionInfo = new DataRegionInfo(this);
 
-  /** whether it's ready from recovery. */
+  /**
+   * whether it's ready from recovery.
+   */
   private boolean isReady = false;
 
-  /** close file listeners. */
+  /**
+   * close file listeners.
+   */
   private List<CloseFileListener> customCloseFileListeners = 
Collections.emptyList();
 
-  /** flush listeners. */
+  /**
+   * flush listeners.
+   */
   private List<FlushListener> customFlushListeners = Collections.emptyList();
 
   private ILastFlushTimeMap lastFlushTimeMap;
@@ -311,10 +348,10 @@ public class DataRegion implements IDataRegionForQuery {
   /**
    * Construct a database processor.
    *
-   * @param systemDir system dir path
-   * @param dataRegionId data region id e.g. 1
+   * @param systemDir       system dir path
+   * @param dataRegionId    data region id e.g. 1
    * @param fileFlushPolicy file flush policy
-   * @param databaseName database name e.g. root.sg1
+   * @param databaseName    database name e.g. root.sg1
    */
   public DataRegion(
       String systemDir, String dataRegionId, TsFileFlushPolicy 
fileFlushPolicy, String databaseName)
@@ -397,19 +434,29 @@ public class DataRegion implements IDataRegionForQuery {
     return ret;
   }
 
-  /** this class is used to store recovering context. */
+  /**
+   * this class is used to store recovering context.
+   */
   private class DataRegionRecoveryContext {
 
-    /** number of files to be recovered. */
+    /**
+     * number of files to be recovered.
+     */
     private final long numOfFilesToRecover;
 
-    /** number of already recovered files. */
+    /**
+     * number of already recovered files.
+     */
     private long recoveredFilesNum;
 
-    /** last recovery log time. */
+    /**
+     * last recovery log time.
+     */
     private long lastLogTime;
 
-    /** recover performers of unsealed TsFiles. */
+    /**
+     * recover performers of unsealed TsFiles.
+     */
     private final List<UnsealedTsFileRecoverPerformer> recoverPerformers = new 
ArrayList<>();
 
     public DataRegionRecoveryContext(long numOfFilesToRecover) {
@@ -441,7 +488,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** recover from file */
+  /**
+   * recover from file
+   */
   @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
   private void recover() throws DataRegionException {
     try {
@@ -711,7 +760,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** check if the tsfile's time is smaller than system current time. */
+  /**
+   * check if the tsfile's time is smaller than system current time.
+   */
   private void checkTsFileTime(File tsFile, long currentTime) throws 
DataRegionException {
     String[] items = tsFile.getName().replace(TSFILE_SUFFIX, 
"").split(FILE_NAME_SEPARATOR);
     long fileTime = Long.parseLong(items[0]);
@@ -724,7 +775,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** submit unsealed TsFile to WALRecoverManager. */
+  /**
+   * submit unsealed TsFile to WALRecoverManager.
+   */
   private WALRecoverListener recoverUnsealedTsFile(
       TsFileResource unsealedTsFile, DataRegionRecoveryContext context, 
boolean isSeq) {
     UnsealedTsFileRecoverPerformer recoverPerformer =
@@ -809,7 +862,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** recover sealed TsFile. */
+  /**
+   * recover sealed TsFile.
+   */
   private void recoverSealedTsFiles(
       TsFileResource sealedTsFile, DataRegionRecoveryContext context, boolean 
isSeq) {
     try (SealedTsFileRecoverPerformer recoverPerformer =
@@ -900,7 +955,7 @@ public class DataRegion implements IDataRegionForQuery {
       boolean isSequence =
           config.isEnableSeparateData()
               && insertRowNode.getTime()
-                  > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
+              > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
 
       // insert to sequence or unSequence file
       TsFileProcessor tsFileProcessor =
@@ -945,13 +1000,13 @@ public class DataRegion implements IDataRegionForQuery {
         // a new partition, insert the remaining of the previous partition
         noFailure =
             insertTabletToTsFileProcessor(
-                    insertTabletNode,
-                    before,
-                    loc,
-                    isSequence,
-                    results,
-                    beforeTimePartition,
-                    noFailure)
+                insertTabletNode,
+                before,
+                loc,
+                isSequence,
+                results,
+                beforeTimePartition,
+                noFailure)
                 && noFailure;
         before = loc;
         beforeTimePartition = timePartitionId;
@@ -961,13 +1016,13 @@ public class DataRegion implements IDataRegionForQuery {
         // insert previous range into unsequence
         noFailure =
             insertTabletToTsFileProcessor(
-                    insertTabletNode,
-                    before,
-                    loc,
-                    isSequence,
-                    results,
-                    beforeTimePartition,
-                    noFailure)
+                insertTabletNode,
+                before,
+                loc,
+                isSequence,
+                results,
+                beforeTimePartition,
+                noFailure)
                 && noFailure;
         before = loc;
         isSequence = true;
@@ -980,13 +1035,13 @@ public class DataRegion implements IDataRegionForQuery {
     if (before < loc) {
       noFailure =
           insertTabletToTsFileProcessor(
-                  insertTabletNode,
-                  before,
-                  loc,
-                  isSequence,
-                  results,
-                  beforeTimePartition,
-                  noFailure)
+              insertTabletNode,
+              before,
+              loc,
+              isSequence,
+              results,
+              beforeTimePartition,
+              noFailure)
               && noFailure;
     }
 
@@ -1096,11 +1151,11 @@ public class DataRegion implements IDataRegionForQuery {
    * subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, 
null, 5}
    *
    * @param insertTabletNode insert a tablet of a device
-   * @param sequence whether is sequence
-   * @param start start index of rows to be inserted in insertTabletPlan
-   * @param end end index of rows to be inserted in insertTabletPlan
-   * @param results result array
-   * @param timePartitionId time partition id
+   * @param sequence         whether is sequence
+   * @param start            start index of rows to be inserted in 
insertTabletPlan
+   * @param end              end index of rows to be inserted in 
insertTabletPlan
+   * @param results          result array
+   * @param timePartitionId  time partition id
    * @return false if any failure occurs when inserting the tablet, true 
otherwise
    */
   private boolean insertTabletToTsFileProcessor(
@@ -1161,7 +1216,7 @@ public class DataRegion implements IDataRegionForQuery {
   private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
     if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
         || 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
-            && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
+        && node.isSyncFromLeaderWhenUsingIoTConsensus())) {
       // disable updating last cache on follower
       return;
     }
@@ -1185,7 +1240,8 @@ public class DataRegion implements IDataRegionForQuery {
             node.getMeasurementSchemas(),
             node.isAligned(),
             node::composeLastTimeValuePair,
-            index -> node.getColumns()[index] != null,
+            index -> node.getColumns()[index] != null && 
(node.getColumnCategories() == null
+                || node.getColumnCategories()[index] == 
TsTableColumnCategory.MEASUREMENT),
             true,
             latestFlushedTime);
   }
@@ -1238,7 +1294,8 @@ public class DataRegion implements IDataRegionForQuery {
             node.getMeasurementSchemas(),
             node.isAligned(),
             node::composeTimeValuePair,
-            index -> node.getValues()[index] != null,
+            index -> node.getValues()[index] != null && 
(node.getColumnCategories() == null
+                || node.getColumnCategories()[index] == 
TsTableColumnCategory.MEASUREMENT),
             true,
             latestFlushedTime);
   }
@@ -1442,9 +1499,9 @@ public class DataRegion implements IDataRegionForQuery {
   /**
    * get processor from hashmap, flush oldest processor if necessary
    *
-   * @param timeRangeId time partition range
+   * @param timeRangeId            time partition range
    * @param tsFileProcessorTreeMap tsFileProcessorTreeMap
-   * @param sequence whether is sequence or not
+   * @param sequence               whether is sequence or not
    */
   private TsFileProcessor getOrCreateTsFileProcessorIntern(
       long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap, 
boolean sequence)
@@ -1528,7 +1585,7 @@ public class DataRegion implements IDataRegionForQuery {
   /**
    * close one tsfile processor
    *
-   * @param sequence whether this tsfile processor is sequence or not
+   * @param sequence        whether this tsfile processor is sequence or not
    * @param tsFileProcessor tsfile processor
    */
   public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor 
tsFileProcessor) {
@@ -1560,7 +1617,7 @@ public class DataRegion implements IDataRegionForQuery {
   /**
    * close one tsfile processor, thread-safety should be ensured by caller
    *
-   * @param sequence whether this tsfile processor is sequence or not
+   * @param sequence        whether this tsfile processor is sequence or not
    * @param tsFileProcessor tsfile processor
    */
   public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, 
TsFileProcessor tsFileProcessor) {
@@ -1625,7 +1682,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** close all tsfile resource */
+  /**
+   * close all tsfile resource
+   */
   public void closeAllResources() {
     for (TsFileResource tsFileResource : tsFileManager.getTsFileList(false)) {
       try {
@@ -1643,7 +1702,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** delete tsfile */
+  /**
+   * delete tsfile
+   */
   public void syncDeleteDataFiles() throws TsFileProcessorException {
     logger.info(
         "{} will close all files for deleting data files", databaseName + "-" 
+ dataRegionId);
@@ -1753,7 +1814,9 @@ public class DataRegion implements IDataRegionForQuery {
     WritingMetrics.getInstance().recordTimedFlushMemTableCount(dataRegionId, 
count);
   }
 
-  /** This method will be blocked until all tsfile processors are closed. */
+  /**
+   * This method will be blocked until all tsfile processors are closed.
+   */
   public void syncCloseAllWorkingTsFileProcessors() {
     try {
       List<Future<?>> tsFileProcessorsClosingFutures = 
asyncCloseAllWorkingTsFileProcessors();
@@ -1792,7 +1855,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** close all working tsfile processors */
+  /**
+   * close all working tsfile processors
+   */
   List<Future<?>> asyncCloseAllWorkingTsFileProcessors() {
     writeLock("asyncCloseAllWorkingTsFileProcessors");
     List<Future<?>> futures = new ArrayList<>();
@@ -1814,7 +1879,9 @@ public class DataRegion implements IDataRegionForQuery {
     return futures;
   }
 
-  /** force close all working tsfile processors */
+  /**
+   * force close all working tsfile processors
+   */
   public void forceCloseAllWorkingTsFileProcessors() throws 
TsFileProcessorException {
     writeLock("forceCloseAllWorkingTsFileProcessors");
     try {
@@ -1835,7 +1902,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** used for queryengine */
+  /**
+   * used for queryengine
+   */
   @Override
   public QueryDataSource query(
       List<IFullPath> pathList,
@@ -1999,7 +2068,9 @@ public class DataRegion implements IDataRegionForQuery {
     return fileScanHandles;
   }
 
-  /** lock the read lock of the insert lock */
+  /**
+   * lock the read lock of the insert lock
+   */
   @Override
   public void readLock() {
     // apply read lock for SG insert lock to prevent inconsistent with 
concurrently writing memtable
@@ -2008,20 +2079,26 @@ public class DataRegion implements IDataRegionForQuery {
     tsFileManager.readLock();
   }
 
-  /** unlock the read lock of insert lock */
+  /**
+   * unlock the read lock of insert lock
+   */
   @Override
   public void readUnlock() {
     tsFileManager.readUnlock();
     insertLock.readLock().unlock();
   }
 
-  /** lock the write lock of the insert lock */
+  /**
+   * lock the write lock of the insert lock
+   */
   public void writeLock(String holder) {
     insertLock.writeLock().lock();
     insertWriteLockHolder = holder;
   }
 
-  /** unlock the write lock of the insert lock */
+  /**
+   * unlock the write lock of the insert lock
+   */
   public void writeUnlock() {
     insertWriteLockHolder = "";
     insertLock.writeLock().unlock();
@@ -2071,7 +2148,9 @@ public class DataRegion implements IDataRegionForQuery {
     return tsfileResourcesForQuery;
   }
 
-  /** Seperate tsfiles in TsFileManager to sealedList and unsealedList. */
+  /**
+   * Seperate tsfiles in TsFileManager to sealedList and unsealedList.
+   */
   private void separateTsFile(
       List<TsFileResource> sealedResource,
       List<TsFileResource> unsealedResource,
@@ -2501,7 +2580,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** Put the memtable back to the MemTablePool and make the metadata in 
writer visible */
+  /**
+   * Put the memtable back to the MemTablePool and make the metadata in writer 
visible
+   */
   // TODO please consider concurrency with read and insert method.
   private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor 
tsFileProcessor)
       throws TsFileProcessorException {
@@ -2597,7 +2678,9 @@ public class DataRegion implements IDataRegionForQuery {
     return trySubmitCount;
   }
 
-  /** Schedule settle compaction for ttl check. */
+  /**
+   * Schedule settle compaction for ttl check.
+   */
   public int executeTTLCheck() throws InterruptedException {
     while (!isCompactionSelecting.compareAndSet(false, true)) {
       // wait until success
@@ -2726,7 +2809,9 @@ public class DataRegion implements IDataRegionForQuery {
     return getNonSystemDatabaseName(databaseName);
   }
 
-  /** Merge file under this database processor */
+  /**
+   * Merge file under this database processor
+   */
   public int compact() {
     writeLock("merge");
     CompactionScheduler.exclusiveLockCompactionSelection();
@@ -2746,7 +2831,7 @@ public class DataRegion implements IDataRegionForQuery {
    * <p>Then, update the latestTimeForEachDevice and 
partitionLatestFlushedTimeForEachDevice.
    *
    * @param newTsFileResource tsfile resource @UsedBy load external tsfile 
module
-   * @param deleteOriginFile whether to delete origin tsfile
+   * @param deleteOriginFile  whether to delete origin tsfile
    * @param isGeneratedByPipe whether the load tsfile request is generated by 
pipe
    */
   public void loadNewTsFile(
@@ -2932,8 +3017,8 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   /**
-   * Update latest time in latestTimeForEachDevice and
-   * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load 
external tsfile module.
+   * Update latest time in latestTimeForEachDevice and 
partitionLatestFlushedTimeForEachDevice.
+   * @UsedBy sync module, load external tsfile module.
    */
   protected void updateLastFlushTime(TsFileResource newTsFileResource) {
     for (IDeviceID device : newTsFileResource.getDevices()) {
@@ -2951,8 +3036,8 @@ public class DataRegion implements IDataRegionForQuery {
   /**
    * Execute the loading process by the type.
    *
-   * @param tsFileResource tsfile resource to be loaded
-   * @param filePartitionId the partition id of the new file
+   * @param tsFileResource   tsfile resource to be loaded
+   * @param filePartitionId  the partition id of the new file
    * @param deleteOriginFile whether to delete the original file
    * @return load the file successfully @UsedBy sync module, load external 
tsfile module.
    */
@@ -3195,14 +3280,14 @@ public class DataRegion implements IDataRegionForQuery {
    * "tsFileResource" have the same plan indexes as the local one.
    *
    * @return true if any file contains plans with indexes no less than the max 
plan index of
-   *     "tsFileResource", otherwise false.
+   * "tsFileResource", otherwise false.
    */
   public boolean isFileAlreadyExist(TsFileResource tsFileResource, long 
partitionNum) {
     // examine working processor first as they have the largest plan index
     return isFileAlreadyExistInWorking(
-            tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
+        tsFileResource, partitionNum, getWorkSequenceTsFileProcessors())
         || isFileAlreadyExistInWorking(
-            tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
+        tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors())
         || isFileAlreadyExistInClosed(tsFileResource, partitionNum, 
getSequenceFileList())
         || isFileAlreadyExistInClosed(tsFileResource, partitionNum, 
getUnSequenceFileList());
   }
@@ -3326,7 +3411,7 @@ public class DataRegion implements IDataRegionForQuery {
         boolean isSequence =
             config.isEnableSeparateData()
                 && insertRowNode.getTime()
-                    > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
+                > lastFlushTimeMap.getFlushedTime(timePartitionId, 
insertRowNode.getDeviceID());
         TsFileProcessor tsFileProcessor = 
getOrCreateTsFileProcessor(timePartitionId, isSequence);
         if (tsFileProcessor == null) {
           continue;
@@ -3435,8 +3520,8 @@ public class DataRegion implements IDataRegionForQuery {
         areSequence[i] =
             config.isEnableSeparateData()
                 && insertRowNode.getTime()
-                    > lastFlushTimeMap.getFlushedTime(
-                        timePartitionIds[i], insertRowNode.getDeviceID());
+                > lastFlushTimeMap.getFlushedTime(
+                timePartitionIds[i], insertRowNode.getDeviceID());
       }
       insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds);
       if (!insertRowsNode.getResults().isEmpty()) {
@@ -3500,7 +3585,7 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   /**
-   * @param folder the folder's path
+   * @param folder   the folder's path
    * @param diskSize the disk space occupied by this folder, unit is MB
    */
   private void countFolderDiskSize(String folder, AtomicLong diskSize) {
@@ -3610,7 +3695,9 @@ public class DataRegion implements IDataRegionForQuery {
     return insertWriteLockHolder;
   }
 
-  /** This method could only be used in iot consensus */
+  /**
+   * This method could only be used in iot consensus
+   */
   public IWALNode getWALNode() {
     if 
(!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS))
 {
       throw new UnsupportedOperationException();
@@ -3620,7 +3707,9 @@ public class DataRegion implements IDataRegionForQuery {
         .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
   }
 
-  /** Wait for this data region successfully deleted */
+  /**
+   * Wait for this data region successfully deleted
+   */
   public void waitForDeleted() {
     writeLock("waitForDeleted");
     try {
@@ -3636,7 +3725,9 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /** Release all threads waiting for this data region successfully deleted */
+  /**
+   * Release all threads waiting for this data region successfully deleted
+   */
   public void markDeleted() {
     writeLock("markDeleted");
     try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index cd6d5aa7c20..7ecd348a2ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -480,7 +480,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
   }
 
   @Override
-  public boolean checkIfChunkDoesNotExist(IDeviceID deviceId, String 
measurement) {
+  public boolean chunkNotExist(IDeviceID deviceId, String measurement) {
     IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
     if (null == memChunkGroup) {
       return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index be5febcb0e6..4de1520a22b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -177,7 +177,7 @@ public interface IMemTable extends WALEntryValue {
   void release();
 
   /** must guarantee the device exists in the work memtable only used when mem 
control enabled */
-  boolean checkIfChunkDoesNotExist(IDeviceID deviceId, String measurement);
+  boolean chunkNotExist(IDeviceID deviceId, String measurement);
 
   /** only used when mem control enabled */
   long getCurrentTVListSize(IDeviceID deviceId, String measurement);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index c16f03424a6..2e060319700 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -118,36 +118,56 @@ import static 
org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.WORK
 @SuppressWarnings("java:S1135") // ignore todos
 public class TsFileProcessor {
 
-  /** Logger fot this class. */
+  /**
+   * Logger fot this class.
+   */
   private static final Logger logger = 
LoggerFactory.getLogger(TsFileProcessor.class);
 
   private static final int NUM_MEM_TO_ESTIMATE = 3;
 
-  /** Storage group name of this tsfile. */
+  /**
+   * Storage group name of this tsfile.
+   */
   private final String storageGroupName;
 
-  /** IoTDB config. */
+  /**
+   * IoTDB config.
+   */
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  /** Database info for mem control. */
+  /**
+   * Database info for mem control.
+   */
   private final DataRegionInfo dataRegionInfo;
 
-  /** Tsfile processor info for mem control. */
+  /**
+   * Tsfile processor info for mem control.
+   */
   private TsFileProcessorInfo tsFileProcessorInfo;
 
-  /** Sync this object in read() and asyncTryToFlush(). */
+  /**
+   * Sync this object in read() and asyncTryToFlush().
+   */
   private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new 
ConcurrentLinkedDeque<>();
 
-  /** Modification to memtable mapping. */
+  /**
+   * Modification to memtable mapping.
+   */
   private final List<Pair<Modification, IMemTable>> modsToMemtable = new 
ArrayList<>();
 
-  /** Writer for restore tsfile and flushing. */
+  /**
+   * Writer for restore tsfile and flushing.
+   */
   private RestorableTsFileIOWriter writer;
 
-  /** Tsfile resource for index this tsfile. */
+  /**
+   * Tsfile resource for index this tsfile.
+   */
   private final TsFileResource tsFileResource;
 
-  /** Time range index to indicate this processor belongs to which time range 
*/
+  /**
+   * Time range index to indicate this processor belongs to which time range
+   */
   private long timeRangeId;
 
   /**
@@ -155,7 +175,9 @@ public class TsFileProcessor {
    */
   private volatile boolean managedByFlushManager;
 
-  /** A lock to mutual exclude read and read */
+  /**
+   * A lock to mutual exclude read and read
+   */
   private final ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
 
   /**
@@ -164,32 +186,48 @@ public class TsFileProcessor {
    */
   private volatile boolean shouldClose;
 
-  /** Working memtable. */
+  /**
+   * Working memtable.
+   */
   private IMemTable workMemTable;
 
-  /** Last flush time to flush the working memtable. */
+  /**
+   * Last flush time to flush the working memtable.
+   */
   private long lastWorkMemtableFlushTime;
 
-  /** This callback is called before the workMemtable is added into the 
flushingMemTables. */
+  /**
+   * This callback is called before the workMemtable is added into the 
flushingMemTables.
+   */
   private final DataRegion.UpdateEndTimeCallBack updateLatestFlushTimeCallback;
 
-  /** Wal node. */
+  /**
+   * Wal node.
+   */
   private final IWALNode walNode;
 
-  /** Whether it's a sequence file or not. */
+  /**
+   * Whether it's a sequence file or not.
+   */
   private final boolean sequence;
 
-  /** Total memtable size for mem control. */
+  /**
+   * Total memtable size for mem control.
+   */
   private long totalMemTableSize;
 
   private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get 
flushQueryLock write lock";
   private static final String FLUSH_QUERY_WRITE_RELEASE =
       "{}: {} get flushQueryLock write lock released";
 
-  /** Close file listener. */
+  /**
+   * Close file listener.
+   */
   private final List<CloseFileListener> closeFileListeners = new 
CopyOnWriteArrayList<>();
 
-  /** Flush file listener. */
+  /**
+   * Flush file listener.
+   */
   private final List<FlushListener> flushListeners = new ArrayList<>();
 
   private final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
@@ -278,7 +316,8 @@ public class TsFileProcessor {
       memIncrements =
           checkAlignedMemCostAndAddToTspInfoForRow(
               insertRowNode.getDeviceID(), insertRowNode.getMeasurements(),
-              insertRowNode.getDataTypes(), insertRowNode.getValues());
+              insertRowNode.getDataTypes(), insertRowNode.getValues(),
+              insertRowNode.getColumnCategories());
     } else {
       memIncrements =
           checkMemCostAndAddToTspInfoForRow(
@@ -487,9 +526,9 @@ public class TsFileProcessor {
    * non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5}
    *
    * @param insertTabletNode insert a tablet of a device
-   * @param start start index of rows to be inserted in insertTabletPlan
-   * @param end end index of rows to be inserted in insertTabletPlan
-   * @param results result array
+   * @param start            start index of rows to be inserted in 
insertTabletPlan
+   * @param end              end index of rows to be inserted in 
insertTabletPlan
+   * @param results          result array
    */
   public void insertTablet(
       InsertTabletNode insertTabletNode, int start, int end, TSStatus[] 
results, boolean noFailure)
@@ -590,7 +629,7 @@ public class TsFileProcessor {
       if (dataTypes[i] == null || measurements[i] == null) {
         continue;
       }
-      if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])) {
+      if (workMemTable.chunkNotExist(deviceId, measurements[i])) {
         // ChunkMetadataIncrement
         chunkMetadataIncrement += 
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
         memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
@@ -608,7 +647,7 @@ public class TsFileProcessor {
       }
     }
     updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, 
textDataIncrement);
-    return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
+    return new long[]{memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
   @SuppressWarnings("squid:S3776") // High Cognitive Complexity
@@ -630,9 +669,9 @@ public class TsFileProcessor {
         if (dataTypes[i] == null || measurements[i] == null) {
           continue;
         }
-        if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurements[i])
+        if (workMemTable.chunkNotExist(deviceId, measurements[i])
             && (!increasingMemTableInfo.containsKey(deviceId)
-                || 
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
+            || 
!increasingMemTableInfo.get(deviceId).containsKey(measurements[i]))) {
           // ChunkMetadataIncrement
           chunkMetadataIncrement += 
ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
           memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
@@ -659,35 +698,26 @@ public class TsFileProcessor {
       }
     }
     updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, 
textDataIncrement);
-    return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
+    return new long[]{memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
   private long[] checkAlignedMemCostAndAddToTspInfoForRow(
-      IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, 
Object[] values)
+      IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, 
Object[] values,
+      TsTableColumnCategory[] columnCategories)
       throws WriteProcessException {
     // Memory of increased PrimitiveArray and TEXT values, e.g., add a 
long[128], add 128*8
     long memTableIncrement = 0L;
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
 
-    if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)) {
+    if (workMemTable.chunkNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
       // For new device of this mem table
       // ChunkMetadataIncrement
       chunkMetadataIncrement +=
           ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, 
TSDataType.VECTOR)
               * dataTypes.length;
-      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes, 
null);
-      for (int i = 0; i < dataTypes.length; i++) {
-        // Skip failed Measurements
-        if (dataTypes[i] == null || measurements[i] == null) {
-          continue;
-        }
-        // TEXT data mem size
-        if (dataTypes[i].isBinary() && values[i] != null) {
-          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
-        }
-      }
+      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes, 
columnCategories);
     } else {
       // For existed device of this mem table
       AlignedWritableMemChunk alignedMemChunk =
@@ -696,30 +726,35 @@ public class TsFileProcessor {
       List<TSDataType> dataTypesInTVList = new ArrayList<>();
       for (int i = 0; i < dataTypes.length; i++) {
         // Skip failed Measurements
-        if (dataTypes[i] == null || measurements[i] == null) {
+        if (dataTypes[i] == null || measurements[i] == null || 
(columnCategories != null
+            && columnCategories[i] != TsTableColumnCategory.MEASUREMENT)) {
           continue;
         }
 
-        // Extending the column of aligned mem chunk
+        // add arrays for new columns
         if (!alignedMemChunk.containsMeasurement(measurements[i])) {
+          // ceil(alignedListSize / ARRAY_SIZE); parenthesize the ternary, otherwise Java parses
+          // "a / SIZE + a % SIZE > 0 ? 1 : 0" as "((a/SIZE + a%SIZE) > 0) ? 1 : 0" and the
+          // whole expression collapses to 0 or 1
+          int currentArrayNum = alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE
+              + (alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0);
           memTableIncrement +=
-              (alignedMemChunk.alignedListSize() / 
PrimitiveArrayManager.ARRAY_SIZE + 1)
-                  * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
+              currentArrayNum * 
AlignedTVList.valueListArrayMemCost(dataTypes[i]);
           dataTypesInTVList.add(dataTypes[i]);
         }
-        // TEXT data mem size
-        if (dataTypes[i].isBinary() && values[i] != null) {
-          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
-        }
       }
-      // Here currentChunkPointNum >= 1
+      // this insertion will result in a new array
       if ((alignedMemChunk.alignedListSize() % 
PrimitiveArrayManager.ARRAY_SIZE) == 0) {
         dataTypesInTVList.addAll(((AlignedTVList) 
alignedMemChunk.getTVList()).getTsDataTypes());
         memTableIncrement += 
AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
       }
     }
+
+    for (int i = 0; i < dataTypes.length; i++) {
+      // TEXT data mem size
+      if (dataTypes[i] != null && dataTypes[i].isBinary() && values[i] != 
null) {
+        textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
+      }
+    }
     updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, 
textDataIncrement);
-    return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
+    return new long[]{memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
   @SuppressWarnings("squid:S3776") // high Cognitive Complexity
@@ -736,7 +771,7 @@ public class TsFileProcessor {
       TSDataType[] dataTypes = insertRowNode.getDataTypes();
       Object[] values = insertRowNode.getValues();
       String[] measurements = insertRowNode.getMeasurements();
-      if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)
+      if (workMemTable.chunkNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)
           && !increasingMemTableInfo.containsKey(deviceId)) {
         // For new device of this mem table
         // ChunkMetadataIncrement
@@ -803,7 +838,7 @@ public class TsFileProcessor {
       }
     }
     updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, 
textDataIncrement);
-    return new long[] {memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
+    return new long[]{memTableIncrement, textDataIncrement, 
chunkMetadataIncrement};
   }
 
   private long[] checkMemCostAndAddToTspInfoForTablet(
@@ -815,7 +850,7 @@ public class TsFileProcessor {
       int end)
       throws WriteProcessException {
     if (start >= end) {
-      return new long[] {0, 0, 0};
+      return new long[]{0, 0, 0};
     }
     long[] memIncrements = new long[3]; // memTable, text, chunk metadata
 
@@ -845,7 +880,7 @@ public class TsFileProcessor {
       TSStatus[] results)
       throws WriteProcessException {
     if (start >= end) {
-      return new long[] {0, 0, 0};
+      return new long[]{0, 0, 0};
     }
     long[] memIncrements = new long[3]; // memTable, text, chunk metadata
 
@@ -877,7 +912,7 @@ public class TsFileProcessor {
       Object column) {
     // memIncrements = [memTable, text, chunk metadata] respectively
 
-    if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurement)) {
+    if (workMemTable.chunkNotExist(deviceId, measurement)) {
       // ChunkMetadataIncrement
       memIncrements[2] += ChunkMetadata.calculateRamSize(measurement, 
dataType);
       memIncrements[0] +=
@@ -940,7 +975,7 @@ public class TsFileProcessor {
     }
 
     // memIncrements = [memTable, text, chunk metadata] respectively
-    if (workMemTable.checkIfChunkDoesNotExist(deviceId, 
AlignedPath.VECTOR_PLACEHOLDER)) {
+    if (workMemTable.chunkNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
       // new devices introduce new ChunkMetadata
       // ChunkMetadata memory Increment
       memIncrements[2] +=
@@ -950,8 +985,8 @@ public class TsFileProcessor {
 
       int numArraysToAdd =
           incomingPointNum / PrimitiveArrayManager.ARRAY_SIZE
-                      + incomingPointNum % PrimitiveArrayManager.ARRAY_SIZE
-                  > 0
+              + incomingPointNum % PrimitiveArrayManager.ARRAY_SIZE
+              > 0
               ? 1
               : 0;
       memIncrements[0] +=
@@ -971,7 +1006,7 @@ public class TsFileProcessor {
             || column == null
             || measurement == null
             || (columnCategories != null
-                && columnCategories[i] != TsTableColumnCategory.MEASUREMENT)) {
+            && columnCategories[i] != TsTableColumnCategory.MEASUREMENT)) {
           continue;
         }
 
@@ -987,14 +1022,14 @@ public class TsFileProcessor {
       // calculate how many new arrays will be added after this insertion
       int currentArrayCnt =
           currentPointNum / PrimitiveArrayManager.ARRAY_SIZE
-                      + currentPointNum % PrimitiveArrayManager.ARRAY_SIZE
-                  > 0
+              + currentPointNum % PrimitiveArrayManager.ARRAY_SIZE
+              > 0
               ? 1
               : 0;
       int newArrayCnt =
           newPointNum / PrimitiveArrayManager.ARRAY_SIZE
-                      + newPointNum % PrimitiveArrayManager.ARRAY_SIZE
-                  > 0
+              + newPointNum % PrimitiveArrayManager.ARRAY_SIZE
+              > 0
               ? 1
               : 0;
       long acquireArray = newArrayCnt - currentArrayCnt;
@@ -1016,7 +1051,7 @@ public class TsFileProcessor {
           || column == null
           || measurement == null
           || (columnCategories != null
-              && columnCategories[i] != TsTableColumnCategory.MEASUREMENT)) {
+          && columnCategories[i] != TsTableColumnCategory.MEASUREMENT)) {
         continue;
       }
 
@@ -1168,7 +1203,9 @@ public class TsFileProcessor {
     logger.info("File {} is closed synchronously", 
tsFileResource.getTsFile().getAbsolutePath());
   }
 
-  /** async close one tsfile, register and close it by another thread */
+  /**
+   * async close one tsfile, register and close it by another thread
+   */
   public Future<?> asyncClose() {
     flushQueryLock.writeLock().lock();
     if (logger.isDebugEnabled()) {
@@ -1294,7 +1331,9 @@ public class TsFileProcessor {
     }
   }
 
-  /** Put the working memtable into flushing list and set the working memtable 
to null */
+  /**
+   * Put the working memtable into flushing list and set the working memtable 
to null
+   */
   public void asyncFlush() {
     flushQueryLock.writeLock().lock();
     if (logger.isDebugEnabled()) {
@@ -1372,7 +1411,9 @@ public class TsFileProcessor {
     return FlushManager.getInstance().registerTsFileProcessor(this);
   }
 
-  /** Put back the memtable to MemTablePool and make metadata in writer 
visible */
+  /**
+   * Put back the memtable to MemTablePool and make metadata in writer visible
+   */
   private void releaseFlushedMemTable(IMemTable memTable) {
     flushQueryLock.writeLock().lock();
     if (logger.isDebugEnabled()) {
@@ -1429,7 +1470,9 @@ public class TsFileProcessor {
     }
   }
 
-  /** This method will synchronize the memTable and release its flushing 
resources */
+  /**
+   * This method will synchronize the memTable and release its flushing 
resources
+   */
   private void syncReleaseFlushedMemTable(IMemTable memTable) {
     synchronized (flushingMemTables) {
       releaseFlushedMemTable(memTable);
@@ -1664,7 +1707,9 @@ public class TsFileProcessor {
     }
   }
 
-  /** end file and write some meta */
+  /**
+   * end file and write some meta
+   */
   private void endFile() throws IOException, TsFileProcessorException {
     if (logger.isDebugEnabled()) {
       logger.debug("Start to end file {}", tsFileResource);
@@ -1686,7 +1731,9 @@ public class TsFileProcessor {
     writer = null;
   }
 
-  /** End empty file and remove it from file system */
+  /**
+   * End empty file and remove it from file system
+   */
   private void endEmptyFile() throws TsFileProcessorException, IOException {
     logger.info("Start to end empty file {}", tsFileResource);
 
@@ -1714,7 +1761,9 @@ public class TsFileProcessor {
     this.managedByFlushManager = managedByFlushManager;
   }
 
-  /** Close this tsfile */
+  /**
+   * Close this tsfile
+   */
   public void close() throws TsFileProcessorException {
     try {
       // When closing resource file, its corresponding mod file is also closed.
@@ -2166,7 +2215,9 @@ public class TsFileProcessor {
     this.timeRangeId = timeRangeId;
   }
 
-  /** Release resource of a memtable */
+  /**
+   * Release resource of a memtable
+   */
   public void putMemTableBackAndClose() throws TsFileProcessorException {
     if (workMemTable != null) {
       workMemTable.release();
@@ -2190,12 +2241,16 @@ public class TsFileProcessor {
     return workMemTable != null ? workMemTable.getTVListsRamCost() : 0;
   }
 
-  /** Return Long.MAX_VALUE if workMemTable is null */
+  /**
+   * Return Long.MAX_VALUE if workMemTable is null
+   */
   public long getWorkMemTableCreatedTime() {
     return workMemTable != null ? workMemTable.getCreatedTime() : 
Long.MAX_VALUE;
   }
 
-  /** Return Long.MAX_VALUE if workMemTable is null */
+  /**
+   * Return Long.MAX_VALUE if workMemTable is null
+   */
   public long getWorkMemTableUpdateTime() {
     return workMemTable != null ? workMemTable.getUpdateTime() : 
Long.MAX_VALUE;
   }


Reply via email to