This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this 
push:
     new da82fb67d51 Finish write MemTable
da82fb67d51 is described below

commit da82fb67d51d41f526295f2161b99173474fc5dd
Author: jt2594838 <[email protected]>
AuthorDate: Fri Jun 21 15:07:20 2024 +0800

    Finish write MemTable
---
 .../dataregion/DataExecutionVisitor.java           | 35 +----------
 .../plan/analyze/schema/SchemaValidator.java       | 26 +--------
 .../plan/planner/plan/node/write/InsertNode.java   | 67 ++++++++++++++++++----
 .../planner/plan/node/write/InsertTabletNode.java  | 66 +++++++++++++++++++++
 .../node/write/RelationalInsertTabletNode.java     | 12 ++++
 .../plan/relational/metadata/ColumnSchema.java     |  8 +++
 .../plan/relational/metadata/TableSchema.java      | 30 ++++++++++
 .../relational/sql/ast/WrappedInsertStatement.java | 30 ++++++++++
 .../db/storageengine/dataregion/DataRegion.java    | 62 ++++++++------------
 .../dataregion/memtable/AbstractMemTable.java      | 44 ++++++++------
 .../memtable/AlignedWritableMemChunk.java          |  9 +--
 .../memtable/AlignedWritableMemChunkGroup.java     |  5 +-
 .../dataregion/memtable/IWritableMemChunk.java     |  5 +-
 .../memtable/IWritableMemChunkGroup.java           |  3 +-
 .../dataregion/memtable/TsFileProcessor.java       |  8 +++
 .../dataregion/memtable/WritableMemChunk.java      |  5 +-
 .../dataregion/memtable/WritableMemChunkGroup.java |  3 +-
 .../org/apache/iotdb/db/utils/CommonUtils.java     | 10 ++++
 .../java/org/apache/iotdb/db/utils/MemUtils.java   | 13 ++++-
 .../db/utils/datastructure/AlignedTVList.java      | 12 ++--
 .../iotdb/db/utils/datastructure/TVList.java       |  4 +-
 .../storageengine/dataregion/DataRegionTest.java   | 28 ++++-----
 .../dataregion/memtable/TsFileProcessorTest.java   | 28 ++++-----
 .../org/apache/iotdb/db/utils/MemUtilsTest.java    |  2 +-
 .../db/utils/datastructure/AlignedTVListTest.java  | 10 ++--
 .../apache/iotdb/commons/schema/table/TsTable.java |  1 +
 .../schema/table/column/TsTableColumnCategory.java | 14 +++++
 27 files changed, 364 insertions(+), 176 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index a46df8d1b5a..4fbccc10049 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -76,44 +76,13 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
 
   @Override
   public TSStatus visitRelationalInsertTablet(RelationalInsertTabletNode node, 
DataRegion dataRegion) {
-    try {
-      dataRegion.insertRelationalTablet(node);
-      return StatusUtils.OK;
-    } catch (OutOfTTLException e) {
-      LOGGER.warn("Error in executing plan node: {}, caused by {}", node, 
e.getMessage());
-      return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
-    } catch (WriteProcessRejectException e) {
-      LOGGER.warn("Reject in executing plan node: {}, caused by {}", node, 
e.getMessage());
-      return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
-    } catch (WriteProcessException e) {
-      LOGGER.error("Error in executing plan node: {}", node, e);
-      return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
-    } catch (BatchProcessException e) {
-      LOGGER.warn(
-          "Batch failure in executing a InsertTabletNode. device: {}, 
startTime: {}, measurements: {}, failing status: {}",
-          node.getDevicePath(),
-          node.getTimes()[0],
-          node.getMeasurements(),
-          e.getFailingStatus());
-      // For each error
-      TSStatus firstStatus = null;
-      for (TSStatus status : e.getFailingStatus()) {
-        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          firstStatus = status;
-        }
-        // Return WRITE_PROCESS_REJECT directly for the consensus retry logic
-        if (status.getCode() == 
TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
-          return status;
-        }
-      }
-      return firstStatus;
-    }
+    return visitInsertTablet(node, dataRegion);
   }
 
   @Override
   public TSStatus visitInsertTablet(InsertTabletNode node, DataRegion 
dataRegion) {
     try {
-      dataRegion.insertTreeTablet(node);
+      dataRegion.insertTablet(node);
       return StatusUtils.OK;
     } catch (OutOfTTLException e) {
       LOGGER.warn("Error in executing plan node: {}, caused by {}", node, 
e.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
index 71dc2afbe4b..be74ef5be14 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
@@ -68,7 +68,7 @@ public class SchemaValidator {
       final TableSchema incomingSchema = insertStatement.getTableSchema();
       final TableSchema realSchema = 
metadata.validateTableHeaderSchema(databaseName,
           incomingSchema, context);
-      validate(incomingSchema, realSchema);
+      insertStatement.validate(realSchema);
       metadata.validateDeviceSchema(insertStatement, context);
       insertStatement.updateAfterSchemaValidation(context);
     } catch (QueryProcessException e) {
@@ -76,31 +76,7 @@ public class SchemaValidator {
     }
   }
 
-  public static void validate(TableSchema incomingSchema, TableSchema 
realSchema) {
-    final List<ColumnSchema> incomingSchemaColumns = 
incomingSchema.getColumns();
-    Map<String, ColumnSchema> realSchemaMap = new HashMap<>();
-    realSchema.getColumns().forEach(c -> realSchemaMap.put(c.getName(), c));
 
-    for (ColumnSchema incomingSchemaColumn : incomingSchemaColumns) {
-      final ColumnSchema realSchemaColumn = 
realSchemaMap.get(incomingSchemaColumn.getName());
-      validate(incomingSchemaColumn, realSchemaColumn);
-    }
-  }
-
-  public static void validate(ColumnSchema incoming, ColumnSchema real) {
-    if (real == null) {
-      throw new SemanticException("Column " + incoming.getName() + " does not 
exists or fails to be "
-          + "created");
-    }
-    if (!incoming.getType().equals(real.getType())) {
-      throw new SemanticException(String.format("Inconsistent data type of 
column %s: %s/%s",
-          incoming.getName(), incoming.getType(), real.getType()));
-    }
-    if (!incoming.getColumnCategory().equals(real.getColumnCategory())) {
-      throw new SemanticException(String.format("Inconsistent column category 
of column %s: %s/%s",
-          incoming.getName(), incoming.getColumnCategory(), 
real.getColumnCategory()));
-    }
-  }
 
   public static ISchemaTree validate(
       ISchemaFetcher schemaFetcher,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index f81150db2a8..032020ef303 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
@@ -31,12 +32,14 @@ import 
org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.NotImplementedException;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.io.DataInputStream;
@@ -48,12 +51,14 @@ import java.util.Objects;
 
 public abstract class InsertNode extends WritePlanNode implements 
ComparableConsensusRequest {
 
-  /** this insert node doesn't need to participate in iot consensus */
+  /**
+   * this insert node doesn't need to participate in iot consensus
+   */
   public static final long NO_CONSENSUS_INDEX = 
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
 
   /**
-   * if use id table, this filed is id form of device path <br>
-   * if not, this filed is device path<br>
+   * if use id table, this filed is id form of device path <br> if not, this 
filed is device
+   * path<br>
    */
   protected PartialPath devicePath;
 
@@ -64,12 +69,12 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
 
   protected TsTableColumnCategory[] columnCategories;
   protected List<Integer> idColumnIndices;
+  protected int measurementColumnCnt = -1;
 
   protected int failedMeasurementNumber = 0;
 
   /**
-   * device id reference, for reuse device id in both id table and memtable 
<br>
-   * used in memtable
+   * device id reference, for reuse device id in both id table and memtable 
<br> used in memtable
    */
   protected IDeviceID deviceID;
 
@@ -79,7 +84,9 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
    */
   protected long searchIndex = NO_CONSENSUS_INDEX;
 
-  /** Physical address of data region after splitting */
+  /**
+   * Physical address of data region after splitting
+   */
   protected TRegionReplicaSet dataRegionReplicaSet;
 
   protected ProgressIndex progressIndex;
@@ -150,6 +157,11 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     return measurements;
   }
 
+  public boolean isValidMeasurement(int i) {
+    return measurementSchemas != null && measurementSchemas[i] != null && 
(columnCategories == null
+        || columnCategories[i] == TsTableColumnCategory.MEASUREMENT);
+  }
+
   public void setMeasurements(String[] measurements) {
     this.measurements = measurements;
   }
@@ -158,6 +170,20 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     return dataTypes;
   }
 
+  public int getMeasurementColumnCnt() {
+    if (measurementColumnCnt == -1) {
+      measurementColumnCnt = 0;
+      if (measurementSchemas != null) {
+        for (int i = 0; i < measurementSchemas.length; i++) {
+          if (isValidMeasurement(i)) {
+            measurementColumnCnt++;
+          }
+        }
+      }
+    }
+    return measurementColumnCnt;
+  }
+
   public TSDataType getDataType(int index) {
     return dataTypes[index];
   }
@@ -181,7 +207,9 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     return searchIndex;
   }
 
-  /** Search index should start from 1 */
+  /**
+   * Search index should start from 1
+   */
   public void setSearchIndex(long searchIndex) {
     this.searchIndex = searchIndex;
   }
@@ -197,7 +225,10 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
   }
 
   // region Serialization methods for WAL
-  /** Serialized size of measurement schemas, ignoring failed time series */
+
+  /**
+   * Serialized size of measurement schemas, ignoring failed time series
+   */
   protected int serializeMeasurementSchemasSize() {
     int byteLen = 0;
     for (int i = 0; i < measurements.length; i++) {
@@ -210,7 +241,9 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     return byteLen;
   }
 
-  /** Serialize measurement schemas, ignoring failed time series */
+  /**
+   * Serialize measurement schemas, ignoring failed time series
+   */
   protected void serializeMeasurementSchemasToWAL(IWALByteBufferView buffer) {
     for (int i = 0; i < measurements.length; i++) {
       // ignore failed partial insert
@@ -305,9 +338,15 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     InsertNode that = (InsertNode) o;
     return isAligned == that.isAligned
         && Objects.equals(devicePath, that.devicePath)
@@ -343,4 +382,8 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
       }
     }
   }
+
+  public String getTableName() {
+    return null;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index e46e6cc91e1..d0e733b69e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -19,14 +19,20 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
+import static org.apache.iotdb.db.utils.CommonUtils.isAlive;
+
 import java.util.Map.Entry;
 import java.util.function.IntFunction;
+import java.util.function.IntToLongFunction;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -37,8 +43,11 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
+import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.NotImplementedException;
 import org.apache.tsfile.file.metadata.IDeviceID;
@@ -1183,4 +1192,61 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
   public List<Pair<IDeviceID, Integer>> splitByDevice(int start, int end) {
     return Collections.singletonList(new Pair<>(deviceID, end));
   }
+
+
+  /**
+   *
+   * @param results insertion result of each row
+   * @param rowTTLGetter the ttl associated with each row
+   * @return the position of the first alive row
+   * @throws OutOfTTLException if all rows have expired the TTL
+   */
+  public int checkTTL(TSStatus[] results,
+      IntToLongFunction rowTTLGetter)
+      throws OutOfTTLException {
+    return checkTTLInternal(results, rowTTLGetter, true);
+  }
+
+  protected int checkTTLInternal(TSStatus[] results,
+      IntToLongFunction rowTTLGetter, boolean breakOnFirstAlive)
+      throws OutOfTTLException {
+
+    /*
+     * assume that batch has been sorted by client
+     */
+    int loc = 0;
+    long ttl = 0;
+    int firstAliveLoc = -1;
+    while (loc < getRowCount()) {
+      ttl = rowTTLGetter.applyAsLong(loc);
+      long currTime = getTimes()[loc];
+      // skip points that do not satisfy TTL
+      if (!isAlive(currTime, ttl)) {
+        results[loc] =
+            RpcUtils.getStatus(
+                TSStatusCode.OUT_OF_TTL,
+                String.format(
+                    "Insertion time [%s] is less than ttl time bound [%s]",
+                    DateTimeUtils.convertLongToDate(currTime),
+                    DateTimeUtils.convertLongToDate(
+                        CommonDateTimeUtils.currentTime() - ttl)));
+      } else {
+        if (firstAliveLoc == -1) {
+          firstAliveLoc = loc;
+        }
+        if (breakOnFirstAlive) {
+          break;
+        }
+      }
+      loc++;
+    }
+
+    if (firstAliveLoc == -1) {
+      // no alive data
+      throw new OutOfTTLException(
+          getTimes()[getTimes().length - 1],
+          (CommonDateTimeUtils.currentTime() - ttl));
+    }
+    return firstAliveLoc;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index c60499e97f3..f86e3302d87 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -24,8 +24,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.IntToLongFunction;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -157,5 +160,14 @@ public class RelationalInsertTabletNode extends 
InsertTabletNode {
 
     return result;
   }
+
+  @Override
+  public int checkTTL(TSStatus[] results, IntToLongFunction rowTTLGetter) 
throws OutOfTTLException {
+    return checkTTLInternal(results, rowTTLGetter, false);
+  }
+
+  public String getTableName() {
+    return deviceID.getTableName();
+  }
 }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
index bb460c61423..91f074811d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
@@ -21,12 +21,14 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.metadata;
 
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
 import org.apache.tsfile.read.common.type.BinaryType;
 import org.apache.tsfile.read.common.type.BooleanType;
 import org.apache.tsfile.read.common.type.DoubleType;
 import org.apache.tsfile.read.common.type.FloatType;
 import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.read.common.type.TypeEnum;
+import org.apache.tsfile.read.common.type.TypeFactory;
 import org.apache.tsfile.read.common.type.UnknownType;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
@@ -35,6 +37,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.StringJoiner;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
 
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
@@ -143,6 +146,11 @@ public class ColumnSchema {
     }
   }
 
+  public static ColumnSchema ofTsColumnSchema(TsTableColumnSchema schema) {
+    return new ColumnSchema(schema.getColumnName(), 
TypeFactory.getType(schema.getDataType()),
+        false, schema.getColumnCategory());
+  }
+
   public static Builder builder() {
     return new Builder();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
index e3ed509931c..4b88ef4807f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java
@@ -19,7 +19,14 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.metadata;
 
+import java.util.ArrayList;
 import java.util.List;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+import 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 
 public class TableSchema {
 
@@ -39,4 +46,27 @@ public class TableSchema {
   public List<ColumnSchema> getColumns() {
     return columns;
   }
+
+  public static TableSchema of(TsTable tsTable) {
+    String tableName = tsTable.getTableName();
+    List<ColumnSchema> columns = new ArrayList<>();
+    for (TsTableColumnSchema tsTableColumnSchema : tsTable.getColumnList()) {
+      columns.add(ColumnSchema.ofTsColumnSchema(tsTableColumnSchema));
+    }
+    return new TableSchema(tableName, columns);
+  }
+
+  public org.apache.tsfile.file.metadata.TableSchema toTsFileTableSchema() {
+    // TODO-Table: unify redundant definitions
+    String tableName = this.getTableName();
+    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
+    List<ColumnType> columnTypes = new ArrayList<>();
+    for (ColumnSchema column : columns) {
+      measurementSchemas.add(new MeasurementSchema(column.getName(),
+          InternalTypeManager.getTSDataType(column.getType())));
+      columnTypes.add(column.getColumnCategory().toTsFileColumnType());
+    }
+    return
+        new org.apache.tsfile.file.metadata.TableSchema(tableName, 
measurementSchemas, columnTypes);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index f81a26dc594..355786f86af 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -19,8 +19,12 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.ITableDeviceSchemaValidation;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
@@ -67,4 +71,30 @@ public abstract class WrappedInsertStatement extends 
WrappedStatement implements
 
     return tableSchema;
   }
+
+  public void validate(TableSchema realSchema) throws QueryProcessException {
+    final List<ColumnSchema> incomingSchemaColumns = 
getTableSchema().getColumns();
+    Map<String, ColumnSchema> realSchemaMap = new HashMap<>();
+    realSchema.getColumns().forEach(c -> realSchemaMap.put(c.getName(), c));
+
+    for (ColumnSchema incomingSchemaColumn : incomingSchemaColumns) {
+      final ColumnSchema realSchemaColumn = 
realSchemaMap.get(incomingSchemaColumn.getName());
+      validate(incomingSchemaColumn, realSchemaColumn);
+    }
+  }
+
+  public static void validate(ColumnSchema incoming, ColumnSchema real) {
+    if (real == null) {
+      throw new SemanticException("Column " + incoming.getName() + " does not 
exists or fails to be "
+          + "created");
+    }
+    if (!incoming.getType().equals(real.getType())) {
+      throw new SemanticException(String.format("Inconsistent data type of 
column %s: %s/%s",
+          incoming.getName(), incoming.getType(), real.getType()));
+    }
+    if (!incoming.getColumnCategory().equals(real.getColumnCategory())) {
+      throw new SemanticException(String.format("Inconsistent column category 
of column %s: %s/%s",
+          incoming.getName(), incoming.getColumnCategory(), 
real.getColumnCategory()));
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 7bf4862ac12..bdd0922a759 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.dataregion;
 
-import java.util.function.IntFunction;
 import java.util.function.IntToLongFunction;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -58,10 +57,13 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCach
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.service.SettleService;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
@@ -113,6 +115,7 @@ import 
org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
 import 
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
 import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
+import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -932,7 +935,7 @@ public class DataRegion implements IDataRegionForQuery {
     // reject insertions that are out of ttl
     long deviceTTL =
         
DataNodeTTLCache.getInstance().getTTL(insertRowNode.getDevicePath().getNodes());
-    if (!isAlive(insertRowNode.getTime(), deviceTTL)) {
+    if (!CommonUtils.isAlive(insertRowNode.getTime(), deviceTTL)) {
       throw new OutOfTTLException(
           insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() - 
deviceTTL));
     }
@@ -966,21 +969,6 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
-  /**
-   * Insert a tablet (rows belonging to the same devices) into this database.
-   *
-   * @throws BatchProcessException if some of the rows failed to be inserted
-   */
-  @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
-  public void insertTreeTablet(InsertTabletNode insertTabletNode)
-      throws BatchProcessException, WriteProcessException {
-    insertTablet(insertTabletNode, false);
-  }
-
-  public void insertRelationalTablet(InsertTabletNode insertTabletNode)
-      throws BatchProcessException, WriteProcessException {
-    insertTablet(insertTabletNode, true);
-  }
 
   private long getLastFlushTime(long timePartitionID, IDeviceID deviceID) {
    return config.isEnableSeparateData()
@@ -1056,8 +1044,7 @@ public class DataRegion implements IDataRegionForQuery {
    * @throws BatchProcessException if some of the rows failed to be inserted
    */
   @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive 
Complexity warning
-  private void insertTablet(InsertTabletNode insertTabletNode,
-      boolean checkAllRowTtl)
+  public void insertTablet(InsertTabletNode insertTabletNode)
       throws BatchProcessException, WriteProcessException {
     StorageEngine.blockInsertionIfReject(null);
     long startTime = System.nanoTime();
@@ -1070,12 +1057,11 @@ public class DataRegion implements IDataRegionForQuery {
       TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()];
       Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
       boolean noFailure;
+      int loc = insertTabletNode.checkTTL(results, i -> 
DataNodeTTLCache.getInstance()
+          .getTTL(insertTabletNode.getDeviceID(i)));
+      noFailure = loc == 0;
 
-      int loc = checkTTL(insertTabletNode, results, i -> 
DataNodeTTLCache.getInstance()
-          .getTTL(insertTabletNode.getDeviceID(i)), !checkAllRowTtl);
-      noFailure = loc != 0;
-
-      noFailure = noFailure & splitAndInsert(insertTabletNode, loc, results);
+      noFailure = noFailure && splitAndInsert(insertTabletNode, loc, results);
 
       startTime = System.nanoTime();
       tryToUpdateInsertTabletLastCache(insertTabletNode);
@@ -1117,7 +1103,7 @@ public class DataRegion implements IDataRegionForQuery {
       ttl = rowTTLGetter.applyAsLong(loc);
       long currTime = insertTabletNode.getTimes()[loc];
       // skip points that do not satisfy TTL
-      if (!isAlive(currTime, ttl)) {
+      if (!CommonUtils.isAlive(currTime, ttl)) {
         results[loc] =
             RpcUtils.getStatus(
                 TSStatusCode.OUT_OF_TTL,
@@ -1147,15 +1133,6 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
 
-  /**
-   * Check whether the time falls in TTL.
-   *
-   * @return whether the given time falls in ttl
-   */
-  private boolean isAlive(long time, long dataTTL) {
-    return dataTTL == Long.MAX_VALUE || (CommonDateTimeUtils.currentTime() - 
time) <= dataTTL;
-  }
-
   /**
    * insert batch to tsfile processor thread-safety that the caller need to 
guarantee The rows to be
    * inserted are in the range [start, end) Null value in each column values 
will be replaced by the
@@ -1193,6 +1170,9 @@ public class DataRegion implements IDataRegionForQuery {
       return false;
     }
 
+    // register TableSchema (and maybe more) for table insertion
+    registerToTsFile(insertTabletNode, tsFileProcessor);
+
     try {
       tsFileProcessor.insertTablet(insertTabletNode, start, end, results, 
noFailure);
     } catch (WriteProcessRejectException e) {
@@ -1210,6 +1190,14 @@ public class DataRegion implements IDataRegionForQuery {
     return true;
   }
 
+  private void registerToTsFile(InsertNode node, TsFileProcessor 
tsFileProcessor) {
+    final String tableName = node.getTableName();
+    if (tableName != null) {
+      tsFileProcessor.registerToTsFile(tableName,
+          t -> 
TableSchema.of(DataNodeTableCache.getInstance().getTable(getDatabaseName(), 
t)).toTsFileTableSchema());
+    }
+  }
+
   private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) {
     if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
         || 
(config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
@@ -3373,7 +3361,7 @@ public class DataRegion implements IDataRegionForQuery {
       Map<TsFileProcessor, InsertRowsNode> tsFileProcessorMap = new 
HashMap<>();
       for (int i = 0; i < 
insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) {
         InsertRowNode insertRowNode = 
insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i);
-        if (!isAlive(insertRowNode.getTime(), deviceTTL)) {
+        if (!CommonUtils.isAlive(insertRowNode.getTime(), deviceTTL)) {
           // we do not need to write these part of data, as they can not be 
queried
           // or the sub-plan has already been executed, we are retrying other 
sub-plans
           insertRowsOfOneDeviceNode
@@ -3485,7 +3473,7 @@ public class DataRegion implements IDataRegionForQuery {
         InsertRowNode insertRowNode = 
insertRowsNode.getInsertRowNodeList().get(i);
         long deviceTTL =
             
DataNodeTTLCache.getInstance().getTTL(insertRowNode.getDevicePath().getNodes());
-        if (!isAlive(insertRowNode.getTime(), deviceTTL)) {
+        if (!CommonUtils.isAlive(insertRowNode.getTime(), deviceTTL)) {
           insertRowsNode
               .getResults()
               .put(
@@ -3538,7 +3526,7 @@ public class DataRegion implements IDataRegionForQuery {
     for (int i = 0; i < 
insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) {
       InsertTabletNode insertTabletNode = 
insertMultiTabletsNode.getInsertTabletNodeList().get(i);
       try {
-        insertTreeTablet(insertTabletNode);
+        insertTablet(insertTabletNode);
       } catch (WriteProcessException e) {
         insertMultiTabletsNode
             .getResults()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 263edad395b..f17b9fc0ec6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.AlignedFullPath;
 import org.apache.iotdb.commons.path.IFullPath;
 import org.apache.iotdb.commons.path.NonAlignedFullPath;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -354,10 +355,11 @@ public abstract class AbstractMemTable implements 
IMemTable {
       TSStatus[] results)
       throws WriteProcessException {
     try {
-      writeAlignedTablet(insertTabletNode, start, end);
-      memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end);
+      writeAlignedTablet(insertTabletNode, start, end, results);
+      //TODO-Table: what is the relation between this and 
TsFileProcessor.checkMemCost
+      memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end, 
results);
       int pointsInserted =
-          (insertTabletNode.getDataTypes().length - 
insertTabletNode.getFailedMeasurementNumber())
+          (insertTabletNode.getMeasurementColumnCnt() - 
insertTabletNode.getFailedMeasurementNumber())
               * (end - start);
       totalPointsNum += pointsInserted;
       MetricService.getInstance()
@@ -432,16 +434,18 @@ public abstract class AbstractMemTable implements 
IMemTable {
         insertTabletNode.getBitMaps(),
         schemaList,
         start,
-        end)) {
+        end, null)) {
       shouldFlush = true;
     }
   }
 
-  public void writeAlignedTablet(InsertTabletNode insertTabletNode, int start, 
int end) {
+  public void writeAlignedTablet(InsertTabletNode insertTabletNode, int start, 
int end,
+      TSStatus[] results) {
 
     List<IMeasurementSchema> schemaList = new ArrayList<>();
     for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) {
-      if (insertTabletNode.getColumns()[i] == null) {
+      if (insertTabletNode.getColumns()[i] == null ||
+          (insertTabletNode.getColumnCategories() != null && 
insertTabletNode.getColumnCategories()[i] != 
TsTableColumnCategory.MEASUREMENT)) {
         schemaList.add(null);
       } else {
         schemaList.add(insertTabletNode.getMeasurementSchemas()[i]);
@@ -450,16 +454,24 @@ public abstract class AbstractMemTable implements 
IMemTable {
     if (schemaList.isEmpty()) {
       return;
     }
-    IWritableMemChunkGroup memChunkGroup =
-        
createAlignedMemChunkGroupIfNotExistAndGet(insertTabletNode.getDeviceID(), 
schemaList);
-    if (memChunkGroup.writeValuesWithFlushCheck(
-        insertTabletNode.getTimes(),
-        insertTabletNode.getColumns(),
-        insertTabletNode.getBitMaps(),
-        schemaList,
-        start,
-        end)) {
-      shouldFlush = true;
+    final List<Pair<IDeviceID, Integer>> deviceEndOffsetPair = 
insertTabletNode.splitByDevice(start, end);
+    int splitStart = start;
+    for (Pair<IDeviceID, Integer> pair : deviceEndOffsetPair) {
+      final IDeviceID deviceID = pair.left;
+      int splitEnd = pair.right;
+      IWritableMemChunkGroup memChunkGroup =
+          createAlignedMemChunkGroupIfNotExistAndGet(deviceID, schemaList);
+      if (memChunkGroup.writeValuesWithFlushCheck(
+          insertTabletNode.getTimes(),
+          insertTabletNode.getColumns(),
+          insertTabletNode.getBitMaps(),
+          schemaList,
+          splitStart,
+          splitEnd,
+          results)) {
+        shouldFlush = true;
+      }
+      splitStart = splitEnd;
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index a219df17908..f904b9de49a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
@@ -156,8 +157,8 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
 
   @Override
   public boolean putAlignedValuesWithFlushCheck(
-      long[] t, Object[] v, BitMap[] bitMaps, int start, int end) {
-    list.putAlignedValues(t, v, bitMaps, start, end);
+      long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[] 
results) {
+    list.putAlignedValues(t, v, bitMaps, start, end, results);
     return list.reachMaxChunkSizeThreshold();
   }
 
@@ -187,13 +188,13 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
       BitMap[] bitMaps,
       List<IMeasurementSchema> schemaList,
       int start,
-      int end) {
+      int end, TSStatus[] results) {
     Pair<Object[], BitMap[]> pair =
         checkAndReorderColumnValuesInInsertPlan(schemaList, valueList, 
bitMaps);
     Object[] reorderedColumnValues = pair.left;
     BitMap[] reorderedBitMaps = pair.right;
     return putAlignedValuesWithFlushCheck(
-        times, reorderedColumnValues, reorderedBitMaps, start, end);
+        times, reorderedColumnValues, reorderedBitMaps, start, end, results);
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
index c541e52e817..797880b186f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternUtil;
@@ -55,9 +56,9 @@ public class AlignedWritableMemChunkGroup implements 
IWritableMemChunkGroup {
       BitMap[] bitMaps,
       List<IMeasurementSchema> schemaList,
       int start,
-      int end) {
+      int end, TSStatus[] results) {
     return memChunk.writeAlignedValuesWithFlushCheck(
-        times, columns, bitMaps, schemaList, start, end);
+        times, columns, bitMaps, schemaList, start, end, results);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index eac06484035..376444072da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 
@@ -58,7 +59,7 @@ public interface IWritableMemChunk extends WALEntryValue {
   void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end);
 
   boolean putAlignedValuesWithFlushCheck(
-      long[] t, Object[] v, BitMap[] bitMaps, int start, int end);
+      long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[] 
results);
 
   boolean writeWithFlushCheck(long insertTime, Object objectValue);
 
@@ -78,7 +79,7 @@ public interface IWritableMemChunk extends WALEntryValue {
       BitMap[] bitMaps,
       List<IMeasurementSchema> schemaList,
       int start,
-      int end);
+      int end, TSStatus[] results);
 
   long count();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
index 3f94f361a35..29ab0f4afbb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 
@@ -36,7 +37,7 @@ public interface IWritableMemChunkGroup extends WALEntryValue 
{
       BitMap[] bitMaps,
       List<IMeasurementSchema> schemaList,
       int start,
-      int end);
+      int end, TSStatus[] results);
 
   void release();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 4cb4210d222..bece21045f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import java.util.function.Function;
 import java.util.function.IntFunction;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -88,6 +89,7 @@ import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -2201,4 +2203,10 @@ public class TsFileProcessor {
   public ConcurrentLinkedDeque<IMemTable> getFlushingMemTable() {
     return flushingMemTables;
   }
+
+  public void registerToTsFile(String tableName,
+      Function<String, TableSchema> tableSchemaFunction) {
+    getWriter().getKnownSchema().getTableSchemaMap().computeIfAbsent(tableName,
+        tableSchemaFunction);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 4537809f5af..36282e91840 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
@@ -137,7 +138,7 @@ public class WritableMemChunk implements IWritableMemChunk {
       BitMap[] bitMaps,
       List<IMeasurementSchema> schemaList,
       int start,
-      int end) {
+      int end, TSStatus[] results) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
list.getDataType());
   }
 
@@ -211,7 +212,7 @@ public class WritableMemChunk implements IWritableMemChunk {
 
   @Override
   public boolean putAlignedValuesWithFlushCheck(
-      long[] t, Object[] v, BitMap[] bitMaps, int start, int end) {
+      long[] t, Object[] v, BitMap[] bitMaps, int start, int end, TSStatus[] 
results) {
     throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + 
schema.getType());
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index 5d61fe0e3dd..867b2aa11dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternUtil;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
@@ -51,7 +52,7 @@ public class WritableMemChunkGroup implements 
IWritableMemChunkGroup {
       BitMap[] bitMaps,
       List<IMeasurementSchema> schemaList,
       int start,
-      int end) {
+      int end, TSStatus[] results) {
     boolean flushFlag = false;
     for (int i = 0; i < columns.length; i++) {
       if (columns[i] == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index 6a179cbaccb..e5c40c1b8c6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
@@ -414,4 +415,13 @@ public class CommonUtils {
     }
     return ret;
   }
+
+  /**
+   * Check whether the time falls in TTL.
+   *
+   * @return whether the given time falls in ttl
+   */
+  public static boolean isAlive(long time, long dataTTL) {
+    return dataTTL == Long.MAX_VALUE || (CommonDateTimeUtils.currentTime() - 
time) <= dataTTL;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index 76953fe9fa2..7d179e7defc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -127,16 +127,23 @@ public class MemUtils {
     return memSize;
   }
 
-  public static long getAlignedTabletSize(InsertTabletNode insertTabletNode, 
int start, int end) {
+  public static long getAlignedTabletSize(InsertTabletNode insertTabletNode, 
int start, int end,
+      TSStatus[] results) {
     if (start >= end) {
       return 0L;
     }
     long memSize = 0;
     for (int i = 0; i < insertTabletNode.getMeasurements().length; i++) {
-      if (insertTabletNode.getMeasurements()[i] == null) {
+      if (!insertTabletNode.isValidMeasurement(i)) {
         continue;
       }
-      memSize += (long) (end - start) * 
insertTabletNode.getDataTypes()[i].getDataTypeSize();
+      if (results == null) {
+        memSize += (long) (end - start) * 
insertTabletNode.getDataTypes()[i].getDataTypeSize();
+      } else {
+        for (int j = start; j < end; j++) {
+          memSize += insertTabletNode.getDataTypes()[i].getDataTypeSize();
+        }
+      }
     }
     // time and index column memSize for vector
     memSize += (end - start) * (8L + 4L);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 8372468e3cd..9990294b7bd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.utils.datastructure;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
@@ -723,7 +724,8 @@ public abstract class AlignedTVList extends TVList {
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   @Override
-  public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps, 
int start, int end) {
+  public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps, 
int start, int end,
+      TSStatus[] results) {
     checkExpansion();
     int idx = start;
 
@@ -742,7 +744,8 @@ public abstract class AlignedTVList extends TVList {
           indices.get(arrayIdx)[elementIdx + i] = rowCount;
           for (int j = 0; j < values.size(); j++) {
             if (value[j] == null
-                || bitMaps != null && bitMaps[j] != null && 
bitMaps[j].isMarked(idx + i)) {
+                || bitMaps != null && bitMaps[j] != null && 
bitMaps[j].isMarked(idx + i)
+                || results != null && results[idx + i] != null) {
               markNullValue(j, arrayIdx, elementIdx + i);
             }
           }
@@ -758,7 +761,8 @@ public abstract class AlignedTVList extends TVList {
           indices.get(arrayIdx)[elementIdx + i] = rowCount;
           for (int j = 0; j < values.size(); j++) {
             if (value[j] == null
-                || bitMaps != null && bitMaps[j] != null && 
bitMaps[j].isMarked(idx + i)) {
+                || bitMaps != null && bitMaps[j] != null && 
bitMaps[j].isMarked(idx + i)
+                || results != null && results[idx + i] != null) {
               markNullValue(j, arrayIdx, elementIdx + i);
             }
           }
@@ -1259,7 +1263,7 @@ public abstract class AlignedTVList extends TVList {
     }
 
     AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes);
-    tvList.putAlignedValues(times, values, bitMaps, 0, rowCount);
+    tvList.putAlignedValues(times, values, bitMaps, 0, rowCount, null);
     return tvList;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 99a34b47084..b2cd965458e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.utils.datastructure;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
@@ -189,7 +190,8 @@ public abstract class TVList implements WALEntryValue {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
-  public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps, 
int start, int end) {
+  public void putAlignedValues(long[] time, Object[] value, BitMap[] bitMaps, 
int start, int end,
+      TSStatus[] results) {
     throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index b29ef2a48ac..80b45ebf46c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -287,7 +287,7 @@ public class DataRegionTest {
             columns,
             times.length);
 
-    dataRegion.insertTreeTablet(insertTabletNode1);
+    dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
 
     for (int r = 50; r < 149; r++) {
@@ -309,7 +309,7 @@ public class DataRegionTest {
             columns,
             times.length);
 
-    dataRegion.insertTreeTablet(insertTabletNode2);
+    dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
     dataRegion.syncCloseAllWorkingTsFileProcessors();
 
@@ -365,7 +365,7 @@ public class DataRegionTest {
             columns,
             times.length);
 
-    dataRegion.insertTreeTablet(insertTabletNode1);
+    dataRegion.insertTablet(insertTabletNode1);
 
     for (int r = 50; r < 149; r++) {
       times[r - 50] = r;
@@ -386,7 +386,7 @@ public class DataRegionTest {
             columns,
             times.length);
 
-    dataRegion.insertTreeTablet(insertTabletNode2);
+    dataRegion.insertTablet(insertTabletNode2);
     Assert.assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0);
     dataRegion.syncDeleteDataFiles();
     Assert.assertEquals(0, SystemInfo.getInstance().getTotalMemTableSize());
@@ -440,7 +440,7 @@ public class DataRegionTest {
             columns,
             times.length);
 
-    dataRegion.insertTreeTablet(insertTabletNode1);
+    dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
 
     for (int r = 50; r < 149; r++) {
@@ -462,7 +462,7 @@ public class DataRegionTest {
             columns,
             times.length);
 
-    dataRegion.insertTreeTablet(insertTabletNode2);
+    dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
     dataRegion.syncCloseAllWorkingTsFileProcessors();
 
@@ -516,7 +516,7 @@ public class DataRegionTest {
             times.length);
     insertTabletNode1.setFailedMeasurementNumber(2);
 
-    dataRegion.insertTreeTablet(insertTabletNode1);
+    dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
 
     for (int r = 50; r < 149; r++) {
@@ -539,7 +539,7 @@ public class DataRegionTest {
             times.length);
     insertTabletNode2.setFailedMeasurementNumber(2);
 
-    dataRegion.insertTreeTablet(insertTabletNode2);
+    dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
     dataRegion.syncCloseAllWorkingTsFileProcessors();
 
@@ -708,7 +708,7 @@ public class DataRegionTest {
             columns,
             times.length);
 
-    dataRegion.insertTreeTablet(insertTabletNode1);
+    dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
 
     for (int r = 149; r >= 50; r--) {
@@ -729,7 +729,7 @@ public class DataRegionTest {
             columns,
             times.length);
 
-    dataRegion.insertTreeTablet(insertTabletNode2);
+    dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
     dataRegion.syncCloseAllWorkingTsFileProcessors();
 
@@ -793,7 +793,7 @@ public class DataRegionTest {
             columns,
             times.length);
 
-    dataRegion.insertTreeTablet(insertTabletNode1);
+    dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
 
     for (int r = 1249; r >= 50; r--) {
@@ -814,7 +814,7 @@ public class DataRegionTest {
             columns,
             times.length);
 
-    dataRegion.insertTreeTablet(insertTabletNode2);
+    dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
     dataRegion.syncCloseAllWorkingTsFileProcessors();
 
@@ -878,7 +878,7 @@ public class DataRegionTest {
             columns,
             times.length);
 
-    dataRegion.insertTreeTablet(insertTabletNode1);
+    dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
 
     for (int r = 1249; r >= 50; r--) {
@@ -899,7 +899,7 @@ public class DataRegionTest {
             columns,
             times.length);
 
-    dataRegion.insertTreeTablet(insertTabletNode2);
+    dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
     dataRegion.syncCloseAllWorkingTsFileProcessors();
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
index d016857ec49..770d0ce05f3 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java
@@ -303,12 +303,12 @@ public class TsFileProcessorTest {
     this.sgInfo.initTsFileProcessorInfo(processor);
     SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
     // Test Tablet
-    processor.insertTablet(genInsertTableNode(0, true), 0, 10, new 
TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(0, true), 0, 10, new 
TSStatus[10], true);
     IMemTable memTable = processor.getWorkMemTable();
     Assert.assertEquals(1596808, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTableNode(100, true), 0, 10, new 
TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(100, true), 0, 10, new 
TSStatus[10], true);
     Assert.assertEquals(1596808, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTableNode(200, true), 0, 10, new 
TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(200, true), 0, 10, new 
TSStatus[10], true);
     Assert.assertEquals(1596808, memTable.getTVListsRamCost());
     Assert.assertEquals(90000, memTable.getTotalPointsNum());
     Assert.assertEquals(720360, memTable.memSize());
@@ -339,22 +339,22 @@ public class TsFileProcessorTest {
     this.sgInfo.initTsFileProcessorInfo(processor);
     SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
     // Test Tablet
-    processor.insertTablet(genInsertTableNode(0, true), 0, 10, new 
TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(0, true), 0, 10, new 
TSStatus[10], true);
     IMemTable memTable = processor.getWorkMemTable();
     Assert.assertEquals(1596808, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTableNodeFors3000ToS6000(0, true), 0, 10, 
new TSStatus[10]);
+    processor.insertTablet(genInsertTableNodeFors3000ToS6000(0, true), 0, 10, 
new TSStatus[10], true);
     Assert.assertEquals(3192808, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTableNode(100, true), 0, 10, new 
TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(100, true), 0, 10, new 
TSStatus[10], true);
     Assert.assertEquals(3192808, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTableNodeFors3000ToS6000(100, true), 0, 
10, new TSStatus[10]);
+    processor.insertTablet(genInsertTableNodeFors3000ToS6000(100, true), 0, 
10, new TSStatus[10], true);
     Assert.assertEquals(3192808, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTableNode(200, true), 0, 10, new 
TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(200, true), 0, 10, new 
TSStatus[10], true);
     Assert.assertEquals(3192808, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTableNodeFors3000ToS6000(200, true), 0, 
10, new TSStatus[10]);
+    processor.insertTablet(genInsertTableNodeFors3000ToS6000(200, true), 0, 
10, new TSStatus[10], true);
     Assert.assertEquals(3192808, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTableNode(300, true), 0, 10, new 
TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(300, true), 0, 10, new 
TSStatus[10], true);
     Assert.assertEquals(6385616, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTableNodeFors3000ToS6000(300, true), 0, 
10, new TSStatus[10]);
+    processor.insertTablet(genInsertTableNodeFors3000ToS6000(300, true), 0, 
10, new TSStatus[10], true);
     Assert.assertEquals(6385616, memTable.getTVListsRamCost());
 
     Assert.assertEquals(240000, memTable.getTotalPointsNum());
@@ -393,12 +393,12 @@ public class TsFileProcessorTest {
     this.sgInfo.initTsFileProcessorInfo(processor);
     SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
     // Test tablet
-    processor.insertTablet(genInsertTableNode(0, false), 0, 10, new 
TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(0, false), 0, 10, new 
TSStatus[10], true);
     IMemTable memTable = processor.getWorkMemTable();
     Assert.assertEquals(3192000, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTableNode(100, false), 0, 10, new 
TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(100, false), 0, 10, new 
TSStatus[10], true);
     Assert.assertEquals(3192000, memTable.getTVListsRamCost());
-    processor.insertTablet(genInsertTableNode(200, false), 0, 10, new 
TSStatus[10]);
+    processor.insertTablet(genInsertTableNode(200, false), 0, 10, new 
TSStatus[10], true);
     Assert.assertEquals(3192000, memTable.getTVListsRamCost());
     Assert.assertEquals(90000, memTable.getTotalPointsNum());
     Assert.assertEquals(1440000, memTable.memSize());
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
index 6b7208d10d3..bea447b649a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java
@@ -163,7 +163,7 @@ public class MemUtilsTest {
             null,
             columns,
             1);
-    Assert.assertEquals(sizeSum, MemUtils.getAlignedTabletSize(insertNode, 0, 
1));
+    Assert.assertEquals(sizeSum, MemUtils.getAlignedTabletSize(insertNode, 0, 
1, null));
   }
 
   /** This method tests MemUtils.getStringMem() and MemUtils.getDataPointMem() 
*/
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java
index f7a0bb6e1eb..1b04f25d1a7 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/datastructure/AlignedTVListTest.java
@@ -104,7 +104,7 @@ public class AlignedTVListTest {
     }
 
     tvList.putAlignedValues(
-        ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray, 
null, 0, 1000);
+        ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray, 
null, 0, 1000, null);
     for (long i = 0; i < tvList.rowCount; i++) {
       Assert.assertEquals(tvList.rowCount - i, tvList.getTime((int) i));
     }
@@ -132,7 +132,8 @@ public class AlignedTVListTest {
     }
 
     tvList.putAlignedValues(
-        ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray, 
bitMaps, 0, 1000);
+        ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray, 
bitMaps, 0, 1000,
+        null);
     for (long i = 0; i < tvList.rowCount; i++) {
       Assert.assertEquals(tvList.rowCount - i, tvList.getTime((int) i));
       if (i % 100 == 0) {
@@ -164,7 +165,8 @@ public class AlignedTVListTest {
     }
 
     tvList.putAlignedValues(
-        ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray, 
bitMaps, 0, 1000);
+        ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray, 
bitMaps, 0, 1000,
+        null);
 
     AlignedTVList clonedTvList = tvList.clone();
     for (long i = 0; i < tvList.rowCount; i++) {
@@ -220,7 +222,7 @@ public class AlignedTVListTest {
     }
 
     tvList.putAlignedValues(
-        ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray, 
bitMaps, 0, 10);
+        ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorArray, 
bitMaps, 0, 10, null);
     Assert.assertEquals(tvList.memoryBinaryChunkSize[1], 720);
 
     tvList.delete(5, 15);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
index dd0d4662cc7..b0794d27f8f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java
@@ -156,4 +156,5 @@ public class TsTable {
   public int hashCode() {
     return Objects.hash(tableName);
   }
+
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
index ca4faedffe1..ab7ae9bf6d5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
 
 public enum TsTableColumnCategory {
   ID((byte) 0),
@@ -74,4 +75,17 @@ public enum TsTableColumnCategory {
         throw new IllegalArgumentException();
     }
   }
+
+  public ColumnType toTsFileColumnType() {
+    switch (this) {
+      case ID:
+        return ColumnType.ID;
+        case ATTRIBUTE:
+          return ColumnType.ATTRIBUTE;
+      case MEASUREMENT:
+        return ColumnType.MEASUREMENT;
+      default:
+        throw new IllegalArgumentException("Unsupported column type in TsFile: 
" + this);
+    }
+  }
 }

Reply via email to