This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch force_ci/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 07758a60f710db45a72553443e4c12a457152256
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Dec 9 15:00:00 2025 +0800

    Pipe: Modify the TableRawReq deserialization method to support direct 
conversion to TableStatement. (#16844)
    
    * Pipe: Modify the TableRawReq deserialization method to support direct 
conversion to TableStatement.
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * update
    
    * update
    
    * update
    
    * refactor: optimize TabletStatementConverter according to code review
    
    - Optimize times array copy: skip copy when lengths are equal, use 
System.arraycopy
    - Add warning logs when times array is null or too small
    - Ensure all arrays (values, times, bitMaps) are copied to rowSize length 
for immutability
    - Filter out null columns when converting Statement to Tablet
    - Rename idColumnIndices to tagColumnIndices
    - Add skipString method to avoid constructing temporary objects
    - Add comments explaining skipped fields in readMeasurement
    - Use direct buffer position increment instead of reading bytes for skipping
    - Ensure all column values are copied to ensure immutability
    
    * update
    
    * update
    
    (cherry picked from commit 13b0582dfb1a63a3f242b9545304d0d9fdede5cd)
---
 .../request/PipeTransferTabletBatchReqV2.java      |   7 +-
 .../request/PipeTransferTabletRawReq.java          | 110 +++-
 .../request/PipeTransferTabletRawReqV2.java        |  50 +-
 .../pipe/sink/util/TabletStatementConverter.java   | 476 ++++++++++++++++
 .../sink/util/sorter/InsertEventDataAdapter.java   | 127 +++++
 .../util/sorter/InsertTabletStatementAdapter.java  | 118 ++++
 ...EventSorter.java => PipeInsertEventSorter.java} |  94 +++-
 .../sorter/PipeTableModelTabletEventSorter.java    |  67 ++-
 .../sorter/PipeTreeModelTabletEventSorter.java     |  48 +-
 .../db/pipe/sink/util/sorter/TabletAdapter.java    | 113 ++++
 .../plan/statement/crud/InsertBaseStatement.java   |  10 +
 .../plan/statement/crud/InsertTabletStatement.java | 197 +++++++
 .../pipe/sink/PipeDataNodeThriftRequestTest.java   |   4 +-
 .../db/pipe/sink/PipeStatementEventSorterTest.java | 313 +++++++++++
 .../sink/util/TabletStatementConverterTest.java    | 607 +++++++++++++++++++++
 15 files changed, 2257 insertions(+), 84 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
index d9c3fabfae8..c136ffbe7d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.apache.tsfile.write.record.Tablet;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -247,11 +246,7 @@ public class PipeTransferTabletBatchReqV2 extends 
TPipeTransferReq {
 
     size = ReadWriteIOUtils.readInt(transferReq.body);
     for (int i = 0; i < size; ++i) {
-      batchReq.tabletReqs.add(
-          PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
-              Tablet.deserialize(transferReq.body),
-              ReadWriteIOUtils.readBool(transferReq.body),
-              ReadWriteIOUtils.readString(transferReq.body)));
+      
batchReq.tabletReqs.add(PipeTransferTabletRawReqV2.toTPipeTransferRawReq(transferReq.body));
     }
 
     batchReq.version = transferReq.version;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 7da44f297d1..af9b37edbf6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
 import org.apache.iotdb.commons.exception.MetadataException;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter;
 import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -43,10 +44,25 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTransferTabletRawReq.class);
 
-  protected transient Tablet tablet;
+  protected transient InsertTabletStatement statement;
+
   protected transient boolean isAligned;
+  protected transient Tablet tablet;
 
+  /**
+   * Get Tablet. If tablet is null, convert from statement.
+   *
+   * @return Tablet object
+   */
   public Tablet getTablet() {
+    if (tablet == null && statement != null) {
+      try {
+        tablet = statement.convertToTablet();
+      } catch (final MetadataException e) {
+        LOGGER.warn("Failed to convert statement to tablet.", e);
+        return null;
+      }
+    }
     return tablet;
   }
 
@@ -54,16 +70,29 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
     return isAligned;
   }
 
+  /**
+   * Construct Statement. If statement already exists, return it. Otherwise, 
convert from tablet.
+   *
+   * @return InsertTabletStatement
+   */
   public InsertTabletStatement constructStatement() {
+    if (statement != null) {
+      new 
PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
+      return statement;
+    }
+
+    // Sort and deduplicate tablet before converting
     new 
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
 
     try {
       if (isTabletEmpty(tablet)) {
         // Empty statement, will be filtered after construction
-        return new InsertTabletStatement();
+        statement = new InsertTabletStatement();
+        return statement;
       }
 
-      return new InsertTabletStatement(tablet, isAligned, null);
+      statement = new InsertTabletStatement(tablet, isAligned, null);
+      return statement;
     } catch (final MetadataException e) {
       LOGGER.warn("Generate Statement from tablet {} error.", tablet, e);
       return null;
@@ -107,8 +136,20 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
   public static PipeTransferTabletRawReq fromTPipeTransferReq(final 
TPipeTransferReq transferReq) {
     final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
 
-    tabletReq.tablet = Tablet.deserialize(transferReq.body);
-    tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body);
+    final ByteBuffer buffer = transferReq.body;
+    final int startPosition = buffer.position();
+    try {
+      // V1: no databaseName, readDatabaseName = false
+      final InsertTabletStatement insertTabletStatement =
+          
TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, false);
+      tabletReq.isAligned = insertTabletStatement.isAligned();
+      // devicePath is already set in deserializeStatementFromTabletFormat for 
V1 format
+      tabletReq.statement = insertTabletStatement;
+    } catch (final Exception e) {
+      buffer.position(startPosition);
+      tabletReq.tablet = Tablet.deserialize(buffer);
+      tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer);
+    }
 
     tabletReq.version = transferReq.version;
     tabletReq.type = transferReq.type;
@@ -118,18 +159,56 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
 
   /////////////////////////////// Air Gap ///////////////////////////////
 
-  public static byte[] toTPipeTransferBytes(final Tablet tablet, final boolean 
isAligned)
-      throws IOException {
+  /**
+   * Serialize to bytes. If tablet is null, convert from statement first.
+   *
+   * @return serialized bytes
+   * @throws IOException if serialization fails
+   */
+  public byte[] toTPipeTransferBytes() throws IOException {
+    Tablet tabletToSerialize = tablet;
+    boolean isAlignedToSerialize = isAligned;
+
+    // If tablet is null, convert from statement
+    if (tabletToSerialize == null && statement != null) {
+      try {
+        tabletToSerialize = statement.convertToTablet();
+        isAlignedToSerialize = statement.isAligned();
+      } catch (final MetadataException e) {
+        throw new IOException("Failed to convert statement to tablet for 
serialization", e);
+      }
+    }
+
+    if (tabletToSerialize == null) {
+      throw new IOException("Cannot serialize: both tablet and statement are 
null");
+    }
+
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       ReadWriteIOUtils.write(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 
outputStream);
       ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_RAW.getType(), 
outputStream);
-      tablet.serialize(outputStream);
-      ReadWriteIOUtils.write(isAligned, outputStream);
+      tabletToSerialize.serialize(outputStream);
+      ReadWriteIOUtils.write(isAlignedToSerialize, outputStream);
       return byteArrayOutputStream.toByteArray();
     }
   }
 
+  /**
+   * Static method for backward compatibility. Creates a temporary instance 
and serializes.
+   *
+   * @param tablet Tablet to serialize
+   * @param isAligned whether aligned
+   * @return serialized bytes
+   * @throws IOException if serialization fails
+   */
+  public static byte[] toTPipeTransferBytes(final Tablet tablet, final boolean 
isAligned)
+      throws IOException {
+    final PipeTransferTabletRawReq req = new PipeTransferTabletRawReq();
+    req.tablet = tablet;
+    req.isAligned = isAligned;
+    return req.toTPipeTransferBytes();
+  }
+
   /////////////////////////////// Object ///////////////////////////////
 
   @Override
@@ -141,7 +220,16 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
       return false;
     }
     final PipeTransferTabletRawReq that = (PipeTransferTabletRawReq) obj;
-    return Objects.equals(tablet, that.tablet)
+    // Compare statement if both have it, otherwise compare tablet
+    if (statement != null && that.statement != null) {
+      return Objects.equals(statement, that.statement)
+          && isAligned == that.isAligned
+          && version == that.version
+          && type == that.type
+          && Objects.equals(body, that.body);
+    }
+    // Fallback to tablet comparison
+    return Objects.equals(getTablet(), that.getTablet())
         && isAligned == that.isAligned
         && version == that.version
         && type == that.type
@@ -150,6 +238,6 @@ public class PipeTransferTabletRawReq extends 
TPipeTransferReq {
 
   @Override
   public int hashCode() {
-    return Objects.hash(tablet, isAligned, version, type, body);
+    return Objects.hash(getTablet(), isAligned, version, type, body);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
index 43d8501252c..3c5f420a317 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
 import org.apache.iotdb.commons.exception.MetadataException;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter;
 import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
 import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
@@ -52,6 +53,16 @@ public class PipeTransferTabletRawReqV2 extends 
PipeTransferTabletRawReq {
 
   @Override
   public InsertTabletStatement constructStatement() {
+    if (statement != null) {
+      if (Objects.isNull(dataBaseName)) {
+        new 
PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
+      } else {
+        new 
PipeTableModelTabletEventSorter(statement).sortByTimestampIfNecessary();
+      }
+
+      return statement;
+    }
+
     if (Objects.isNull(dataBaseName)) {
       new 
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
     } else {
@@ -86,6 +97,16 @@ public class PipeTransferTabletRawReqV2 extends 
PipeTransferTabletRawReq {
     return tabletReq;
   }
 
+  public static PipeTransferTabletRawReqV2 toTPipeTransferRawReq(final 
ByteBuffer buffer) {
+    final PipeTransferTabletRawReqV2 tabletReq = new 
PipeTransferTabletRawReqV2();
+
+    tabletReq.deserializeTPipeTransferRawReq(buffer);
+    tabletReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+    tabletReq.type = PipeRequestType.TRANSFER_TABLET_RAW_V2.getType();
+
+    return tabletReq;
+  }
+
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeTransferTabletRawReqV2 toTPipeTransferReq(
@@ -114,13 +135,11 @@ public class PipeTransferTabletRawReqV2 extends 
PipeTransferTabletRawReq {
       final TPipeTransferReq transferReq) {
     final PipeTransferTabletRawReqV2 tabletReq = new 
PipeTransferTabletRawReqV2();
 
-    tabletReq.tablet = Tablet.deserialize(transferReq.body);
-    tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body);
-    tabletReq.dataBaseName = ReadWriteIOUtils.readString(transferReq.body);
+    tabletReq.deserializeTPipeTransferRawReq(transferReq.body);
+    tabletReq.body = transferReq.body;
 
     tabletReq.version = transferReq.version;
     tabletReq.type = transferReq.type;
-    tabletReq.body = transferReq.body;
 
     return tabletReq;
   }
@@ -161,4 +180,27 @@ public class PipeTransferTabletRawReqV2 extends 
PipeTransferTabletRawReq {
   public int hashCode() {
     return Objects.hash(super.hashCode(), dataBaseName);
   }
+
+  /////////////////////////////// Util ///////////////////////////////
+
+  public void deserializeTPipeTransferRawReq(final ByteBuffer buffer) {
+    final int startPosition = buffer.position();
+    try {
+      // V2: read databaseName, readDatabaseName = true
+      final InsertTabletStatement insertTabletStatement =
+          
TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, true);
+      this.isAligned = insertTabletStatement.isAligned();
+      // databaseName is already set in deserializeStatementFromTabletFormat 
when
+      // readDatabaseName=true
+      this.dataBaseName = insertTabletStatement.getDatabaseName().orElse(null);
+      this.statement = insertTabletStatement;
+    } catch (final Exception e) {
+      // If Statement deserialization fails, fallback to Tablet format
+      // Reset buffer position for Tablet deserialization
+      buffer.position(startPosition);
+      this.tablet = Tablet.deserialize(buffer);
+      this.isAligned = ReadWriteIOUtils.readBool(buffer);
+      this.dataBaseName = ReadWriteIOUtils.readString(buffer);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
new file mode 100644
index 00000000000..c5b9ebed4d5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.util;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Utility class for converting between InsertTabletStatement and Tablet 
format ByteBuffer. This
+ * avoids creating intermediate Tablet objects and directly converts between 
formats with only the
+ * fields needed.
+ */
+public class TabletStatementConverter {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TabletStatementConverter.class);
+
+  // Memory calculation constants - extracted from RamUsageEstimator for 
better performance
+  private static final long NUM_BYTES_ARRAY_HEADER =
+      org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
+  private static final long NUM_BYTES_OBJECT_REF =
+      org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+  private static final long NUM_BYTES_OBJECT_HEADER =
+      org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
+  private static final long SIZE_OF_ARRAYLIST =
+      
org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance(java.util.ArrayList.class);
+  private static final long SIZE_OF_BITMAP =
+      org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance(
+          org.apache.tsfile.utils.BitMap.class);
+
+  private TabletStatementConverter() {
+    // Utility class, no instantiation
+  }
+
+  /**
+   * Deserialize InsertTabletStatement from Tablet format ByteBuffer.
+   *
+   * @param byteBuffer ByteBuffer containing serialized data
+   * @param readDatabaseName whether to read databaseName from buffer (for V2 
format)
+   * @return InsertTabletStatement with all fields set, including devicePath
+   */
+  public static InsertTabletStatement deserializeStatementFromTabletFormat(
+      final ByteBuffer byteBuffer, final boolean readDatabaseName) throws 
IllegalPathException {
+    final InsertTabletStatement statement = new InsertTabletStatement();
+
+    // Calculate memory size during deserialization, use INSTANCE_SIZE constant
+    long memorySize = InsertTabletStatement.getInstanceSize();
+
+    final String insertTargetName = ReadWriteIOUtils.readString(byteBuffer);
+
+    final int rowSize = ReadWriteIOUtils.readInt(byteBuffer);
+
+    // deserialize schemas
+    final int schemaSize =
+        BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer))
+            ? ReadWriteIOUtils.readInt(byteBuffer)
+            : 0;
+    final String[] measurement = new String[schemaSize];
+    final TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[schemaSize];
+    final TSDataType[] dataTypes = new TSDataType[schemaSize];
+
+    // Calculate memory for arrays headers and references during 
deserialization
+    // measurements array: array header + object references
+    long measurementMemorySize =
+        org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+            NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize);
+
+    // dataTypes array: shallow size (array header + references)
+    long dataTypesMemorySize =
+        org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+            NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize);
+
+    // columnCategories array: shallow size (array header + references)
+    long columnCategoriesMemorySize =
+        org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+            NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize);
+
+    // tagColumnIndices (TAG columns): ArrayList base + array header
+    long tagColumnIndicesSize = SIZE_OF_ARRAYLIST;
+    tagColumnIndicesSize += NUM_BYTES_ARRAY_HEADER;
+
+    // Deserialize and calculate memory in the same loop
+    for (int i = 0; i < schemaSize; i++) {
+      final boolean hasSchema = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+      if (hasSchema) {
+        final Pair<String, TSDataType> pair = readMeasurement(byteBuffer);
+        measurement[i] = pair.getLeft();
+        dataTypes[i] = pair.getRight();
+        columnCategories[i] =
+            TsTableColumnCategory.fromTsFileColumnCategory(
+                ColumnCategory.values()[byteBuffer.get()]);
+
+        // Calculate memory for each measurement string
+        if (measurement[i] != null) {
+          measurementMemorySize += 
org.apache.tsfile.utils.RamUsageEstimator.sizeOf(measurement[i]);
+        }
+
+        // Calculate memory for TAG column indices
+        if (columnCategories[i] != null && 
columnCategories[i].equals(TsTableColumnCategory.TAG)) {
+          tagColumnIndicesSize +=
+              org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+                      Integer.BYTES + NUM_BYTES_OBJECT_HEADER)
+                  + NUM_BYTES_OBJECT_REF;
+        }
+      }
+    }
+
+    // Add all calculated memory to total
+    memorySize += measurementMemorySize;
+    memorySize += dataTypesMemorySize;
+
+    // deserialize times and calculate memory during deserialization
+    final long[] times = new long[rowSize];
+    // Calculate memory: array header + long size * rowSize
+    final long timesMemorySize =
+        org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+            NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * rowSize);
+
+    final boolean isTimesNotNull = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+    if (isTimesNotNull) {
+      for (int i = 0; i < rowSize; i++) {
+        times[i] = ReadWriteIOUtils.readLong(byteBuffer);
+      }
+    }
+
+    // Add times memory to total
+    memorySize += timesMemorySize;
+
+    // deserialize bitmaps and calculate memory during deserialization
+    final BitMap[] bitMaps;
+    final long bitMapsMemorySize;
+
+    final boolean isBitMapsNotNull = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+    if (isBitMapsNotNull) {
+      // Use the method that returns both BitMap array and memory size
+      final Pair<BitMap[], Long> bitMapsAndMemory =
+          readBitMapsFromBufferWithMemory(byteBuffer, schemaSize);
+      bitMaps = bitMapsAndMemory.getLeft();
+      bitMapsMemorySize = bitMapsAndMemory.getRight();
+    } else {
+      // Calculate memory for empty BitMap array: array header + references
+      bitMaps = new BitMap[schemaSize];
+      bitMapsMemorySize =
+          org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+              NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize);
+    }
+
+    // Add bitMaps memory to total
+    memorySize += bitMapsMemorySize;
+
+    // Deserialize values and calculate memory during deserialization
+    final Object[] values;
+    final long valuesMemorySize;
+
+    final boolean isValuesNotNull = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+    if (isValuesNotNull) {
+      // Use the method that returns both values array and memory size
+      final Pair<Object[], Long> valuesAndMemory =
+          readValuesFromBufferWithMemory(byteBuffer, dataTypes, schemaSize, 
rowSize);
+      values = valuesAndMemory.getLeft();
+      valuesMemorySize = valuesAndMemory.getRight();
+    } else {
+      // Calculate memory for empty values array: array header + references
+      values = new Object[schemaSize];
+      valuesMemorySize =
+          org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+              NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize);
+    }
+
+    // Add values memory to total
+    memorySize += valuesMemorySize;
+
+    final boolean isAligned = ReadWriteIOUtils.readBoolean(byteBuffer);
+
+    statement.setMeasurements(measurement);
+    statement.setTimes(times);
+    statement.setBitMaps(bitMaps);
+    statement.setDataTypes(dataTypes);
+    statement.setColumns(values);
+    statement.setRowCount(rowSize);
+    statement.setAligned(isAligned);
+
+    // Read databaseName if requested (V2 format)
+    if (readDatabaseName) {
+      final String databaseName = ReadWriteIOUtils.readString(byteBuffer);
+      if (databaseName != null) {
+        statement.setDatabaseName(databaseName);
+        statement.setWriteToTable(true);
+        // For table model, insertTargetName is table name, convert to 
lowercase
+        statement.setDevicePath(new 
PartialPath(insertTargetName.toLowerCase(), false));
+        // Calculate memory for databaseName
+        memorySize += 
org.apache.tsfile.utils.RamUsageEstimator.sizeOf(databaseName);
+
+        statement.setColumnCategories(columnCategories);
+
+        memorySize += columnCategoriesMemorySize;
+        memorySize += tagColumnIndicesSize;
+      } else {
+        // For tree model, use DataNodeDevicePathCache
+        statement.setDevicePath(
+            
DataNodeDevicePathCache.getInstance().getPartialPath(insertTargetName));
+        statement.setColumnCategories(null);
+      }
+    } else {
+      // V1 format: no databaseName in buffer, always use 
DataNodeDevicePathCache
+      statement.setDevicePath(
+          
DataNodeDevicePathCache.getInstance().getPartialPath(insertTargetName));
+      statement.setColumnCategories(null);
+    }
+
+    // Calculate memory for devicePath
+    memorySize += 
InsertNodeMemoryEstimator.sizeOfPartialPath(statement.getDevicePath());
+
+    // Set the pre-calculated memory size to avoid recalculation
+    statement.setRamBytesUsed(memorySize);
+
+    return statement;
+  }
+
+  /**
+   * Deserialize InsertTabletStatement from Tablet format ByteBuffer (V1 
format, no databaseName).
+   *
+   * @param byteBuffer ByteBuffer containing serialized data
+   * @return InsertTabletStatement with devicePath set using 
DataNodeDevicePathCache
+   */
+  public static InsertTabletStatement deserializeStatementFromTabletFormat(
+      final ByteBuffer byteBuffer) throws IllegalPathException {
+    return deserializeStatementFromTabletFormat(byteBuffer, false);
+  }
+
+  /**
+   * Skip a string in ByteBuffer without reading it. This is more efficient 
than reading and
+   * discarding the string.
+   *
+   * @param buffer ByteBuffer to skip string from
+   */
+  private static void skipString(final ByteBuffer buffer) {
+    final int size = ReadWriteIOUtils.readInt(buffer);
+    if (size > 0) {
+      buffer.position(buffer.position() + size);
+    }
+  }
+
+  /**
+   * Read measurement name and data type from buffer, skipping other 
measurement schema fields
+   * (encoding, compression, and tags/attributes) that are not needed for 
InsertTabletStatement.
+   *
+   * @param buffer ByteBuffer containing serialized measurement schema
+   * @return Pair of measurement name and data type
+   */
+  private static Pair<String, TSDataType> readMeasurement(final ByteBuffer 
buffer) {
+    // Read measurement name and data type
+    final Pair<String, TSDataType> pair =
+        new Pair<>(ReadWriteIOUtils.readString(buffer), 
TSDataType.deserializeFrom(buffer));
+
+    // Skip encoding type (byte) and compression type (byte) - 2 bytes total
+    buffer.position(buffer.position() + 2);
+
+    // Skip props map (Map<String, String>)
+    final int size = ReadWriteIOUtils.readInt(buffer);
+    if (size > 0) {
+      for (int i = 0; i < size; i++) {
+        // Skip key (String) and value (String) without constructing temporary 
objects
+        skipString(buffer);
+        skipString(buffer);
+      }
+    }
+
+    return pair;
+  }
+
+  /**
+   * Deserialize bitmaps and calculate memory size during deserialization. 
Returns a Pair of BitMap
+   * array and the calculated memory size.
+   */
+  private static Pair<BitMap[], Long> readBitMapsFromBufferWithMemory(
+      final ByteBuffer byteBuffer, final int columns) {
+    final BitMap[] bitMaps = new BitMap[columns];
+
+    // Calculate memory: array header + object references
+    long memorySize =
+        org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+            NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns);
+
+    for (int i = 0; i < columns; i++) {
+      final boolean hasBitMap = 
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+      if (hasBitMap) {
+        final int size = ReadWriteIOUtils.readInt(byteBuffer);
+        final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer);
+        final byte[] byteArray = valueBinary.getValues();
+        bitMaps[i] = new BitMap(size, byteArray);
+
+        // Calculate memory for this BitMap: BitMap object + byte array
+        // BitMap shallow size + byte array (array header + array length)
+        memorySize +=
+            SIZE_OF_BITMAP
+                + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+                    NUM_BYTES_ARRAY_HEADER + byteArray.length);
+      }
+    }
+
+    return new Pair<>(bitMaps, memorySize);
+  }
+
+  /**
+   * Deserializes the per-column value arrays from {@code byteBuffer} and estimates their memory
+   * size during deserialization. Returns a Pair of the values array and the calculated size.
+   *
+   * <p>Every column starts with a one-byte presence flag. A column whose type is {@code null} is
+   * left as {@code null} in the result and adds nothing to the estimate. For every other column
+   * an array of {@code rowSize} elements is always allocated; it is filled from the buffer only
+   * when the presence flag is set, otherwise it keeps default values ({@link Binary#EMPTY_VALUE}
+   * for TEXT, STRING, BLOB and OBJECT columns).
+   *
+   * @param byteBuffer data values
+   * @param types data types, one per column; entries may be {@code null}
+   * @param columns column number
+   * @param rowSize row number
+   * @return Pair of values array and memory size
+   * @throws UnSupportedDataTypeException if a column type is not handled by this reader
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  private static Pair<Object[], Long> readValuesFromBufferWithMemory(
+      final ByteBuffer byteBuffer, final TSDataType[] types, final int columns, final int rowSize) {
+    final Object[] values = new Object[columns];
+
+    // Calculate memory: array header + object references
+    long memorySize =
+        org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+            NUM_BYTES_ARRAY_HEADER + (long) NUM_BYTES_OBJECT_REF * columns);
+
+    for (int i = 0; i < columns; i++) {
+      final boolean isValueColumnsNotNull =
+          BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+      if (types[i] == null) {
+        // A column without a type cannot be decoded, so it stays null. The check must not depend
+        // on the presence flag: switching on a null type throws NullPointerException.
+        continue;
+      }
+
+      switch (types[i]) {
+        case BOOLEAN:
+          final boolean[] boolValues = new boolean[rowSize];
+          if (isValueColumnsNotNull) {
+            for (int index = 0; index < rowSize; index++) {
+              boolValues[index] = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+            }
+          }
+          values[i] = boolValues;
+          // Calculate memory for boolean array: array header + 1 byte per element (aligned)
+          memorySize +=
+              org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+                  NUM_BYTES_ARRAY_HEADER + (long) rowSize);
+          break;
+        case INT32:
+        case DATE:
+          final int[] intValues = new int[rowSize];
+          if (isValueColumnsNotNull) {
+            for (int index = 0; index < rowSize; index++) {
+              intValues[index] = ReadWriteIOUtils.readInt(byteBuffer);
+            }
+          }
+          values[i] = intValues;
+          // Calculate memory for int array: array header + 4 bytes per element (aligned)
+          memorySize +=
+              org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+                  NUM_BYTES_ARRAY_HEADER + (long) Integer.BYTES * rowSize);
+          break;
+        case INT64:
+        case TIMESTAMP:
+          final long[] longValues = new long[rowSize];
+          if (isValueColumnsNotNull) {
+            for (int index = 0; index < rowSize; index++) {
+              longValues[index] = ReadWriteIOUtils.readLong(byteBuffer);
+            }
+          }
+          values[i] = longValues;
+          // Calculate memory for long array: array header + 8 bytes per element (aligned)
+          memorySize +=
+              org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+                  NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * rowSize);
+          break;
+        case FLOAT:
+          final float[] floatValues = new float[rowSize];
+          if (isValueColumnsNotNull) {
+            for (int index = 0; index < rowSize; index++) {
+              floatValues[index] = ReadWriteIOUtils.readFloat(byteBuffer);
+            }
+          }
+          values[i] = floatValues;
+          // Calculate memory for float array: array header + 4 bytes per element (aligned)
+          memorySize +=
+              org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+                  NUM_BYTES_ARRAY_HEADER + (long) Float.BYTES * rowSize);
+          break;
+        case DOUBLE:
+          final double[] doubleValues = new double[rowSize];
+          if (isValueColumnsNotNull) {
+            for (int index = 0; index < rowSize; index++) {
+              doubleValues[index] = ReadWriteIOUtils.readDouble(byteBuffer);
+            }
+          }
+          values[i] = doubleValues;
+          // Calculate memory for double array: array header + 8 bytes per element (aligned)
+          memorySize +=
+              org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+                  NUM_BYTES_ARRAY_HEADER + (long) Double.BYTES * rowSize);
+          break;
+        case TEXT:
+        case STRING:
+        case BLOB:
+        case OBJECT:
+          // Handle object array type: Binary[] is an array of objects
+          final Binary[] binaryValues = new Binary[rowSize];
+          // Calculate memory for Binary array: array header + object references
+          long binaryArrayMemory =
+              org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+                  NUM_BYTES_ARRAY_HEADER + (long) NUM_BYTES_OBJECT_REF * rowSize);
+
+          if (isValueColumnsNotNull) {
+            for (int index = 0; index < rowSize; index++) {
+              // Each row has its own one-byte flag; rows without a value use EMPTY_VALUE.
+              final boolean isNotNull =
+                  BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
+              if (isNotNull) {
+                binaryValues[index] = ReadWriteIOUtils.readBinary(byteBuffer);
+                // Calculate memory for each Binary object during deserialization
+                binaryArrayMemory += binaryValues[index].ramBytesUsed();
+              } else {
+                binaryValues[index] = Binary.EMPTY_VALUE;
+                // EMPTY_VALUE also has memory cost
+                binaryArrayMemory += Binary.EMPTY_VALUE.ramBytesUsed();
+              }
+            }
+          } else {
+            Arrays.fill(binaryValues, Binary.EMPTY_VALUE);
+            // Calculate memory for all EMPTY_VALUE
+            binaryArrayMemory += (long) rowSize * Binary.EMPTY_VALUE.ramBytesUsed();
+          }
+          values[i] = binaryValues;
+          // Add calculated Binary array memory to total
+          memorySize += binaryArrayMemory;
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("data type %s is not supported when convert data at client", types[i]));
+      }
+    }
+
+    return new Pair<>(values, memorySize);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertEventDataAdapter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertEventDataAdapter.java
new file mode 100644
index 00000000000..62111cb4129
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertEventDataAdapter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.util.sorter;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.BitMap;
+
+/**
+ * Adapter interface to encapsulate common operations needed for sorting and deduplication. This
+ * interface allows the sorter to work with both Tablet and InsertTabletStatement.
+ *
+ * <p>The getters expose the underlying column data to the sorter, and the setters are how the
+ * sorter writes the reordered and deduplicated data back.
+ */
+public interface InsertEventDataAdapter {
+
+  /**
+   * Get the number of columns. The sorter iterates column indexes from 0 (inclusive) up to this
+   * count (exclusive).
+   *
+   * @return number of columns
+   */
+  int getColumnCount();
+
+  /**
+   * Get data type for a specific column.
+   *
+   * @param columnIndex column index
+   * @return data type of the column, or null if no type is known for it (the sorter skips such
+   *     columns)
+   */
+  TSDataType getDataType(int columnIndex);
+
+  /**
+   * Get bit maps for null values.
+   *
+   * @return array of bit maps, may be null; the sorter also tolerates null entries in the array
+   */
+  BitMap[] getBitMaps();
+
+  /**
+   * Set bit maps for null values. The sorter calls this after replacing entries of the array it
+   * obtained from {@link #getBitMaps()}.
+   *
+   * @param bitMaps array of bit maps
+   */
+  void setBitMaps(BitMap[] bitMaps);
+
+  /**
+   * Get value arrays for all columns. Each entry is the value array of one column; its runtime
+   * type depends on the column's data type (see {@link #isDateStoredAsLocalDate(int)} for DATE).
+   *
+   * @return array of value arrays (Object[])
+   */
+  Object[] getValues();
+
+  /**
+   * Set value array for a specific column.
+   *
+   * @param columnIndex column index
+   * @param value value array
+   */
+  void setValue(int columnIndex, Object value);
+
+  /**
+   * Get timestamps array.
+   *
+   * <p>NOTE(review): the sorters only process the first {@link #getRowSize()} entries, which
+   * suggests the array may be longer than the row count - confirm against implementations.
+   *
+   * @return array of timestamps
+   */
+  long[] getTimestamps();
+
+  /**
+   * Set timestamps array.
+   *
+   * @param timestamps array of timestamps
+   */
+  void setTimestamps(long[] timestamps);
+
+  /**
+   * Get row size/count.
+   *
+   * @return number of rows
+   */
+  int getRowSize();
+
+  /**
+   * Set row size/count. The sorters call this with the deduplicated row count once they have
+   * finished reordering.
+   *
+   * @param rowSize number of rows
+   */
+  void setRowSize(int rowSize);
+
+  /**
+   * Get timestamp at a specific row index.
+   *
+   * @param rowIndex row index
+   * @return timestamp value
+   */
+  long getTimestamp(int rowIndex);
+
+  /**
+   * Get device ID at a specific row index (for table model).
+   *
+   * @param rowIndex row index
+   * @return device ID
+   */
+  IDeviceID getDeviceID(int rowIndex);
+
+  /**
+   * Check if the DATE type column value is stored as LocalDate[] (Tablet) or int[] (Statement).
+   * The sorter uses the answer to decide which array type to cast a DATE column to.
+   *
+   * @param columnIndex column index
+   * @return true if DATE type is stored as LocalDate[], false if stored as int[]
+   */
+  boolean isDateStoredAsLocalDate(int columnIndex);
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertTabletStatementAdapter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertTabletStatementAdapter.java
new file mode 100644
index 00000000000..30f74a22965
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/InsertTabletStatementAdapter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.util.sorter;
+
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.BitMap;
+
+/**
+ * {@link InsertEventDataAdapter} backed by an {@link InsertTabletStatement}. Every accessor
+ * delegates to the wrapped statement, and every setter forwards to it.
+ */
+public class InsertTabletStatementAdapter implements InsertEventDataAdapter {
+
+  private final InsertTabletStatement statement;
+
+  public InsertTabletStatementAdapter(final InsertTabletStatement statement) {
+    this.statement = statement;
+  }
+
+  /** Returns the length of the statement's column array, or 0 when the statement has none. */
+  @Override
+  public int getColumnCount() {
+    final Object[] columnArrays = statement.getColumns();
+    if (columnArrays == null) {
+      return 0;
+    }
+    return columnArrays.length;
+  }
+
+  /** Returns the column's data type, or null when no type is available at that index. */
+  @Override
+  public TSDataType getDataType(int columnIndex) {
+    final TSDataType[] types = statement.getDataTypes();
+    final boolean typeAvailable = types != null && columnIndex < types.length;
+    return typeAvailable ? types[columnIndex] : null;
+  }
+
+  @Override
+  public BitMap[] getBitMaps() {
+    return statement.getBitMaps();
+  }
+
+  @Override
+  public void setBitMaps(BitMap[] bitMaps) {
+    statement.setBitMaps(bitMaps);
+  }
+
+  @Override
+  public Object[] getValues() {
+    return statement.getColumns();
+  }
+
+  /**
+   * Stores {@code value} into the array returned by the statement's {@code getColumns()}. The
+   * call is a no-op when that array is null or {@code columnIndex} is past its end.
+   */
+  @Override
+  public void setValue(int columnIndex, Object value) {
+    final Object[] columnArrays = statement.getColumns();
+    if (columnArrays == null || columnIndex >= columnArrays.length) {
+      return;
+    }
+    columnArrays[columnIndex] = value;
+  }
+
+  @Override
+  public long[] getTimestamps() {
+    return statement.getTimes();
+  }
+
+  @Override
+  public void setTimestamps(long[] timestamps) {
+    statement.setTimes(timestamps);
+  }
+
+  @Override
+  public int getRowSize() {
+    return statement.getRowCount();
+  }
+
+  @Override
+  public void setRowSize(int rowSize) {
+    statement.setRowCount(rowSize);
+  }
+
+  /** Returns the timestamp at {@code rowIndex}, or 0 when the times array is null or too short. */
+  @Override
+  public long getTimestamp(int rowIndex) {
+    final long[] timestamps = statement.getTimes();
+    if (timestamps == null || rowIndex >= timestamps.length) {
+      return 0L;
+    }
+    return timestamps[rowIndex];
+  }
+
+  @Override
+  public IDeviceID getDeviceID(int rowIndex) {
+    return statement.getTableDeviceID(rowIndex);
+  }
+
+  /** Always false: InsertTabletStatement stores DATE as int[]. */
+  @Override
+  public boolean isDateStoredAsLocalDate(int columnIndex) {
+    return false;
+  }
+
+  /** Returns the wrapped statement. */
+  public InsertTabletStatement getStatement() {
+    return statement;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java
similarity index 65%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java
index 6540bf9855d..46a3fc6df94 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTabletEventSorter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java
@@ -19,19 +19,20 @@
 
 package org.apache.iotdb.db.pipe.sink.util.sorter;
 
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.time.LocalDate;
 import java.util.Objects;
 
-public class PipeTabletEventSorter {
+public class PipeInsertEventSorter {
 
-  protected final Tablet tablet;
+  protected final InsertEventDataAdapter dataAdapter;
 
   protected Integer[] index;
   protected boolean isSorted = true;
@@ -39,8 +40,31 @@ public class PipeTabletEventSorter {
   protected int[] deDuplicatedIndex;
   protected int deDuplicatedSize;
 
-  public PipeTabletEventSorter(final Tablet tablet) {
-    this.tablet = tablet;
+  /**
+   * Constructor for Tablet.
+   *
+   * @param tablet the tablet to sort
+   */
+  public PipeInsertEventSorter(final Tablet tablet) {
+    this.dataAdapter = new TabletAdapter(tablet);
+  }
+
+  /**
+   * Constructor for InsertTabletStatement.
+   *
+   * @param statement the insert tablet statement to sort
+   */
+  public PipeInsertEventSorter(final InsertTabletStatement statement) {
+    this.dataAdapter = new InsertTabletStatementAdapter(statement);
+  }
+
+  /**
+   * Constructor with adapter (for internal use or advanced scenarios).
+   *
+   * @param adapter the data adapter
+   */
+  protected PipeInsertEventSorter(final InsertEventDataAdapter adapter) {
+    this.dataAdapter = adapter;
   }
 
   // Input:
@@ -54,35 +78,42 @@ public class PipeTabletEventSorter {
   // (Used index: [2(3), 4(0)])
   // Col: [6, 1]
   protected void sortAndMayDeduplicateValuesAndBitMaps() {
-    int columnIndex = 0;
-    for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) {
-      final IMeasurementSchema schema = tablet.getSchemas().get(i);
-      if (schema != null) {
+    final int columnCount = dataAdapter.getColumnCount();
+    BitMap[] bitMaps = dataAdapter.getBitMaps();
+    boolean bitMapsModified = false;
+
+    for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
+      final TSDataType dataType = dataAdapter.getDataType(columnIndex);
+      if (dataType != null) {
         BitMap deDuplicatedBitMap = null;
         BitMap originalBitMap = null;
-        if (tablet.getBitMaps() != null && tablet.getBitMaps()[columnIndex] != 
null) {
-          originalBitMap = tablet.getBitMaps()[columnIndex];
+        if (bitMaps != null && columnIndex < bitMaps.length && 
bitMaps[columnIndex] != null) {
+          originalBitMap = bitMaps[columnIndex];
           deDuplicatedBitMap = new BitMap(originalBitMap.getSize());
         }
 
-        tablet.getValues()[columnIndex] =
+        final Object[] values = dataAdapter.getValues();
+        final Object reorderedValue =
             reorderValueListAndBitMap(
-                tablet.getValues()[columnIndex],
-                schema.getType(),
-                originalBitMap,
-                deDuplicatedBitMap);
+                values[columnIndex], dataType, columnIndex, originalBitMap, 
deDuplicatedBitMap);
+        dataAdapter.setValue(columnIndex, reorderedValue);
 
-        if (tablet.getBitMaps() != null && tablet.getBitMaps()[columnIndex] != 
null) {
-          tablet.getBitMaps()[columnIndex] = deDuplicatedBitMap;
+        if (bitMaps != null && columnIndex < bitMaps.length && 
bitMaps[columnIndex] != null) {
+          bitMaps[columnIndex] = deDuplicatedBitMap;
+          bitMapsModified = true;
         }
-        columnIndex++;
       }
     }
+
+    if (bitMapsModified) {
+      dataAdapter.setBitMaps(bitMaps);
+    }
   }
 
   protected Object reorderValueListAndBitMap(
       final Object valueList,
       final TSDataType dataType,
+      final int columnIndex,
       final BitMap originalBitMap,
       final BitMap deDuplicatedBitMap) {
     // Older version's sender may contain null values, we need to cover this 
case
@@ -107,13 +138,26 @@ public class PipeTabletEventSorter {
         }
         return deDuplicatedIntValues;
       case DATE:
-        final LocalDate[] dateValues = (LocalDate[]) valueList;
-        final LocalDate[] deDuplicatedDateValues = new 
LocalDate[dateValues.length];
-        for (int i = 0; i < deDuplicatedSize; i++) {
-          deDuplicatedDateValues[i] =
-              dateValues[getLastNonnullIndex(i, originalBitMap, 
deDuplicatedBitMap)];
+        // DATE type: Tablet uses LocalDate[], InsertTabletStatement uses int[]
+        if (dataAdapter.isDateStoredAsLocalDate(columnIndex)) {
+          // Tablet: LocalDate[]
+          final LocalDate[] dateValues = (LocalDate[]) valueList;
+          final LocalDate[] deDuplicatedDateValues = new 
LocalDate[dateValues.length];
+          for (int i = 0; i < deDuplicatedSize; i++) {
+            deDuplicatedDateValues[i] =
+                dateValues[getLastNonnullIndex(i, originalBitMap, 
deDuplicatedBitMap)];
+          }
+          return deDuplicatedDateValues;
+        } else {
+          // InsertTabletStatement: int[]
+          final int[] intDateValues = (int[]) valueList;
+          final int[] deDuplicatedIntDateValues = new 
int[intDateValues.length];
+          for (int i = 0; i < deDuplicatedSize; i++) {
+            deDuplicatedIntDateValues[i] =
+                intDateValues[getLastNonnullIndex(i, originalBitMap, 
deDuplicatedBitMap)];
+          }
+          return deDuplicatedIntDateValues;
         }
-        return deDuplicatedDateValues;
       case INT64:
       case TIMESTAMP:
         final long[] longValues = (long[]) valueList;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTableModelTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTableModelTabletEventSorter.java
index 5735b51c6d0..ba034cae3a3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTableModelTabletEventSorter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTableModelTabletEventSorter.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.sink.util.sorter;
 
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.utils.Pair;
@@ -31,14 +33,29 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class PipeTableModelTabletEventSorter extends PipeTabletEventSorter {
+public class PipeTableModelTabletEventSorter extends PipeInsertEventSorter {
   private int initIndexSize;
 
+  /**
+   * Constructor for Tablet.
+   *
+   * @param tablet the tablet to sort
+   */
   public PipeTableModelTabletEventSorter(final Tablet tablet) {
     super(tablet);
     deDuplicatedSize = tablet == null ? 0 : tablet.getRowSize();
   }
 
+  /**
+   * Constructor for InsertTabletStatement.
+   *
+   * @param statement the insert tablet statement to sort
+   */
+  public PipeTableModelTabletEventSorter(final InsertTabletStatement 
statement) {
+    super(statement);
+    deDuplicatedSize = statement == null ? 0 : statement.getRowCount();
+  }
+
   /**
    * For the sorting and deduplication needs of the table model tablet, it is 
done according to the
    * {@link IDeviceID}. For sorting, it is necessary to sort the {@link 
IDeviceID} first, and then
@@ -46,18 +63,19 @@ public class PipeTableModelTabletEventSorter extends 
PipeTabletEventSorter {
    * the same timestamp in different {@link IDeviceID} will not be processed.
    */
   public void sortAndDeduplicateByDevIdTimestamp() {
-    if (tablet == null || tablet.getRowSize() < 1) {
+    if (dataAdapter == null || dataAdapter.getRowSize() < 1) {
       return;
     }
 
     final HashMap<IDeviceID, List<Pair<Integer, Integer>>> deviceIDToIndexMap 
= new HashMap<>();
-    final long[] timestamps = tablet.getTimestamps();
+    final long[] timestamps = dataAdapter.getTimestamps();
+    final int rowSize = dataAdapter.getRowSize();
 
-    IDeviceID lastDevice = tablet.getDeviceID(0);
-    long previousTimestamp = tablet.getTimestamp(0);
+    IDeviceID lastDevice = dataAdapter.getDeviceID(0);
+    long previousTimestamp = dataAdapter.getTimestamp(0);
     int lasIndex = 0;
-    for (int i = 1, size = tablet.getRowSize(); i < size; ++i) {
-      final IDeviceID deviceID = tablet.getDeviceID(i);
+    for (int i = 1; i < rowSize; ++i) {
+      final IDeviceID deviceID = dataAdapter.getDeviceID(i);
       final long currentTimestamp = timestamps[i];
       final int deviceComparison = deviceID.compareTo(lastDevice);
       if (deviceComparison == 0) {
@@ -92,7 +110,7 @@ public class PipeTableModelTabletEventSorter extends 
PipeTabletEventSorter {
     if (!list.isEmpty()) {
       isSorted = false;
     }
-    list.add(new Pair<>(lasIndex, tablet.getRowSize()));
+    list.add(new Pair<>(lasIndex, rowSize));
 
     if (isSorted && isDeDuplicated) {
       return;
@@ -100,8 +118,8 @@ public class PipeTableModelTabletEventSorter extends 
PipeTabletEventSorter {
 
     initIndexSize = 0;
     deDuplicatedSize = 0;
-    index = new Integer[tablet.getRowSize()];
-    deDuplicatedIndex = new int[tablet.getRowSize()];
+    index = new Integer[rowSize];
+    deDuplicatedIndex = new int[rowSize];
     deviceIDToIndexMap.entrySet().stream()
         .sorted(Map.Entry.comparingByKey())
         .forEach(
@@ -129,19 +147,22 @@ public class PipeTableModelTabletEventSorter extends 
PipeTabletEventSorter {
   }
 
   private void sortAndDeduplicateValuesAndBitMapsWithTimestamp() {
-    tablet.setTimestamps(
+    // TIMESTAMP is not a DATE type, so columnIndex is not relevant here, use 
-1
+    dataAdapter.setTimestamps(
         (long[])
-            reorderValueListAndBitMap(tablet.getTimestamps(), 
TSDataType.TIMESTAMP, null, null));
+            reorderValueListAndBitMap(
+                dataAdapter.getTimestamps(), TSDataType.TIMESTAMP, -1, null, 
null));
     sortAndMayDeduplicateValuesAndBitMaps();
-    tablet.setRowSize(deDuplicatedSize);
+    dataAdapter.setRowSize(deDuplicatedSize);
   }
 
   private void sortTimestamps(final int startIndex, final int endIndex) {
-    Arrays.sort(this.index, startIndex, endIndex, 
Comparator.comparingLong(tablet::getTimestamp));
+    Arrays.sort(
+        this.index, startIndex, endIndex, 
Comparator.comparingLong(dataAdapter::getTimestamp));
   }
 
   private void deDuplicateTimestamps(final int startIndex, final int endIndex) 
{
-    final long[] timestamps = tablet.getTimestamps();
+    final long[] timestamps = dataAdapter.getTimestamps();
     long lastTime = timestamps[index[startIndex]];
     for (int i = startIndex + 1; i < endIndex; i++) {
       if (lastTime != (lastTime = timestamps[index[i]])) {
@@ -153,12 +174,13 @@ public class PipeTableModelTabletEventSorter extends 
PipeTabletEventSorter {
 
   /** Sort by time only. */
   public void sortByTimestampIfNecessary() {
-    if (tablet == null || tablet.getRowSize() == 0) {
+    if (dataAdapter == null || dataAdapter.getRowSize() == 0) {
       return;
     }
 
-    final long[] timestamps = tablet.getTimestamps();
-    for (int i = 1, size = tablet.getRowSize(); i < size; ++i) {
+    final long[] timestamps = dataAdapter.getTimestamps();
+    final int rowSize = dataAdapter.getRowSize();
+    for (int i = 1; i < rowSize; ++i) {
       final long currentTimestamp = timestamps[i];
       final long previousTimestamp = timestamps[i - 1];
 
@@ -172,8 +194,8 @@ public class PipeTableModelTabletEventSorter extends 
PipeTabletEventSorter {
       return;
     }
 
-    index = new Integer[tablet.getRowSize()];
-    for (int i = 0, size = tablet.getRowSize(); i < size; i++) {
+    index = new Integer[rowSize];
+    for (int i = 0; i < rowSize; i++) {
       index[i] = i;
     }
 
@@ -185,7 +207,8 @@ public class PipeTableModelTabletEventSorter extends 
PipeTabletEventSorter {
   }
 
   private void sortTimestamps() {
-    Arrays.sort(this.index, Comparator.comparingLong(tablet::getTimestamp));
-    Arrays.sort(tablet.getTimestamps(), 0, tablet.getRowSize());
+    Arrays.sort(this.index, 
Comparator.comparingLong(dataAdapter::getTimestamp));
+    final long[] timestamps = dataAdapter.getTimestamps();
+    Arrays.sort(timestamps, 0, dataAdapter.getRowSize());
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTreeModelTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTreeModelTabletEventSorter.java
index c26f59220f9..2a56b706463 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTreeModelTabletEventSorter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeTreeModelTabletEventSorter.java
@@ -19,25 +19,43 @@
 
 package org.apache.iotdb.db.pipe.sink.util.sorter;
 
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
 import org.apache.tsfile.write.record.Tablet;
 
 import java.util.Arrays;
 import java.util.Comparator;
 
-public class PipeTreeModelTabletEventSorter extends PipeTabletEventSorter {
+public class PipeTreeModelTabletEventSorter extends PipeInsertEventSorter {
 
+  /**
+   * Constructor for Tablet.
+   *
+   * @param tablet the tablet to sort
+   */
   public PipeTreeModelTabletEventSorter(final Tablet tablet) {
     super(tablet);
     deDuplicatedSize = tablet == null ? 0 : tablet.getRowSize();
   }
 
+  /**
+   * Constructor for InsertTabletStatement.
+   *
+   * @param statement the insert tablet statement to sort
+   */
+  public PipeTreeModelTabletEventSorter(final InsertTabletStatement statement) 
{
+    super(statement);
+    deDuplicatedSize = statement == null ? 0 : statement.getRowCount();
+  }
+
   public void deduplicateAndSortTimestampsIfNecessary() {
-    if (tablet == null || tablet.getRowSize() == 0) {
+    if (dataAdapter == null || dataAdapter.getRowSize() == 0) {
       return;
     }
 
-    long[] timestamps = tablet.getTimestamps();
-    for (int i = 1, size = tablet.getRowSize(); i < size; ++i) {
+    long[] timestamps = dataAdapter.getTimestamps();
+    final int rowSize = dataAdapter.getRowSize();
+    for (int i = 1; i < rowSize; ++i) {
       final long currentTimestamp = timestamps[i];
       final long previousTimestamp = timestamps[i - 1];
 
@@ -54,9 +72,9 @@ public class PipeTreeModelTabletEventSorter extends 
PipeTabletEventSorter {
       return;
     }
 
-    index = new Integer[tablet.getRowSize()];
-    deDuplicatedIndex = new int[tablet.getRowSize()];
-    for (int i = 0, size = tablet.getRowSize(); i < size; i++) {
+    index = new Integer[rowSize];
+    deDuplicatedIndex = new int[rowSize];
+    for (int i = 0; i < rowSize; i++) {
       index[i] = i;
     }
 
@@ -78,14 +96,16 @@ public class PipeTreeModelTabletEventSorter extends 
PipeTabletEventSorter {
 
   private void sortTimestamps() {
     // Index is sorted stably because it is Integer[]
-    Arrays.sort(index, Comparator.comparingLong(tablet::getTimestamp));
-    Arrays.sort(tablet.getTimestamps(), 0, tablet.getRowSize());
+    Arrays.sort(index, Comparator.comparingLong(dataAdapter::getTimestamp));
+    final long[] timestamps = dataAdapter.getTimestamps();
+    Arrays.sort(timestamps, 0, dataAdapter.getRowSize());
   }
 
   private void deduplicateTimestamps() {
     deDuplicatedSize = 0;
-    long[] timestamps = tablet.getTimestamps();
-    for (int i = 1, size = tablet.getRowSize(); i < size; i++) {
+    long[] timestamps = dataAdapter.getTimestamps();
+    final int rowSize = dataAdapter.getRowSize();
+    for (int i = 1; i < rowSize; i++) {
       if (timestamps[i] != timestamps[i - 1]) {
         deDuplicatedIndex[deDuplicatedSize] = i - 1;
         timestamps[deDuplicatedSize] = timestamps[i - 1];
@@ -94,8 +114,8 @@ public class PipeTreeModelTabletEventSorter extends 
PipeTabletEventSorter {
       }
     }
 
-    deDuplicatedIndex[deDuplicatedSize] = tablet.getRowSize() - 1;
-    timestamps[deDuplicatedSize] = timestamps[tablet.getRowSize() - 1];
-    tablet.setRowSize(++deDuplicatedSize);
+    deDuplicatedIndex[deDuplicatedSize] = rowSize - 1;
+    timestamps[deDuplicatedSize] = timestamps[rowSize - 1];
+    dataAdapter.setRowSize(++deDuplicatedSize);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/TabletAdapter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/TabletAdapter.java
new file mode 100644
index 00000000000..b200127a5d4
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/TabletAdapter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.util.sorter;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.List;
+
+/**
+ * {@link InsertEventDataAdapter} implementation backed by a {@link Tablet}.
+ *
+ * <p>Every accessor delegates to the wrapped tablet. This class performs no copying of its own:
+ * getters hand back whatever the tablet's getters return, and setters write through to the tablet.
+ */
+public class TabletAdapter implements InsertEventDataAdapter {
+
+  private final Tablet tablet;
+
+  /**
+   * Creates an adapter over the given tablet.
+   *
+   * @param tablet the tablet all calls are forwarded to
+   */
+  public TabletAdapter(final Tablet tablet) {
+    this.tablet = tablet;
+  }
+
+  /** Returns the length of the tablet's values array, or 0 when that array is null. */
+  @Override
+  public int getColumnCount() {
+    final Object[] values = tablet.getValues();
+    return values != null ? values.length : 0;
+  }
+
+  /**
+   * Returns the data type declared by the schema at {@code columnIndex}.
+   *
+   * @param columnIndex index into the tablet's schema list
+   * @return the schema's type, or null when the schema list is null, the index is out of range
+   *     (negative or not less than the list size), or the schema entry itself is null
+   */
+  @Override
+  public TSDataType getDataType(int columnIndex) {
+    final List<IMeasurementSchema> schemas = tablet.getSchemas();
+    // Guard both bounds so an invalid index consistently yields null instead of throwing.
+    if (schemas == null || columnIndex < 0 || columnIndex >= schemas.size()) {
+      return null;
+    }
+    final IMeasurementSchema schema = schemas.get(columnIndex);
+    return schema != null ? schema.getType() : null;
+  }
+
+  @Override
+  public BitMap[] getBitMaps() {
+    return tablet.getBitMaps();
+  }
+
+  @Override
+  public void setBitMaps(BitMap[] bitMaps) {
+    tablet.setBitMaps(bitMaps);
+  }
+
+  @Override
+  public Object[] getValues() {
+    return tablet.getValues();
+  }
+
+  /** Replaces the column at {@code columnIndex} by writing into the tablet's values array. */
+  @Override
+  public void setValue(int columnIndex, Object value) {
+    tablet.getValues()[columnIndex] = value;
+  }
+
+  @Override
+  public long[] getTimestamps() {
+    return tablet.getTimestamps();
+  }
+
+  @Override
+  public void setTimestamps(long[] timestamps) {
+    tablet.setTimestamps(timestamps);
+  }
+
+  @Override
+  public int getRowSize() {
+    return tablet.getRowSize();
+  }
+
+  @Override
+  public void setRowSize(int rowSize) {
+    tablet.setRowSize(rowSize);
+  }
+
+  @Override
+  public long getTimestamp(int rowIndex) {
+    return tablet.getTimestamp(rowIndex);
+  }
+
+  @Override
+  public IDeviceID getDeviceID(int rowIndex) {
+    return tablet.getDeviceID(rowIndex);
+  }
+
+  /**
+   * Always returns true for this adapter, regardless of {@code columnIndex}.
+   *
+   * <p>NOTE(review): presumably because Tablet keeps DATE columns as {@code LocalDate[]} - confirm
+   * against the Tablet implementation.
+   */
+  @Override
+  public boolean isDateStoredAsLocalDate(int columnIndex) {
+    return true;
+  }
+
+  /** Returns the wrapped tablet itself (not a copy). */
+  public Tablet getTablet() {
+    return tablet;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 1aae871ea0c..d8786e33959 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -839,6 +839,16 @@ public abstract class InsertBaseStatement extends 
Statement implements Accountab
     return ramBytesUsed;
   }
 
+  /**
+   * Sets the pre-calculated memory size. This is used when the memory size is calculated during
+   * deserialization to avoid recalculation.
+   *
+   * <p>The value is stored as given, without validation, and overwrites whatever was previously
+   * held in the {@code ramBytesUsed} field.
+   *
+   * @param ramBytesUsed the calculated memory size in bytes
+   */
+  public void setRamBytesUsed(long ramBytesUsed) {
+    this.ramBytesUsed = ramBytesUsed;
+  }
+
   private long shallowSizeOfList(List<?> list) {
     return Objects.nonNull(list)
         ? UpdateDetailContainer.LIST_SIZE
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index d2c1d2a783a..12369d81bfd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.utils.CommonUtils;
 
+import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.IDeviceID.Factory;
@@ -58,6 +59,8 @@ import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.LocalDate;
 import java.util.ArrayList;
@@ -69,11 +72,22 @@ import java.util.Map;
 import java.util.Objects;
 
 public class InsertTabletStatement extends InsertBaseStatement implements 
ISchemaValidation {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InsertTabletStatement.class);
+
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(InsertTabletStatement.class);
 
   private static final String DATATYPE_UNSUPPORTED = "Data type %s is not 
supported.";
 
+  /**
+   * Returns the shallow instance size of {@code InsertTabletStatement}, i.e. the constant computed
+   * once via {@code RamUsageEstimator.shallowSizeOfInstance}, for use in memory calculation. Sizes
+   * of referenced arrays and objects are not included in this value.
+   *
+   * @return instance size in bytes
+   */
+  public static long getInstanceSize() {
+    return INSTANCE_SIZE;
+  }
+
   protected long[] times; // times should be sorted. It is done in the session 
API.
   protected BitMap[] nullBitMaps;
   protected Object[] columns;
@@ -701,6 +715,189 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
     }
   }
 
+  /**
+   * Converts this InsertTabletStatement to a {@link Tablet}.
+   *
+   * <p>Columns whose measurement name or data type is null are left out of the tablet. Column
+   * categories default to {@code FIELD} when none are set or when an individual entry is null.
+   * Timestamps are copied into a new array of row-count length, column values are converted by
+   * {@code convertColumnToTablet}, and bitmaps are rebuilt from their truncated byte arrays.
+   *
+   * <p>An empty statement (row count 0) yields an empty timestamp array. If the row count is
+   * non-zero but the times array is null or shorter than the row count, a warning is logged and
+   * the tablet is still built with an empty timestamp array.
+   *
+   * @return Tablet object
+   * @throws MetadataException if conversion fails; any exception raised during conversion is
+   *     wrapped as the cause
+   */
+  public Tablet convertToTablet() throws MetadataException {
+    try {
+      // Get deviceId/tableName from devicePath
+      final String deviceIdOrTableName =
+          this.getDevicePath() != null ? this.getDevicePath().getFullPath() : "";
+
+      final String[] measurements = this.getMeasurements();
+      final TSDataType[] dataTypes = this.getDataTypes();
+      // measurements.length is the standard column count; no measurements means no columns
+      final int originalSchemaSize = measurements != null ? measurements.length : 0;
+
+      // Build schemas and remember which original column each one came from.
+      // A column with a null measurement name or null data type is skipped entirely.
+      final List<IMeasurementSchema> schemas = new ArrayList<>();
+      final List<Integer> validColumnIndices = new ArrayList<>();
+      for (int i = 0; i < originalSchemaSize; i++) {
+        if (dataTypes != null && measurements[i] != null && dataTypes[i] != null) {
+          schemas.add(new MeasurementSchema(measurements[i], dataTypes[i]));
+          validColumnIndices.add(i);
+        }
+      }
+
+      final int schemaSize = schemas.size();
+
+      // Get columnTypes (for table model) - only for valid columns
+      final TsTableColumnCategory[] columnCategories = this.getColumnCategories();
+      final List<ColumnCategory> tabletColumnTypes = new ArrayList<>(schemaSize);
+      if (columnCategories != null && columnCategories.length > 0) {
+        for (final int validIndex : validColumnIndices) {
+          tabletColumnTypes.add(
+              columnCategories[validIndex] != null
+                  ? columnCategories[validIndex].toTsFileColumnType()
+                  : ColumnCategory.FIELD);
+        }
+      } else {
+        // Default to FIELD for all valid columns if not specified
+        for (int i = 0; i < schemaSize; i++) {
+          tabletColumnTypes.add(ColumnCategory.FIELD);
+        }
+      }
+
+      // Get timestamps - always copy so the tablet never shares the statement's array
+      final long[] statementTimes = this.getTimes();
+      final int rowSize = this.getRowCount();
+      final long[] timestamps;
+      if (rowSize == 0) {
+        // An empty statement is legitimate; nothing to copy and nothing to warn about
+        timestamps = new long[0];
+      } else if (statementTimes != null && statementTimes.length >= rowSize && rowSize > 0) {
+        timestamps = new long[rowSize];
+        System.arraycopy(statementTimes, 0, timestamps, 0, rowSize);
+      } else {
+        LOGGER.warn(
+            "Times array is null or too small. times.length={}, rowSize={}, deviceId={}",
+            statementTimes != null ? statementTimes.length : 0,
+            rowSize,
+            deviceIdOrTableName);
+        timestamps = new long[0];
+      }
+
+      // Get values - convert Statement columns to Tablet format, only for valid columns.
+      // Slots stay null when the statement has no columns or the individual column is null.
+      final Object[] statementColumns = this.getColumns();
+      final Object[] tabletValues = new Object[schemaSize];
+      if (statementColumns != null && statementColumns.length > 0) {
+        for (int i = 0; i < schemaSize; i++) {
+          final int originalIndex = validColumnIndices.get(i);
+          // dataTypes[originalIndex] is non-null here: only such columns were marked valid
+          if (statementColumns[originalIndex] != null) {
+            tabletValues[i] =
+                convertColumnToTablet(
+                    statementColumns[originalIndex], dataTypes[originalIndex], rowSize);
+          }
+        }
+      }
+
+      // Get bitMaps - copy and truncate to rowSize, only for valid columns
+      final BitMap[] originalBitMaps = this.getBitMaps();
+      final BitMap[] bitMaps;
+      if (originalBitMaps != null && originalBitMaps.length > 0) {
+        bitMaps = new BitMap[schemaSize];
+        for (int i = 0; i < schemaSize; i++) {
+          final int originalIndex = validColumnIndices.get(i);
+          if (originalBitMaps[originalIndex] != null) {
+            // Create a new BitMap truncated to rowSize
+            final byte[] truncatedBytes =
+                originalBitMaps[originalIndex].getTruncatedByteArray(rowSize);
+            bitMaps[i] = new BitMap(rowSize, truncatedBytes);
+          }
+        }
+      } else {
+        bitMaps = null;
+      }
+
+      // Create Tablet using the full constructor:
+      // Tablet(String tableName, List<IMeasurementSchema> schemas,
+      //        List<ColumnCategory> columnTypes, long[] timestamps, Object[] values,
+      //        BitMap[] bitMaps, int rowSize)
+      return new Tablet(
+          deviceIdOrTableName,
+          schemas,
+          tabletColumnTypes,
+          timestamps,
+          tabletValues,
+          bitMaps,
+          rowSize);
+    } catch (final Exception e) {
+      throw new MetadataException("Failed to convert InsertTabletStatement to Tablet", e);
+    }
+  }
+
+  /**
+   * Converts a single column from Statement format to Tablet format.
+   *
+   * <p>DATE columns arrive as {@code int[]} and are converted to a {@code LocalDate[]} of exactly
+   * {@code rowSize} elements; rows beyond the source length stay null. {@code boolean[]}, {@code
+   * int[]}, {@code long[]}, {@code float[]}, {@code double[]} and {@code Binary[]} columns are
+   * copied into a new array whose length is the smaller of the source length and {@code rowSize}
+   * (for {@code Binary[]} the element references are shared). The result is always a new array,
+   * so later modification of the source array does not affect it.
+   *
+   * @param columnValue column value from Statement (primitive array), may be null
+   * @param dataType data type of the column
+   * @param rowSize number of rows to copy (truncate to this length)
+   * @return column value in Tablet format, null when {@code columnValue} is null, or {@code
+   *     columnValue} itself when it is none of the array types listed above
+   */
+  private Object convertColumnToTablet(
+      final Object columnValue, final TSDataType dataType, final int rowSize) {
+
+    if (columnValue == null) {
+      return null;
+    }
+
+    if (TSDataType.DATE.equals(dataType)) {
+      final int[] values = (int[]) columnValue;
+      // A freshly allocated array is already null-filled, so only the available rows are written
+      final LocalDate[] localDateValue = new LocalDate[rowSize];
+      final int copyLength = Math.min(values.length, rowSize);
+      for (int i = 0; i < copyLength; i++) {
+        localDateValue[i] = DateUtils.parseIntToLocalDate(values[i]);
+      }
+      return localDateValue;
+    }
+
+    // For primitive arrays, copy and truncate to rowSize
+    if (columnValue instanceof boolean[]) {
+      final boolean[] original = (boolean[]) columnValue;
+      return Arrays.copyOf(original, Math.min(original.length, rowSize));
+    } else if (columnValue instanceof int[]) {
+      final int[] original = (int[]) columnValue;
+      return Arrays.copyOf(original, Math.min(original.length, rowSize));
+    } else if (columnValue instanceof long[]) {
+      final long[] original = (long[]) columnValue;
+      return Arrays.copyOf(original, Math.min(original.length, rowSize));
+    } else if (columnValue instanceof float[]) {
+      final float[] original = (float[]) columnValue;
+      return Arrays.copyOf(original, Math.min(original.length, rowSize));
+    } else if (columnValue instanceof double[]) {
+      final double[] original = (double[]) columnValue;
+      return Arrays.copyOf(original, Math.min(original.length, rowSize));
+    } else if (columnValue instanceof Binary[]) {
+      // For Binary arrays, create a new array and copy references, truncate to rowSize
+      final Binary[] original = (Binary[]) columnValue;
+      return Arrays.copyOf(original, Math.min(original.length, rowSize));
+    }
+
+    // For other types, return as-is (should not happen for standard types)
+    return columnValue;
+  }
+
   @Override
   public String toString() {
     final int size = 
CommonDescriptor.getInstance().getConfig().getPathLogMaxSize();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 1a122c63d4f..0cc4470882e 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -437,7 +437,7 @@ public class PipeDataNodeThriftRequestTest {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       t.serialize(outputStream);
-      ReadWriteIOUtils.write(false, outputStream);
+      ReadWriteIOUtils.write(true, outputStream);
       tabletBuffers.add(
           ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size()));
       tabletDataBase.add("test");
@@ -459,7 +459,7 @@ public class PipeDataNodeThriftRequestTest {
         new byte[] {'a', 'b'}, 
deserializedReq.getBinaryReqs().get(0).getByteBuffer().array());
     Assert.assertEquals(node, 
deserializedReq.getInsertNodeReqs().get(0).getInsertNode());
     Assert.assertEquals(t, deserializedReq.getTabletReqs().get(0).getTablet());
-    Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned());
+    Assert.assertTrue(deserializedReq.getTabletReqs().get(0).getIsAligned());
 
     Assert.assertEquals("test", 
deserializedReq.getBinaryReqs().get(0).getDataBaseName());
     Assert.assertEquals("test", 
deserializedReq.getTabletReqs().get(0).getDataBaseName());
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementEventSorterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementEventSorterTest.java
new file mode 100644
index 00000000000..7c13d9764b3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeStatementEventSorterTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink;
+
+import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
+import 
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Tests the pipe event sorters when they operate on an {@link InsertTabletStatement} built from a
+ * {@link Tablet}, for both the tree model and the table model.
+ */
+public class PipeStatementEventSorterTest {
+
+  /** Number of INT64 measurements used by every tree-model test. */
+  private static final int TREE_MODEL_COLUMN_COUNT = 3;
+
+  @Test
+  public void testTreeModelDeduplicateAndSort() throws Exception {
+    final List<IMeasurementSchema> schemaList = createTreeModelSchemas();
+    final Tablet tablet = new Tablet("root.sg.device", schemaList, 30);
+
+    // Interleave ascending, descending and constant timestamps: unsorted and with duplicates
+    final long timestamp = 300;
+    for (long i = 0; i < 10; i++) {
+      appendRow(tablet, schemaList, timestamp + i);
+      appendRow(tablet, schemaList, timestamp - i);
+      appendRow(tablet, schemaList, timestamp);
+    }
+
+    final Set<Long> distinctTimestamps = collectDistinctTimestamps(tablet, 30);
+
+    Assert.assertFalse(tablet.isSorted());
+
+    // Convert Tablet to Statement
+    final InsertTabletStatement statement = new InsertTabletStatement(tablet, true, null);
+
+    // Sort using Statement
+    new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertEquals(distinctTimestamps.size(), statement.getRowCount());
+    assertColumnsEqualTimestampsAndStrictlyIncrease(statement);
+  }
+
+  @Test
+  public void testTreeModelDeduplicate() throws Exception {
+    final List<IMeasurementSchema> schemaList = createTreeModelSchemas();
+    final Tablet tablet = new Tablet("root.sg.device", schemaList, 10);
+
+    // Ten rows sharing one timestamp; each column is null on every third row
+    final long timestamp = 300;
+    for (long i = 0; i < 10; i++) {
+      final int rowIndex = tablet.getRowSize();
+      tablet.addTimestamp(rowIndex, timestamp);
+      for (int s = 0; s < TREE_MODEL_COLUMN_COUNT; s++) {
+        tablet.addValue(
+            schemaList.get(s).getMeasurementName(),
+            rowIndex,
+            (i + s) % 3 != 0 ? timestamp + i : null);
+      }
+    }
+
+    final Set<Long> distinctTimestamps = collectDistinctTimestamps(tablet, 10);
+
+    Assert.assertTrue(tablet.isSorted());
+
+    // Convert Tablet to Statement
+    final InsertTabletStatement statement = new InsertTabletStatement(tablet, true, null);
+
+    // Sort using Statement
+    new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertEquals(distinctTimestamps.size(), statement.getRowCount());
+
+    final long[] timestamps = Arrays.copyOfRange(statement.getTimes(), 0, statement.getRowCount());
+    final Object[] columns = statement.getColumns();
+    // Expected survivors: the value written at i = 8 for the first column, i = 9 for the others
+    Assert.assertEquals(timestamps[0] + 8, ((long[]) columns[0])[0]);
+    for (int i = 1; i < TREE_MODEL_COLUMN_COUNT; ++i) {
+      Assert.assertEquals(timestamps[0] + 9, ((long[]) columns[i])[0]);
+    }
+  }
+
+  @Test
+  public void testTreeModelSort() throws Exception {
+    final List<IMeasurementSchema> schemaList = createTreeModelSchemas();
+    final Tablet tablet = new Tablet("root.sg.device", schemaList, 30);
+
+    // Timestamps are derived from the row index so they are unique but out of order
+    for (int i = 0; i < 10; i++) {
+      appendRow(tablet, schemaList, tablet.getRowSize() + 2L);
+      appendRow(tablet, schemaList, tablet.getRowSize());
+      appendRow(tablet, schemaList, tablet.getRowSize() - 2L);
+    }
+
+    final Set<Long> distinctTimestamps = collectDistinctTimestamps(tablet, 30);
+
+    Assert.assertFalse(tablet.isSorted());
+
+    // Before sorting: every column mirrors the timestamps and no two neighbours are equal
+    final long[] unsortedTimestamps =
+        Arrays.copyOfRange(tablet.getTimestamps(), 0, tablet.getRowSize());
+    for (int i = 0; i < TREE_MODEL_COLUMN_COUNT; ++i) {
+      Assert.assertArrayEquals(
+          unsortedTimestamps,
+          Arrays.copyOfRange((long[]) tablet.getValues()[i], 0, tablet.getRowSize()));
+    }
+
+    for (int i = 1; i < tablet.getRowSize(); ++i) {
+      Assert.assertTrue(unsortedTimestamps[i] != unsortedTimestamps[i - 1]);
+      for (int j = 0; j < TREE_MODEL_COLUMN_COUNT; ++j) {
+        Assert.assertNotEquals((long) tablet.getValue(i, j), (long) tablet.getValue(i - 1, j));
+      }
+    }
+
+    // Convert Tablet to Statement
+    final InsertTabletStatement statement = new InsertTabletStatement(tablet, true, null);
+
+    // Sort using Statement
+    new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertEquals(distinctTimestamps.size(), statement.getRowCount());
+    assertColumnsEqualTimestampsAndStrictlyIncrease(statement);
+  }
+
+  @Test
+  public void testTableModelDeduplicateAndSort() throws Exception {
+    doTableModelTest(true, true);
+  }
+
+  @Test
+  public void testTableModelDeduplicate() throws Exception {
+    doTableModelTest(true, false);
+  }
+
+  @Test
+  public void testTableModelSort() throws Exception {
+    doTableModelTest(false, true);
+  }
+
+  @Test
+  public void testTableModelSort1() throws Exception {
+    doTableModelTest1();
+  }
+
+  /**
+   * Sorts and deduplicates a generated table-model tablet by device id and timestamp, then checks
+   * the resulting rows.
+   *
+   * @param hasDuplicates whether the generated tablet contains duplicated rows
+   * @param isUnSorted whether the generated tablet is out of order
+   */
+  public void doTableModelTest(final boolean hasDuplicates, final boolean isUnSorted)
+      throws Exception {
+    final Tablet tablet =
+        PipeTabletEventSorterTest.generateTablet("test", 10, hasDuplicates, isUnSorted);
+
+    // Convert Tablet to Statement
+    final InsertTabletStatement statement = new InsertTabletStatement(tablet, false, "test_db");
+
+    // Sort using Statement
+    new PipeTableModelTabletEventSorter(statement).sortAndDeduplicateByDevIdTimestamp();
+
+    assertTableModelRows(statement);
+  }
+
+  /** Sorts an unsorted, duplicate-free table-model tablet by timestamp only and checks the rows. */
+  public void doTableModelTest1() throws Exception {
+    final Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, false, true);
+
+    // Convert Tablet to Statement
+    final InsertTabletStatement statement = new InsertTabletStatement(tablet, false, "test_db");
+
+    // Sort using Statement
+    new PipeTableModelTabletEventSorter(statement).sortByTimestampIfNecessary();
+
+    assertTableModelRows(statement);
+  }
+
+  /** Builds the three INT64 measurements (s1, s2, s3) shared by the tree-model tests. */
+  private static List<IMeasurementSchema> createTreeModelSchemas() {
+    final List<IMeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+    return schemaList;
+  }
+
+  /** Appends one row whose timestamp and every column value all equal {@code value}. */
+  private static void appendRow(
+      final Tablet tablet, final List<IMeasurementSchema> schemaList, final long value) {
+    final int rowIndex = tablet.getRowSize();
+    tablet.addTimestamp(rowIndex, value);
+    for (final IMeasurementSchema schema : schemaList) {
+      tablet.addValue(schema.getMeasurementName(), rowIndex, value);
+    }
+  }
+
+  /** Collects the distinct timestamps found in the first {@code rowCount} rows of the tablet. */
+  private static Set<Long> collectDistinctTimestamps(final Tablet tablet, final int rowCount) {
+    final Set<Long> distinct = new HashSet<>();
+    for (int i = 0; i < rowCount; i++) {
+      distinct.add(tablet.getTimestamp(i));
+    }
+    return distinct;
+  }
+
+  /**
+   * Asserts that, within the statement's row count, every column equals the timestamp array and
+   * that timestamps and column values are strictly increasing.
+   */
+  private static void assertColumnsEqualTimestampsAndStrictlyIncrease(
+      final InsertTabletStatement statement) {
+    final int rowCount = statement.getRowCount();
+    final long[] timestamps = Arrays.copyOfRange(statement.getTimes(), 0, rowCount);
+    final Object[] columns = statement.getColumns();
+    for (int i = 0; i < TREE_MODEL_COLUMN_COUNT; ++i) {
+      Assert.assertArrayEquals(timestamps, Arrays.copyOfRange((long[]) columns[i], 0, rowCount));
+    }
+
+    for (int i = 1; i < rowCount; ++i) {
+      Assert.assertTrue(timestamps[i] > timestamps[i - 1]);
+      for (int j = 0; j < TREE_MODEL_COLUMN_COUNT; ++j) {
+        Assert.assertTrue(((long[]) columns[j])[i] > ((long[]) columns[j])[i - 1]);
+      }
+    }
+  }
+
+  /**
+   * Asserts that rows 1..rowCount-1 of a sorted table-model statement have strictly increasing
+   * timestamps and that each of the nine columns holds the value expected for its row index.
+   */
+  private static void assertTableModelRows(final InsertTabletStatement statement) {
+    final long[] timestamps = statement.getTimes();
+    final Object[] columns = statement.getColumns();
+    for (int i = 1; i < statement.getRowCount(); i++) {
+      Assert.assertTrue(timestamps[i] > timestamps[i - 1]);
+      Assert.assertEquals(utf8Binary(String.valueOf(i / 100)), ((Binary[]) columns[0])[i]);
+      Assert.assertEquals((long) i, ((long[]) columns[1])[i]);
+      Assert.assertEquals(i * 1.0f, ((float[]) columns[2])[i], 0.001f);
+      Assert.assertEquals(utf8Binary(String.valueOf(i)), ((Binary[]) columns[3])[i]);
+      Assert.assertEquals((long) i, ((long[]) columns[4])[i]);
+      Assert.assertEquals(i, ((int[]) columns[5])[i]);
+      Assert.assertEquals(i * 0.1, ((double[]) columns[6])[i], 0.0001);
+      // DATE is stored as int[] in Statement, not LocalDate[]
+      final LocalDate expectedDate = PipeTabletEventSorterTest.getDate(i);
+      final int expectedDateInt =
+          org.apache.tsfile.utils.DateUtils.parseDateExpressionToInt(expectedDate);
+      Assert.assertEquals(expectedDateInt, ((int[]) columns[7])[i]);
+      Assert.assertEquals(utf8Binary(String.valueOf(i)), ((Binary[]) columns[8])[i]);
+    }
+  }
+
+  /** Wraps the UTF-8 bytes of {@code text} in a {@link Binary}. */
+  private static Binary utf8Binary(final String text) {
+    return new Binary(text.getBytes(StandardCharsets.UTF_8));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java
new file mode 100644
index 00000000000..410afc76130
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java
@@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under this License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.util;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TabletStatementConverterTest {
+
+  /**
+   * Tree model: a Statement built from a Tablet must convert back to an equal Tablet.
+   *
+   * @throws MetadataException propagated from the conversion calls under test
+   */
+  @Test
+  public void testConvertStatementToTabletTreeModel() throws MetadataException {
+    // 1000 measurements x 100 rows on a single device
+    final Tablet source = generateTreeModelTablet("root.sg.device", 1000, 100);
+
+    // Tablet -> Statement (aligned, no database name) -> Tablet
+    final InsertTabletStatement insertStatement = new InsertTabletStatement(source, true, null);
+    final Tablet restored = insertStatement.convertToTablet();
+
+    assertTabletsEqual(source, restored);
+  }
+
+  /**
+   * Table model: a Statement built from a Tablet must convert back to an equal Tablet.
+   *
+   * @throws MetadataException propagated from the conversion calls under test
+   */
+  @Test
+  public void testConvertStatementToTabletTableModel() throws MetadataException {
+    // 1000 FIELD columns x 100 rows in a single table
+    final Tablet source = generateTableModelTablet("table1", 1000, 100);
+
+    // Tablet -> Statement (not aligned, database "test_db") -> Tablet
+    final InsertTabletStatement insertStatement =
+        new InsertTabletStatement(source, false, "test_db");
+    final Tablet restored = insertStatement.convertToTablet();
+
+    assertTabletsEqual(source, restored);
+  }
+
+  /**
+   * Serializes a tree-model Tablet followed by an {@code isAligned} flag, deserializes those bytes
+   * directly into a Statement, and checks both the basic fields and the data.
+   *
+   * @throws IOException if writing the Tablet to the in-memory stream fails
+   * @throws MetadataException propagated from the conversion calls under test
+   */
+  @Test
+  public void testDeserializeStatementFromTabletFormat() throws IOException, MetadataException {
+    final String deviceName = "root.sg.device";
+    final int columnCount = 1000;
+    final int rowCount = 100;
+    final boolean isAligned = true;
+    final Tablet originalTablet = generateTreeModelTablet(deviceName, columnCount, rowCount);
+
+    // Byte layout produced here: the serialized Tablet first, then the isAligned flag
+    final PublicBAOS bytes = new PublicBAOS();
+    final DataOutputStream sink = new DataOutputStream(bytes);
+    originalTablet.serialize(sink);
+    ReadWriteIOUtils.write(isAligned, sink);
+
+    // Only the written prefix of the backing array is exposed to the deserializer
+    final InsertTabletStatement statement =
+        TabletStatementConverter.deserializeStatementFromTabletFormat(
+            ByteBuffer.wrap(bytes.getBuf(), 0, bytes.size()));
+
+    // Basic information
+    Assert.assertEquals(deviceName, statement.getDevicePath().getFullPath());
+    Assert.assertEquals(rowCount, statement.getRowCount());
+    Assert.assertEquals(columnCount, statement.getMeasurements().length);
+    Assert.assertEquals(isAligned, statement.isAligned());
+
+    // Data: convert the Statement back to a Tablet and compare with the input
+    assertTabletsEqual(originalTablet, statement.convertToTablet());
+  }
+
+  /**
+   * Tree model round trip: Tablet -> serialized bytes -> Statement -> Tablet.
+   *
+   * @throws MetadataException propagated from the conversion calls under test
+   * @throws IOException if writing the Tablet to the in-memory stream fails
+   */
+  @Test
+  public void testRoundTripConversionTreeModel() throws MetadataException, IOException {
+    final boolean isAligned = true;
+    final Tablet originalTablet = generateTreeModelTablet("root.sg.device", 1000, 100);
+
+    // Byte layout produced here: the serialized Tablet first, then the isAligned flag
+    final PublicBAOS bytes = new PublicBAOS();
+    final DataOutputStream sink = new DataOutputStream(bytes);
+    originalTablet.serialize(sink);
+    ReadWriteIOUtils.write(isAligned, sink);
+
+    // Bytes -> Statement -> Tablet
+    final InsertTabletStatement statement =
+        TabletStatementConverter.deserializeStatementFromTabletFormat(
+            ByteBuffer.wrap(bytes.getBuf(), 0, bytes.size()));
+    final Tablet restored = statement.convertToTablet();
+
+    assertTabletsEqual(originalTablet, restored);
+  }
+
+  /**
+   * Table model round trip: Tablet -> Statement -> Tablet -> Statement.
+   *
+   * <p>Both legs are verified: the converted Tablet must equal the original Tablet, and the
+   * Statement rebuilt from the converted Tablet must still carry the original Tablet's data.
+   *
+   * @throws MetadataException propagated from the conversion calls under test
+   */
+  @Test
+  public void testRoundTripConversionTableModel() throws MetadataException {
+    final int columnCount = 1000;
+    final int rowCount = 100;
+    final String tableName = "table1";
+    final String databaseName = "test_db";
+    final boolean isAligned = false;
+
+    // Generate original Tablet for table model
+    final Tablet originalTablet = generateTableModelTablet(tableName, columnCount, rowCount);
+
+    // First leg: Tablet -> Statement -> Tablet
+    final InsertTabletStatement originalStatement =
+        new InsertTabletStatement(originalTablet, isAligned, databaseName);
+    final Tablet convertedTablet = originalStatement.convertToTablet();
+
+    // Second leg: converted Tablet -> Statement
+    final InsertTabletStatement convertedStatement =
+        new InsertTabletStatement(convertedTablet, isAligned, databaseName);
+
+    // Verify first leg: original Tablet should equal converted Tablet
+    assertTabletsEqual(originalTablet, convertedTablet);
+    // Verify second leg: the rebuilt Statement was previously constructed but never checked
+    assertTabletAndStatementEqual(originalTablet, convertedStatement);
+  }
+
+  /**
+   * Generate a Tablet for tree model with all data types and specified number of columns and rows.
+   *
+   * <p>Column {@code c} takes the type {@code ALL_DATA_TYPES[c % ALL_DATA_TYPES.length]} and is
+   * named {@code col_<c>_<TYPE>}. Row {@code r} gets timestamp {@code r * 1000}, and every cell
+   * value is computed from its row and column index only, so the content is deterministic.
+   *
+   * @param deviceName device name
+   * @param columnCount number of columns
+   * @param rowCount number of rows
+   * @return Tablet with test data
+   * @throws IllegalArgumentException if a column type has no value generator below
+   */
+  private Tablet generateTreeModelTablet(
+      final String deviceName, final int columnCount, final int rowCount) {
+    final List<IMeasurementSchema> schemaList = new ArrayList<>(columnCount);
+    final TSDataType[] dataTypes = new TSDataType[columnCount];
+    final String[] measurementNames = new String[columnCount];
+
+    // Create schemas
+    for (int col = 0; col < columnCount; col++) {
+      final TSDataType dataType = ALL_DATA_TYPES[col % ALL_DATA_TYPES.length];
+      final String measurementName = "col_" + col + "_" + dataType.name();
+      schemaList.add(new MeasurementSchema(measurementName, dataType));
+      dataTypes[col] = dataType;
+      measurementNames[col] = measurementName;
+    }
+
+    // Create and fill tablet; each cell is generated in place instead of being staged in
+    // per-column arrays first, so the type switch exists only once
+    final Tablet tablet = new Tablet(deviceName, schemaList, rowCount);
+    for (int row = 0; row < rowCount; row++) {
+      final int rowIndex = tablet.getRowSize();
+      tablet.addTimestamp(rowIndex, row * 1000L);
+      for (int col = 0; col < columnCount; col++) {
+        final String measurementName = measurementNames[col];
+        switch (dataTypes[col]) {
+          case BOOLEAN:
+            tablet.addValue(measurementName, rowIndex, (row + col) % 2 == 0);
+            break;
+          case INT32:
+            tablet.addValue(measurementName, rowIndex, row * 100 + col);
+            break;
+          case DATE:
+            // Generate valid dates starting from 2024-01-01
+            final LocalDate date = LocalDate.of(2024, 1, 1).plusDays((row + col) % 365);
+            tablet.addValue(measurementName, rowIndex, date);
+            break;
+          case INT64:
+          case TIMESTAMP:
+            tablet.addValue(measurementName, rowIndex, (long) row * 1000L + col);
+            break;
+          case FLOAT:
+            tablet.addValue(measurementName, rowIndex, row * 1.5f + col * 0.1f);
+            break;
+          case DOUBLE:
+            tablet.addValue(measurementName, rowIndex, row * 2.5 + col * 0.01);
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            // Explicit charset: the bytes must not depend on the platform default encoding
+            final String text = "value_row_" + row + "_col_" + col;
+            tablet.addValue(
+                measurementName, rowIndex, new Binary(text.getBytes(StandardCharsets.UTF_8)));
+            break;
+          default:
+            throw new IllegalArgumentException("Unsupported data type: " + dataTypes[col]);
+        }
+      }
+    }
+
+    return tablet;
+  }
+
+  /**
+   * Generate a Tablet for table model with all data types and specified number of columns and
+   * rows.
+   *
+   * <p>Column {@code c} takes the type {@code ALL_DATA_TYPES[c % ALL_DATA_TYPES.length]}, is named
+   * {@code col_<c>_<TYPE>} and is declared as a FIELD column. Row {@code r} gets timestamp {@code
+   * r * 1000}, and every cell value is computed from its row and column index only, so the content
+   * is deterministic.
+   *
+   * @param tableName table name
+   * @param columnCount number of columns
+   * @param rowCount number of rows
+   * @return Tablet with test data
+   * @throws IllegalArgumentException if a column type has no value generator below
+   */
+  private Tablet generateTableModelTablet(
+      final String tableName, final int columnCount, final int rowCount) {
+    final List<IMeasurementSchema> schemaList = new ArrayList<>(columnCount);
+    final TSDataType[] dataTypes = new TSDataType[columnCount];
+    final String[] measurementNames = new String[columnCount];
+    final List<ColumnCategory> columnTypes = new ArrayList<>(columnCount);
+
+    // Create schemas
+    for (int col = 0; col < columnCount; col++) {
+      final TSDataType dataType = ALL_DATA_TYPES[col % ALL_DATA_TYPES.length];
+      final String measurementName = "col_" + col + "_" + dataType.name();
+      schemaList.add(new MeasurementSchema(measurementName, dataType));
+      dataTypes[col] = dataType;
+      measurementNames[col] = measurementName;
+      // For table model, all columns are FIELD (can be TAG/ATTRIBUTE/FIELD, but we use FIELD for
+      // simplicity)
+      columnTypes.add(ColumnCategory.FIELD);
+    }
+
+    // Create tablet using table model constructor: Tablet(String, List<String>, List<TSDataType>,
+    // List<ColumnCategory>, int)
+    final List<String> measurementNameList = IMeasurementSchema.getMeasurementNameList(schemaList);
+    final List<TSDataType> dataTypeList = IMeasurementSchema.getDataTypeList(schemaList);
+    final Tablet tablet =
+        new Tablet(tableName, measurementNameList, dataTypeList, columnTypes, rowCount);
+    tablet.initBitMaps();
+
+    // Fill tablet with data; each cell is generated in place instead of being staged in
+    // per-column arrays first, so the type switch exists only once
+    for (int row = 0; row < rowCount; row++) {
+      final int rowIndex = tablet.getRowSize();
+      tablet.addTimestamp(rowIndex, row * 1000L);
+      for (int col = 0; col < columnCount; col++) {
+        final String measurementName = measurementNames[col];
+        switch (dataTypes[col]) {
+          case BOOLEAN:
+            tablet.addValue(measurementName, rowIndex, (row + col) % 2 == 0);
+            break;
+          case INT32:
+            tablet.addValue(measurementName, rowIndex, row * 100 + col);
+            break;
+          case DATE:
+            // Generate valid dates starting from 2024-01-01
+            final LocalDate date = LocalDate.of(2024, 1, 1).plusDays((row + col) % 365);
+            tablet.addValue(measurementName, rowIndex, date);
+            break;
+          case INT64:
+          case TIMESTAMP:
+            tablet.addValue(measurementName, rowIndex, (long) row * 1000L + col);
+            break;
+          case FLOAT:
+            tablet.addValue(measurementName, rowIndex, row * 1.5f + col * 0.1f);
+            break;
+          case DOUBLE:
+            tablet.addValue(measurementName, rowIndex, row * 2.5 + col * 0.01);
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            // Explicit charset: the bytes must not depend on the platform default encoding
+            final String text = "value_row_" + row + "_col_" + col;
+            tablet.addValue(
+                measurementName, rowIndex, new Binary(text.getBytes(StandardCharsets.UTF_8)));
+            break;
+          default:
+            throw new IllegalArgumentException("Unsupported data type: " + dataTypes[col]);
+        }
+      }
+      tablet.setRowSize(rowIndex + 1);
+    }
+
+    return tablet;
+  }
+
+  /**
+   * Asserts that two Tablets agree on device id, row size, schema count, timestamps and on every
+   * cell of the first {@code expected.getRowSize()} rows.
+   *
+   * <p>Column types are taken from the expected Tablet's schemas. FLOAT and DOUBLE cells are
+   * compared with a tolerance of 0.0001; columns of any other unlisted type are only checked for
+   * being non-null.
+   *
+   * @param expected expected Tablet
+   * @param actual actual Tablet
+   */
+  private void assertTabletsEqual(final Tablet expected, final Tablet actual) {
+    Assert.assertEquals(expected.getDeviceId(), actual.getDeviceId());
+    Assert.assertEquals(expected.getRowSize(), actual.getRowSize());
+    Assert.assertEquals(expected.getSchemas().size(), actual.getSchemas().size());
+
+    // Timestamps are compared as whole arrays
+    Assert.assertArrayEquals(expected.getTimestamps(), actual.getTimestamps());
+
+    final int totalColumns = expected.getSchemas().size();
+    final int totalRows = expected.getRowSize();
+    final Object[] wantedValues = expected.getValues();
+    final Object[] gotValues = actual.getValues();
+
+    for (int c = 0; c < totalColumns; c++) {
+      final TSDataType type = expected.getSchemas().get(c).getType();
+      final Object wantedColumn = wantedValues[c];
+      final Object gotColumn = gotValues[c];
+
+      Assert.assertNotNull(gotColumn);
+
+      for (int r = 0; r < totalRows; r++) {
+        switch (type) {
+          case BOOLEAN:
+            Assert.assertEquals(((boolean[]) wantedColumn)[r], ((boolean[]) gotColumn)[r]);
+            break;
+          case INT32:
+            Assert.assertEquals(((int[]) wantedColumn)[r], ((int[]) gotColumn)[r]);
+            break;
+          case DATE:
+            Assert.assertEquals(((LocalDate[]) wantedColumn)[r], ((LocalDate[]) gotColumn)[r]);
+            break;
+          case INT64:
+          case TIMESTAMP:
+            Assert.assertEquals(((long[]) wantedColumn)[r], ((long[]) gotColumn)[r]);
+            break;
+          case FLOAT:
+            Assert.assertEquals(((float[]) wantedColumn)[r], ((float[]) gotColumn)[r], 0.0001f);
+            break;
+          case DOUBLE:
+            Assert.assertEquals(((double[]) wantedColumn)[r], ((double[]) gotColumn)[r], 0.0001);
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            final Binary wantedBinary = ((Binary[]) wantedColumn)[r];
+            final Binary gotBinary = ((Binary[]) gotColumn)[r];
+            Assert.assertNotNull(gotBinary);
+            Assert.assertEquals(wantedBinary, gotBinary);
+            break;
+          default:
+            // No per-cell comparison for other types
+            break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Asserts that a Tablet and an InsertTabletStatement hold the same device, row count, column
+   * count, timestamps and cell values for the first {@code tablet.getRowSize()} rows.
+   *
+   * <p>DATE columns are read as {@code LocalDate[]} from the Tablet and as {@code int[]} from the
+   * Statement, so the Tablet side is converted with {@code DateUtils.parseDateExpressionToInt}
+   * before comparing. FLOAT and DOUBLE cells are compared with a tolerance of 0.0001.
+   *
+   * @param tablet Tablet
+   * @param statement InsertTabletStatement
+   */
+  private void assertTabletAndStatementEqual(
+      final Tablet tablet, final InsertTabletStatement statement) {
+    // A Statement without a device path is compared against a null device id
+    final String statementDevice;
+    if (statement.getDevicePath() == null) {
+      statementDevice = null;
+    } else {
+      statementDevice = statement.getDevicePath().getFullPath();
+    }
+    Assert.assertEquals(tablet.getDeviceId(), statementDevice);
+    Assert.assertEquals(tablet.getRowSize(), statement.getRowCount());
+    Assert.assertEquals(tablet.getSchemas().size(), statement.getMeasurements().length);
+
+    // Timestamps are compared as whole arrays
+    Assert.assertArrayEquals(tablet.getTimestamps(), statement.getTimes());
+
+    final int totalColumns = tablet.getSchemas().size();
+    final int totalRows = tablet.getRowSize();
+    final Object[] fromTablet = tablet.getValues();
+    final Object[] fromStatement = statement.getColumns();
+
+    for (int c = 0; c < totalColumns; c++) {
+      final TSDataType type = tablet.getSchemas().get(c).getType();
+      final Object tabletColumn = fromTablet[c];
+      final Object statementColumn = fromStatement[c];
+
+      Assert.assertNotNull(statementColumn);
+
+      for (int r = 0; r < totalRows; r++) {
+        switch (type) {
+          case BOOLEAN:
+            Assert.assertEquals(((boolean[]) tabletColumn)[r], ((boolean[]) statementColumn)[r]);
+            break;
+          case INT32:
+            Assert.assertEquals(((int[]) tabletColumn)[r], ((int[]) statementColumn)[r]);
+            break;
+          case DATE:
+            // Tablet side is LocalDate, Statement side is int: normalize the Tablet value to int
+            final LocalDate dateInTablet = ((LocalDate[]) tabletColumn)[r];
+            final int dateInStatement = ((int[]) statementColumn)[r];
+            Assert.assertEquals(DateUtils.parseDateExpressionToInt(dateInTablet), dateInStatement);
+            break;
+          case INT64:
+          case TIMESTAMP:
+            Assert.assertEquals(((long[]) tabletColumn)[r], ((long[]) statementColumn)[r]);
+            break;
+          case FLOAT:
+            Assert.assertEquals(
+                ((float[]) tabletColumn)[r], ((float[]) statementColumn)[r], 0.0001f);
+            break;
+          case DOUBLE:
+            Assert.assertEquals(
+                ((double[]) tabletColumn)[r], ((double[]) statementColumn)[r], 0.0001);
+            break;
+          case TEXT:
+          case STRING:
+          case BLOB:
+            final Binary binaryInTablet = ((Binary[]) tabletColumn)[r];
+            final Binary binaryInStatement = ((Binary[]) statementColumn)[r];
+            Assert.assertNotNull(binaryInStatement);
+            Assert.assertEquals(binaryInTablet, binaryInStatement);
+            break;
+          default:
+            // No per-cell comparison for other types
+            break;
+        }
+      }
+    }
+  }
+
+  // Data types exercised by these tests. The Tablet generators assign column c the type
+  // ALL_DATA_TYPES[c % ALL_DATA_TYPES.length], so any column count of at least
+  // ALL_DATA_TYPES.length covers every entry listed here.
+  private static final TSDataType[] ALL_DATA_TYPES = {
+    TSDataType.BOOLEAN,
+    TSDataType.INT32,
+    TSDataType.INT64,
+    TSDataType.FLOAT,
+    TSDataType.DOUBLE,
+    TSDataType.TEXT,
+    TSDataType.TIMESTAMP,
+    TSDataType.DATE,
+    TSDataType.BLOB,
+    TSDataType.STRING
+  };
+}

Reply via email to