This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e7bc4926df0 Support legacy pipe receiver requests (#17901)
e7bc4926df0 is described below

commit e7bc4926df0878337d61b7c19a00b72d48e55fe0
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 11 17:25:31 2026 +0800

    Support legacy pipe receiver requests (#17901)
    
    * Support legacy pipe snapshot seal parameters
    
    * Support legacy pipe tablet requests
    
    * Resolve legacy pipe tablet request conflicts
    
    * Add pipe receiver 1.3 compatibility tests
    
    * Try current raw tablet format first
    
    * Fix legacy raw tablet fallback
---
 .../receiver/protocol/IoTDBConfigNodeReceiver.java |   4 +-
 .../pipe/sink/PipeConfigNodeThriftRequestTest.java | 212 +++++++++
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |   4 +-
 .../request/PipeTransferTabletRawReq.java          |  89 +++-
 .../pipe/sink/util/TabletStatementConverter.java   |  29 +-
 .../pipe/sink/PipeDataNodeThriftRequestTest.java   | 505 +++++++++++++++++++++
 .../thrift/request/PipeTransferFileSealReqV2.java  |   8 +
 .../common/PipeTransferSliceReqBuilderTest.java    |  32 ++
 .../thrift/request/PipeRequestTypeTest.java        |  61 +++
 .../request/PipeTransferCompressedReqTest.java     | 112 +++++
 .../request/PipeTransferFileSealReqV2Test.java     |  65 +++
 11 files changed, 1101 insertions(+), 20 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 7de8e8bf78d..5f414f2cd69 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -1277,7 +1277,7 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver {
         PipeConfigRegionSnapshotEvent.getConfigPhysicalPlanTypeSet(
             parameters.get(ColumnHeaderConstant.TYPE));
     final boolean isTreeModelDataAllowedToBeCaptured =
-        parameters.containsKey(PipeTransferFileSealReqV2.TREE);
+        
PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters);
     final TreePattern treePattern =
         TreePattern.parsePatternFromString(
             parameters.get(ColumnHeaderConstant.PATH_PATTERN),
@@ -1285,7 +1285,7 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver {
             p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p));
     final TablePattern tablePattern =
         new TablePattern(
-            parameters.containsKey(PipeTransferFileSealReqV2.TABLE),
+            
PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters),
             parameters.get(PipeTransferFileSealReqV2.DATABASE_PATTERN),
             parameters.get(ColumnHeaderConstant.TABLE_NAME));
     final List<TSStatus> results = new ArrayList<>();
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
index 5c578ebead8..dd948a451cf 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
@@ -19,17 +19,33 @@
 
 package org.apache.iotdb.confignode.manager.pipe.sink;
 
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
 import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
 import 
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigNodeHandshakeV1Req;
+import 
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigNodeHandshakeV2Req;
 import 
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigPlanReq;
 import 
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotPieceReq;
 import 
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotSealReq;
 import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class PipeConfigNodeThriftRequestTest {
 
@@ -48,6 +64,59 @@ public class PipeConfigNodeThriftRequestTest {
     Assert.assertEquals(req.getTimestampPrecision(), 
deserializeReq.getTimestampPrecision());
   }
 
+  @Test
+  public void testPipeTransferConfigHandshakeReqFromLegacyV13Body() throws 
IOException {
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.HANDSHAKE_CONFIGNODE_V1,
+            serializeLegacyHandshakeV1Body(TIME_PRECISION));
+
+    final PipeTransferConfigNodeHandshakeV1Req deserializeReq =
+        PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(TIME_PRECISION, 
deserializeReq.getTimestampPrecision());
+  }
+
+  @Test
+  public void testPipeTransferConfigHandshakeV2Req() throws IOException {
+    final Map<String, String> params = new HashMap<>();
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, 
TIME_PRECISION);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, 
"cluster");
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root");
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root");
+
+    final PipeTransferConfigNodeHandshakeV2Req req =
+        PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferReq(params);
+    final PipeTransferConfigNodeHandshakeV2Req deserializeReq =
+        PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(params, deserializeReq.getParams());
+  }
+
+  @Test
+  public void testPipeTransferConfigHandshakeV2ReqFromLegacyV13Body() throws 
IOException {
+    final Map<String, String> params = new HashMap<>();
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, 
TIME_PRECISION);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, 
"cluster");
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root");
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root");
+
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.HANDSHAKE_CONFIGNODE_V2, 
serializeLegacyHandshakeV2Body(params));
+
+    final PipeTransferConfigNodeHandshakeV2Req deserializeReq =
+        PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(params, deserializeReq.getParams());
+  }
+
   @Test
   public void testPipeTransferConfigPlanReq() {
     PipeTransferConfigPlanReq req =
@@ -56,6 +125,22 @@ public class PipeConfigNodeThriftRequestTest {
 
     Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
     Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+  }
+
+  @Test
+  public void testPipeTransferConfigPlanReqFromLegacyV13Body() {
+    final ActiveCQPlan plan = new ActiveCQPlan("cqId", "md5");
+    final ByteBuffer legacyBody = plan.serializeToByteBuffer();
+    final TPipeTransferReq req =
+        legacyTransferReq(PipeRequestType.TRANSFER_CONFIG_PLAN, legacyBody);
+
+    final PipeTransferConfigPlanReq deserializeReq =
+        PipeTransferConfigPlanReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertArrayEquals(byteBufferToByteArray(legacyBody), 
deserializeReq.getBody());
   }
 
   @Test
@@ -76,6 +161,24 @@ public class PipeConfigNodeThriftRequestTest {
     Assert.assertArrayEquals(req.getFilePiece(), 
deserializeReq.getFilePiece());
   }
 
+  @Test
+  public void testPipeTransferConfigSnapshotPieceReqFromLegacyV13Body() throws 
IOException {
+    final byte[] body = "legacyConfigSnapshotPiece".getBytes();
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.TRANSFER_CONFIG_SNAPSHOT_PIECE,
+            serializeLegacyFilePieceBody("cluster_schema.bin", 10L, body));
+
+    final PipeTransferConfigSnapshotPieceReq deserializeReq =
+        PipeTransferConfigSnapshotPieceReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals("cluster_schema.bin", deserializeReq.getFileName());
+    Assert.assertEquals(10L, deserializeReq.getStartWritingOffset());
+    Assert.assertArrayEquals(body, deserializeReq.getFilePiece());
+  }
+
   @Test
   public void testPipeTransferConfigSnapshotSealReq() throws IOException {
     String snapshotName = "cluster_schema.bin";
@@ -108,4 +211,113 @@ public class PipeConfigNodeThriftRequestTest {
     Assert.assertEquals(req.getFileLengths(), deserializeReq.getFileLengths());
     Assert.assertEquals(req.getParameters(), deserializeReq.getParameters());
   }
+
+  @Test
+  public void testPipeTransferConfigSnapshotSealReqFromLegacyV13Body() throws 
IOException {
+    final String snapshotName = "cluster_schema.bin";
+    final String templateInfoName = "template_info.bin";
+    final Map<String, String> parameters = new HashMap<>();
+    parameters.put(ColumnHeaderConstant.PATH_PATTERN, "root.**");
+    parameters.put(
+        PipeTransferConfigSnapshotSealReq.FILE_TYPE,
+        Byte.toString(CNSnapshotFileType.SCHEMA.getType()));
+    parameters.put(ColumnHeaderConstant.TYPE, "200");
+
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.TRANSFER_CONFIG_SNAPSHOT_SEAL,
+            serializeLegacyFileSealV2Body(
+                Arrays.asList(snapshotName, templateInfoName),
+                Arrays.asList(100L, 10L),
+                parameters));
+    final PipeTransferConfigSnapshotSealReq deserializeReq =
+        PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(
+        Arrays.asList(snapshotName, templateInfoName), 
deserializeReq.getFileNames());
+    Assert.assertEquals(Arrays.asList(100L, 10L), 
deserializeReq.getFileLengths());
+    Assert.assertEquals(parameters, deserializeReq.getParameters());
+    Assert.assertTrue(
+        PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(
+            deserializeReq.getParameters()));
+    Assert.assertFalse(
+        PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(
+            deserializeReq.getParameters()));
+  }
+
+  private static TPipeTransferReq legacyTransferReq(
+      final PipeRequestType requestType, final ByteBuffer body) {
+    final TPipeTransferReq req = new TPipeTransferReq();
+    req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+    req.type = requestType.getType();
+    req.body = body;
+    return req;
+  }
+
+  private static byte[] byteBufferToByteArray(final ByteBuffer byteBuffer) {
+    final ByteBuffer duplicatedBuffer = byteBuffer.duplicate();
+    final byte[] bytes = new byte[duplicatedBuffer.remaining()];
+    duplicatedBuffer.get(bytes);
+    return bytes;
+  }
+
+  private static ByteBuffer serializeLegacyFileSealV2Body(
+      final List<String> fileNames,
+      final List<Long> fileLengths,
+      final Map<String, String> parameters)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(fileNames.size(), outputStream);
+      for (final String fileName : fileNames) {
+        ReadWriteIOUtils.write(fileName, outputStream);
+      }
+      ReadWriteIOUtils.write(fileLengths.size(), outputStream);
+      for (final Long fileLength : fileLengths) {
+        ReadWriteIOUtils.write(fileLength, outputStream);
+      }
+      ReadWriteIOUtils.write(parameters.size(), outputStream);
+      for (final Map.Entry<String, String> entry : parameters.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue(), outputStream);
+      }
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
+  private static ByteBuffer serializeLegacyHandshakeV1Body(final String 
timestampPrecision)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(timestampPrecision, outputStream);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
+  private static ByteBuffer serializeLegacyHandshakeV2Body(final Map<String, 
String> params)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(params.size(), outputStream);
+      for (final Map.Entry<String, String> entry : params.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue(), outputStream);
+      }
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
+  private static ByteBuffer serializeLegacyFilePieceBody(
+      final String fileName, final long startWritingOffset, final byte[] 
filePiece)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(fileName, outputStream);
+      ReadWriteIOUtils.write(startWritingOffset, outputStream);
+      ReadWriteIOUtils.write(new Binary(filePiece), outputStream);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index f4cb3b281e9..ba7f5f7e03e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -641,7 +641,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
         PipeSchemaRegionSnapshotEvent.getStatementTypeSet(
             parameters.get(ColumnHeaderConstant.TYPE));
     final boolean isTreeModelDataAllowedToBeCaptured =
-        parameters.containsKey(PipeTransferFileSealReqV2.TREE);
+        
PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters);
     final TreePattern treePattern =
         TreePattern.parsePatternFromString(
             parameters.get(ColumnHeaderConstant.PATH_PATTERN),
@@ -649,7 +649,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
             p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p));
     final TablePattern tablePattern =
         new TablePattern(
-            parameters.containsKey(PipeTransferFileSealReqV2.TABLE),
+            
PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters),
             parameters.get(PipeTransferFileSealReqV2.DATABASE_PATTERN),
             parameters.get(ColumnHeaderConstant.TABLE_NAME));
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 98ea83b6d76..63db24a37ba 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.record.Tablet;
@@ -118,19 +119,7 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq {
       final ByteBuffer buffer, final TabletStringInternPool 
tabletStringInternPool) {
     final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
 
-    final int startPosition = buffer.position();
-    try {
-      final InsertTabletStatement insertTabletStatement =
-          TabletStatementConverter.deserializeStatementFromTabletFormat(
-              buffer, false, tabletStringInternPool);
-      tabletReq.isAligned = insertTabletStatement.isAligned();
-      tabletReq.statement = insertTabletStatement;
-    } catch (final Exception e) {
-      buffer.position(startPosition);
-      tabletReq.tablet =
-          PipeTabletUtils.internTablet(Tablet.deserialize(buffer), 
tabletStringInternPool);
-      tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer);
-    }
+    tabletReq.deserializeTPipeTransferRawReq(buffer, tabletStringInternPool);
 
     return tabletReq;
   }
@@ -167,6 +156,80 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq {
     return tabletReq;
   }
 
+  private void deserializeTPipeTransferRawReq(
+      final ByteBuffer buffer, final TabletStringInternPool 
tabletStringInternPool) {
+    final int startPosition = buffer.position();
+    try {
+      // Current V1 raw tablet requests can be converted to 
InsertTabletStatement directly. Keep
+      // this as the first attempt to avoid the overhead of constructing an 
intermediate Tablet.
+      final InsertTabletStatement insertTabletStatement =
+          TabletStatementConverter.deserializeStatementFromTabletFormat(
+              buffer, false, tabletStringInternPool);
+      // Legacy tablets do not serialize column categories. Since hasSchema=1 
can be
+      // misread as FIELD, the current reader may return a corrupt statement 
instead of failing.
+      
ensureStatementDeserializedFromCurrentTabletFormat(insertTabletStatement);
+      isAligned = insertTabletStatement.isAligned();
+      statement = insertTabletStatement;
+      return;
+    } catch (final Exception e) {
+      buffer.position(startPosition);
+    }
+
+    try {
+      // Some old senders serialize Tablet without column categories. Retry 
with the legacy reader
+      // before falling back to the full Tablet deserialization path.
+      final InsertTabletStatement insertTabletStatement =
+          TabletStatementConverter.deserializeLegacyStatementFromTabletFormat(
+              buffer, tabletStringInternPool);
+      isAligned = insertTabletStatement.isAligned();
+      statement = insertTabletStatement;
+      return;
+    } catch (final Exception e) {
+      buffer.position(startPosition);
+    }
+
+    tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), 
tabletStringInternPool);
+    isAligned = ReadWriteIOUtils.readBool(buffer);
+  }
+
+  private static void ensureStatementDeserializedFromCurrentTabletFormat(
+      final InsertTabletStatement statement) {
+    final String[] measurements = statement.getMeasurements();
+    final TSDataType[] dataTypes = statement.getDataTypes();
+
+    if (Objects.isNull(measurements)
+        || Objects.isNull(dataTypes)
+        || measurements.length != dataTypes.length) {
+      throw new IllegalArgumentException(
+          "Incomplete schema in current tablet format deserialization.");
+    }
+
+    final Object[] columns = statement.getColumns();
+    if (Objects.nonNull(columns) && columns.length != measurements.length) {
+      throw new IllegalArgumentException(
+          "Column count is inconsistent with schema count in current tablet 
format deserialization.");
+    }
+
+    for (int i = 0; i < measurements.length; ++i) {
+      if (Objects.isNull(measurements[i]) || Objects.isNull(dataTypes[i])) {
+        throw new IllegalArgumentException(
+            "Incomplete measurement schema in current tablet format 
deserialization.");
+      }
+      if (statement.getRowCount() > 0 && (Objects.isNull(columns) || 
Objects.isNull(columns[i]))) {
+        throw new IllegalArgumentException(
+            "Incomplete column values in current tablet format 
deserialization.");
+      }
+    }
+
+    final long[] times = statement.getTimes();
+    if (statement.getRowCount() > 0
+        && measurements.length > 0
+        && (Objects.isNull(times) || times.length < statement.getRowCount())) {
+      throw new IllegalArgumentException(
+          "Incomplete timestamps in current tablet format deserialization.");
+    }
+  }
+
   /////////////////////////////// Air Gap ///////////////////////////////
 
   /**
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
index d7be9f548f5..405859dd982 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
@@ -86,6 +86,27 @@ public class TabletStatementConverter {
       final boolean readDatabaseName,
       final TabletStringInternPool tabletStringInternPool)
       throws IllegalPathException {
+    return deserializeStatementFromTabletFormat(
+        byteBuffer, readDatabaseName, tabletStringInternPool, true);
+  }
+
+  public static InsertTabletStatement 
deserializeLegacyStatementFromTabletFormat(
+      final ByteBuffer byteBuffer) throws IllegalPathException {
+    return deserializeLegacyStatementFromTabletFormat(byteBuffer, null);
+  }
+
+  public static InsertTabletStatement 
deserializeLegacyStatementFromTabletFormat(
+      final ByteBuffer byteBuffer, final TabletStringInternPool 
tabletStringInternPool)
+      throws IllegalPathException {
+    return deserializeStatementFromTabletFormat(byteBuffer, false, 
tabletStringInternPool, false);
+  }
+
+  private static InsertTabletStatement deserializeStatementFromTabletFormat(
+      final ByteBuffer byteBuffer,
+      final boolean readDatabaseName,
+      final TabletStringInternPool tabletStringInternPool,
+      final boolean readColumnCategory)
+      throws IllegalPathException {
     final InsertTabletStatement statement = new InsertTabletStatement();
 
     // Calculate memory size during deserialization, use INSTANCE_SIZE constant
@@ -132,9 +153,11 @@ public class TabletStatementConverter {
         final Pair<String, TSDataType> pair = readMeasurement(byteBuffer, 
tabletStringInternPool);
         measurement[i] = pair.getLeft();
         dataTypes[i] = pair.getRight();
-        columnCategories[i] =
-            TsTableColumnCategory.fromTsFileColumnCategory(
-                ColumnCategory.values()[byteBuffer.get()]);
+        if (readColumnCategory) {
+          columnCategories[i] =
+              TsTableColumnCategory.fromTsFileColumnCategory(
+                  ColumnCategory.values()[byteBuffer.get()]);
+        }
 
         // Calculate memory for each measurement string
         if (measurement[i] != null) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 10573e5609d..88f7353f519 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -20,12 +20,18 @@
 package org.apache.iotdb.db.pipe.sink;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
 import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
 import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq;
@@ -40,6 +46,7 @@ import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTable
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealReq;
+import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateAlignedTimeSeriesNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -69,9 +76,12 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class PipeDataNodeThriftRequestTest {
@@ -146,6 +156,61 @@ public class PipeDataNodeThriftRequestTest {
     Assert.assertEquals(req.getTimestampPrecision(), 
deserializeReq.getTimestampPrecision());
   }
 
+  @Test
+  public void testPipeTransferDataNodeHandshakeReqFromLegacyV13Body() throws 
IOException {
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.HANDSHAKE_DATANODE_V1, 
serializeLegacyHandshakeV1Body(TIME_PRECISION));
+
+    final PipeTransferDataNodeHandshakeV1Req deserializeReq =
+        PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(TIME_PRECISION, 
deserializeReq.getTimestampPrecision());
+  }
+
+  @Test
+  public void testPipeTransferDataNodeHandshakeV2Req() throws IOException {
+    final Map<String, String> params = new HashMap<>();
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, 
TIME_PRECISION);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, 
"cluster");
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root");
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root");
+    
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, 
"async");
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, 
Boolean.TRUE.toString());
+
+    final PipeTransferDataNodeHandshakeV2Req req =
+        PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params);
+    final PipeTransferDataNodeHandshakeV2Req deserializeReq =
+        PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(params, deserializeReq.getParams());
+  }
+
+  @Test
+  public void testPipeTransferDataNodeHandshakeV2ReqFromLegacyV13Body() throws 
IOException {
+    final Map<String, String> params = new HashMap<>();
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, 
TIME_PRECISION);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, 
"cluster");
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root");
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root");
+
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.HANDSHAKE_DATANODE_V2, 
serializeLegacyHandshakeV2Body(params));
+
+    final PipeTransferDataNodeHandshakeV2Req deserializeReq =
+        PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(params, deserializeReq.getParams());
+  }
+
   @Test
   public void testPipeTransferInsertNodeReq() {
     final PipeTransferTabletInsertNodeReq req =
@@ -173,6 +238,34 @@ public class PipeDataNodeThriftRequestTest {
     Assert.assertEquals(statement.getPaths(), paths);
   }
 
+  @Test
+  public void testPipeTransferInsertNodeReqFromLegacyV13Body() {
+    final InsertRowNode node =
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(new String[] {"root", "sg", "d"}),
+            false,
+            new String[] {"s"},
+            new TSDataType[] {TSDataType.INT32},
+            1,
+            new Object[] {1},
+            false);
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.TRANSFER_TABLET_INSERT_NODE, 
node.serializeToByteBuffer());
+
+    final PipeTransferTabletInsertNodeReq deserializeReq =
+        PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(node, deserializeReq.getInsertNode());
+
+    final List<PartialPath> paths = new ArrayList<>();
+    paths.add(new PartialPath(new String[] {"root", "sg", "d", "s"}));
+    Assert.assertEquals(paths, deserializeReq.constructStatement().getPaths());
+  }
+
   @Test
   public void testPipeTransferInsertNodeReqV2() {
     final PipeTransferTabletInsertNodeReqV2 req =
@@ -244,6 +337,23 @@ public class PipeDataNodeThriftRequestTest {
 
     Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
     Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertArrayEquals(
+        new byte[] {'a', 'b'}, 
byteBufferToByteArray(deserializeReq.getByteBuffer()));
+  }
+
+  @Test
+  public void testPipeTransferTabletBinaryReqFromLegacyV13Body() {
+    // Not do real test here since "serializeToWal" needs private inner class 
of walBuffer
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.TRANSFER_TABLET_BINARY, ByteBuffer.wrap(new byte[] 
{'a', 'b'}));
+    final PipeTransferTabletBinaryReq deserializeReq =
+        PipeTransferTabletBinaryReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertArrayEquals(
+        new byte[] {'a', 'b'}, 
byteBufferToByteArray(deserializeReq.getByteBuffer()));
   }
 
   @Test
@@ -285,6 +395,30 @@ public class PipeDataNodeThriftRequestTest {
     Assert.assertEquals(req.getPlanNode(), deserializeReq.getPlanNode());
   }
 
+  @Test
+  public void testPipeTransferPlanNodeReqFromLegacyV13SchemaPlanBody() {
+    final CreateAlignedTimeSeriesNode node =
+        new CreateAlignedTimeSeriesNode(
+            new PlanNodeId(""),
+            new PartialPath(new String[] {"root", "sg", "d"}),
+            Collections.singletonList("s"),
+            Collections.singletonList(TSDataType.INT32),
+            Collections.singletonList(TSEncoding.PLAIN),
+            Collections.singletonList(CompressionType.UNCOMPRESSED),
+            null,
+            null,
+            null);
+    final TPipeTransferReq req =
+        legacyTransferReq(PipeRequestType.TRANSFER_PLAN_NODE, 
node.serializeToByteBuffer());
+
+    final PipeTransferPlanNodeReq deserializeReq =
+        PipeTransferPlanNodeReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(node, deserializeReq.getPlanNode());
+  }
+
   @Test
   public void testPipeTransferTabletReq() {
     try {
@@ -518,6 +652,72 @@ public class PipeDataNodeThriftRequestTest {
         insertTabletStatements.get(1).getMeasurements()[0]);
   }
 
+  @Test
+  public void testPipeTransferTabletBatchReqWithLegacyTabletFormat() throws 
IOException {
+    final List<ByteBuffer> tabletBuffers = new ArrayList<>();
+    tabletBuffers.add(serializeLegacyTabletRawBuffer(false));
+    tabletBuffers.add(serializeLegacyTabletRawBuffer(true));
+
+    final PipeTransferTabletBatchReq req =
+        PipeTransferTabletBatchReq.toTPipeTransferReq(Collections.emptyList(), 
tabletBuffers);
+
+    final PipeTransferTabletBatchReq deserializedReq =
+        PipeTransferTabletBatchReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(2, deserializedReq.getTabletReqs().size());
+    Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned());
+    Assert.assertTrue(deserializedReq.getTabletReqs().get(1).getIsAligned());
+
+    
assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement());
+    
assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(1).constructStatement());
+  }
+
+  @Test
+  public void testPipeTransferTabletBatchReqFromLegacyV13Body() throws 
IOException {
+    final InsertRowNode node =
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath(new String[] {"root", "sg", "d"}),
+            false,
+            new String[] {"s"},
+            new TSDataType[] {TSDataType.INT32},
+            1,
+            new Object[] {1},
+            false);
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.TRANSFER_TABLET_BATCH,
+            serializeLegacyTabletBatchBody(
+                Collections.singletonList(node.serializeToByteBuffer()),
+                
Collections.singletonList(serializeLegacyTabletRawBuffer(false))));
+
+    final PipeTransferTabletBatchReq deserializedReq =
+        PipeTransferTabletBatchReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializedReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializedReq.getType());
+    Assert.assertEquals(1, deserializedReq.getInsertNodeReqs().size());
+    Assert.assertEquals(1, deserializedReq.getTabletReqs().size());
+    Assert.assertEquals(node, 
deserializedReq.getInsertNodeReqs().get(0).getInsertNode());
+    
assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement());
+  }
+
+  @Test
+  public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws 
IOException {
+    final TPipeTransferReq req = new TPipeTransferReq();
+    req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+    req.type = PipeRequestType.TRANSFER_TABLET_RAW.getType();
+    req.body = serializeLegacyTabletRawBuffer(true);
+
+    final PipeTransferTabletRawReq deserializedReq =
+        PipeTransferTabletRawReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializedReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializedReq.getType());
+    Assert.assertTrue(deserializedReq.getIsAligned());
+    assertLegacyTabletStatement(deserializedReq.constructStatement());
+  }
+
   @Test
   public void testPipeTransferTabletBatchReqV2() throws IOException {
     final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
@@ -733,6 +933,38 @@ public class PipeDataNodeThriftRequestTest {
     Assert.assertArrayEquals(req.getFilePiece(), 
deserializeReq.getFilePiece());
   }
 
+  @Test
+  public void testPipeTransferFilePieceReqsFromLegacyV13Bodies() throws 
IOException {
+    final byte[] body = "legacyPiece".getBytes();
+
+    final PipeTransferTsFilePieceReq tsFilePieceReq =
+        PipeTransferTsFilePieceReq.fromTPipeTransferReq(
+            legacyTransferReq(
+                PipeRequestType.TRANSFER_TS_FILE_PIECE,
+                serializeLegacyFilePieceBody("1.tsfile", 1L, body)));
+    Assert.assertEquals("1.tsfile", tsFilePieceReq.getFileName());
+    Assert.assertEquals(1L, tsFilePieceReq.getStartWritingOffset());
+    Assert.assertArrayEquals(body, tsFilePieceReq.getFilePiece());
+
+    final PipeTransferTsFilePieceWithModReq tsFilePieceWithModReq =
+        PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(
+            legacyTransferReq(
+                PipeRequestType.TRANSFER_TS_FILE_PIECE_WITH_MOD,
+                serializeLegacyFilePieceBody("1.tsfile.mod", 2L, body)));
+    Assert.assertEquals("1.tsfile.mod", tsFilePieceWithModReq.getFileName());
+    Assert.assertEquals(2L, tsFilePieceWithModReq.getStartWritingOffset());
+    Assert.assertArrayEquals(body, tsFilePieceWithModReq.getFilePiece());
+
+    final PipeTransferSchemaSnapshotPieceReq schemaSnapshotPieceReq =
+        PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(
+            legacyTransferReq(
+                PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_PIECE,
+                serializeLegacyFilePieceBody("schema.snapshot", 3L, body)));
+    Assert.assertEquals("schema.snapshot", 
schemaSnapshotPieceReq.getFileName());
+    Assert.assertEquals(3L, schemaSnapshotPieceReq.getStartWritingOffset());
+    Assert.assertArrayEquals(body, schemaSnapshotPieceReq.getFilePiece());
+  }
+
   @Test
   public void testPipeTransferTsFileSealReq() throws IOException {
     final String fileName = "1.tsfile";
@@ -749,6 +981,87 @@ public class PipeDataNodeThriftRequestTest {
     Assert.assertEquals(req.getFileLength(), deserializeReq.getFileLength());
   }
 
+  @Test
+  public void testPipeTransferTsFileSealReqFromLegacyV13Body() throws 
IOException {
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.TRANSFER_TS_FILE_SEAL, 
serializeLegacyFileSealV1Body("1.tsfile", 100L));
+
+    final PipeTransferTsFileSealReq deserializeReq =
+        PipeTransferTsFileSealReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals("1.tsfile", deserializeReq.getFileName());
+    Assert.assertEquals(100L, deserializeReq.getFileLength());
+  }
+
+  @Test
+  public void testPipeTransferTsFileSealWithModReq() throws IOException {
+    final String modFileName = "1.tsfile.mod";
+    final String tsFileName = "1.tsfile";
+
+    final PipeTransferTsFileSealWithModReq req =
+        PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+            modFileName, 10, tsFileName, 100, "root.db");
+    final PipeTransferTsFileSealWithModReq deserializeReq =
+        PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(Arrays.asList(modFileName, tsFileName), 
deserializeReq.getFileNames());
+    Assert.assertEquals(Arrays.asList(10L, 100L), 
deserializeReq.getFileLengths());
+    Assert.assertEquals("root.db", 
deserializeReq.getDatabaseNameByTsFileName());
+  }
+
+  @Test
+  public void 
testPipeTransferTsFileSealWithModReqFromLegacyV13BodyWithoutDatabaseName()
+      throws IOException {
+    final String modFileName = "1.tsfile.mod";
+    final String tsFileName = "1.tsfile";
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD,
+            serializeLegacyFileSealV2Body(
+                Arrays.asList(modFileName, tsFileName),
+                Arrays.asList(10L, 100L),
+                Collections.emptyMap()));
+
+    final PipeTransferTsFileSealWithModReq deserializeReq =
+        PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(Arrays.asList(modFileName, tsFileName), 
deserializeReq.getFileNames());
+    Assert.assertEquals(Arrays.asList(10L, 100L), 
deserializeReq.getFileLengths());
+    Assert.assertTrue(deserializeReq.getParameters().isEmpty());
+    Assert.assertNull(deserializeReq.getDatabaseNameByTsFileName());
+  }
+
+  @Test
+  public void 
testPipeTransferTsFileSealWithModReqFromLegacyV13BodyWithNullDatabaseName()
+      throws IOException {
+    final String modFileName = "1.tsfile.mod";
+    final String tsFileName = "1.tsfile";
+    final Map<String, String> parameters = new HashMap<>();
+    parameters.put("DATABASE_NAME_" + tsFileName, null);
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD,
+            serializeLegacyFileSealV2Body(
+                Arrays.asList(modFileName, tsFileName), Arrays.asList(10L, 
100L), parameters));
+
+    final PipeTransferTsFileSealWithModReq deserializeReq =
+        PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(Arrays.asList(modFileName, tsFileName), 
deserializeReq.getFileNames());
+    Assert.assertEquals(Arrays.asList(10L, 100L), 
deserializeReq.getFileLengths());
+    Assert.assertEquals(parameters, deserializeReq.getParameters());
+    Assert.assertNull(deserializeReq.getDatabaseNameByTsFileName());
+  }
+
   @Test
   public void testPipeTransferSchemaSnapshotSealReq() throws IOException {
     final String mTreeSnapshotName = SchemaConstant.MTREE_SNAPSHOT;
@@ -784,6 +1097,36 @@ public class PipeDataNodeThriftRequestTest {
     Assert.assertEquals(req.getParameters(), deserializeReq.getParameters());
   }
 
+  @Test
+  public void testPipeTransferSchemaSnapshotSealReqFromLegacyV13Body() throws 
IOException {
+    final String mTreeSnapshotName = SchemaConstant.MTREE_SNAPSHOT;
+    final String tLogName = SchemaConstant.TAG_LOG;
+    final Map<String, String> parameters = new HashMap<>();
+    parameters.put(ColumnHeaderConstant.PATH_PATTERN, "root.**");
+    parameters.put(ColumnHeaderConstant.DATABASE, "root.db");
+    parameters.put(ColumnHeaderConstant.TYPE, "19");
+
+    final TPipeTransferReq req =
+        legacyTransferReq(
+            PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_SEAL,
+            serializeLegacyFileSealV2Body(
+                Arrays.asList(mTreeSnapshotName, tLogName), 
Arrays.asList(100L, 10L), parameters));
+    final PipeTransferSchemaSnapshotSealReq deserializeReq =
+        PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals(Arrays.asList(mTreeSnapshotName, tLogName), 
deserializeReq.getFileNames());
+    Assert.assertEquals(Arrays.asList(100L, 10L), 
deserializeReq.getFileLengths());
+    Assert.assertEquals(parameters, deserializeReq.getParameters());
+    Assert.assertTrue(
+        PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(
+            deserializeReq.getParameters()));
+    Assert.assertFalse(
+        PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(
+            deserializeReq.getParameters()));
+  }
+
   @Test
   public void testPipeTransferFilePieceResp() throws IOException {
     final PipeTransferFilePieceResp resp =
@@ -795,6 +1138,83 @@ public class PipeDataNodeThriftRequestTest {
     Assert.assertEquals(resp.getEndWritingOffset(), 
deserializeResp.getEndWritingOffset());
   }
 
+  private static TPipeTransferReq legacyTransferReq(
+      final PipeRequestType requestType, final ByteBuffer body) {
+    final TPipeTransferReq req = new TPipeTransferReq();
+    req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+    req.type = requestType.getType();
+    req.body = body;
+    return req;
+  }
+
+  private static ByteBuffer serializeLegacyFileSealV2Body(
+      final List<String> fileNames,
+      final List<Long> fileLengths,
+      final Map<String, String> parameters)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(fileNames.size(), outputStream);
+      for (final String fileName : fileNames) {
+        ReadWriteIOUtils.write(fileName, outputStream);
+      }
+      ReadWriteIOUtils.write(fileLengths.size(), outputStream);
+      for (final Long fileLength : fileLengths) {
+        ReadWriteIOUtils.write(fileLength, outputStream);
+      }
+      ReadWriteIOUtils.write(parameters.size(), outputStream);
+      for (final Map.Entry<String, String> entry : parameters.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue(), outputStream);
+      }
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
+  private static ByteBuffer serializeLegacyHandshakeV1Body(final String 
timestampPrecision)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(timestampPrecision, outputStream);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
+  private static ByteBuffer serializeLegacyHandshakeV2Body(final Map<String, 
String> params)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(params.size(), outputStream);
+      for (final Map.Entry<String, String> entry : params.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue(), outputStream);
+      }
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
+  private static ByteBuffer serializeLegacyFilePieceBody(
+      final String fileName, final long startWritingOffset, final byte[] 
filePiece)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(fileName, outputStream);
+      ReadWriteIOUtils.write(startWritingOffset, outputStream);
+      ReadWriteIOUtils.write(new Binary(filePiece), outputStream);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
+  private static ByteBuffer serializeLegacyFileSealV1Body(
+      final String fileName, final long fileLength) throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(fileName, outputStream);
+      ReadWriteIOUtils.write(fileLength, outputStream);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
   private static Tablet createSingleValueTablet(final String deviceId, final 
String measurement) {
     final List<IMeasurementSchema> schemaList = new ArrayList<>();
     schemaList.add(new MeasurementSchema(measurement, TSDataType.INT32));
@@ -814,4 +1234,89 @@ public class PipeDataNodeThriftRequestTest {
       return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
     }
   }
+
+  private static ByteBuffer serializeLegacyTabletRawBuffer(final boolean 
isAligned)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write("root.sg.d", outputStream);
+      ReadWriteIOUtils.write(2, outputStream);
+
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write(2, outputStream);
+      writeLegacyMeasurementSchema(outputStream, "s1", TSDataType.INT32);
+      writeLegacyMeasurementSchema(outputStream, "s2", TSDataType.TEXT);
+
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write(2L, outputStream);
+      ReadWriteIOUtils.write(1L, outputStream);
+
+      ReadWriteIOUtils.write((byte) 0, outputStream);
+
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write(2, outputStream);
+      ReadWriteIOUtils.write(1, outputStream);
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write(new Binary("2", TSFileConfig.STRING_CHARSET), 
outputStream);
+      ReadWriteIOUtils.write((byte) 1, outputStream);
+      ReadWriteIOUtils.write(new Binary("1", TSFileConfig.STRING_CHARSET), 
outputStream);
+
+      ReadWriteIOUtils.write(isAligned, outputStream);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
+  private static ByteBuffer serializeLegacyTabletBatchBody(
+      final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer> 
tabletBuffers)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(0, outputStream);
+
+      ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
+      for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {
+        writeByteBuffer(outputStream, insertNodeBuffer);
+      }
+
+      ReadWriteIOUtils.write(tabletBuffers.size(), outputStream);
+      for (final ByteBuffer tabletBuffer : tabletBuffers) {
+        writeByteBuffer(outputStream, tabletBuffer);
+      }
+
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+  }
+
+  private static void writeByteBuffer(
+      final DataOutputStream outputStream, final ByteBuffer byteBuffer) throws 
IOException {
+    outputStream.write(byteBufferToByteArray(byteBuffer));
+  }
+
+  private static byte[] byteBufferToByteArray(final ByteBuffer byteBuffer) {
+    final ByteBuffer duplicatedBuffer = byteBuffer.duplicate();
+    final byte[] bytes = new byte[duplicatedBuffer.remaining()];
+    duplicatedBuffer.get(bytes);
+    return bytes;
+  }
+
+  private static void writeLegacyMeasurementSchema(
+      final DataOutputStream outputStream, final String measurement, final 
TSDataType dataType)
+      throws IOException {
+    ReadWriteIOUtils.write((byte) 1, outputStream);
+    ReadWriteIOUtils.write(measurement, outputStream);
+    ReadWriteIOUtils.write(dataType.serialize(), outputStream);
+    ReadWriteIOUtils.write(TSEncoding.PLAIN.serialize(), outputStream);
+    ReadWriteIOUtils.write(CompressionType.UNCOMPRESSED.serialize(), 
outputStream);
+    ReadWriteIOUtils.write(0, outputStream);
+  }
+
+  private static void assertLegacyTabletStatement(final InsertTabletStatement 
statement) {
+    Assert.assertEquals("root.sg.d", statement.getDevicePath().getFullPath());
+    Assert.assertArrayEquals(new String[] {"s1", "s2"}, 
statement.getMeasurements());
+    Assert.assertArrayEquals(
+        new TSDataType[] {TSDataType.INT32, TSDataType.TEXT}, 
statement.getDataTypes());
+    Assert.assertEquals(2, statement.getRowCount());
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java
index cf73948c3f1..227d5871d8b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java
@@ -54,6 +54,14 @@ public abstract class PipeTransferFileSealReqV2 extends 
TPipeTransferReq {
     return parameters;
   }
 
+  public static boolean isTreeModelDataAllowedToBeCaptured(final Map<String, 
String> parameters) {
+    return parameters.containsKey(TREE) || !parameters.containsKey(TABLE);
+  }
+
+  public static boolean isTableModelDataAllowedToBeCaptured(final Map<String, 
String> parameters) {
+    return parameters.containsKey(TABLE);
+  }
+
   protected abstract PipeRequestType getPlanType();
 
   /////////////////////////////// Thrift ///////////////////////////////
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
index 290ce397980..cc836cae945 100644
--- 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
@@ -22,14 +22,20 @@ package 
org.apache.iotdb.commons.pipe.sink.payload.thrift.common;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class PipeTransferSliceReqBuilderTest {
@@ -91,6 +97,32 @@ public class PipeTransferSliceReqBuilderTest {
             createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 4), 
bodySizeLimit));
   }
 
+  @Test
+  public void testPipeTransferSliceReqFromLegacyV13Body() throws IOException {
+    final TPipeTransferReq req = new TPipeTransferReq();
+    req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+    req.type = PipeRequestType.TRANSFER_SLICE.getType();
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(7, outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_RAW.getType(), 
outputStream);
+      ReadWriteIOUtils.write(6, outputStream);
+      ReadWriteIOUtils.write(new Binary(new byte[] {2, 3, 4}), outputStream);
+      ReadWriteIOUtils.write(1, outputStream);
+      ReadWriteIOUtils.write(2, outputStream);
+      req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+
+    final PipeTransferSliceReq sliceReq = 
PipeTransferSliceReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(7, sliceReq.getOrderId());
+    Assert.assertEquals(PipeRequestType.TRANSFER_TABLET_RAW.getType(), 
sliceReq.getOriginReqType());
+    Assert.assertEquals(6, sliceReq.getOriginBodySize());
+    Assert.assertArrayEquals(new byte[] {2, 3, 4}, sliceReq.getSliceBody());
+    Assert.assertEquals(1, sliceReq.getSliceIndex());
+    Assert.assertEquals(2, sliceReq.getSliceCount());
+  }
+
   private static TPipeTransferReq createReq(final byte version, final int 
bodySize) {
     final byte[] body = new byte[bodySize];
     for (int i = 0; i < body.length; ++i) {
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeRequestTypeTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeRequestTypeTest.java
new file mode 100644
index 00000000000..c2e0ba949f9
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeRequestTypeTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.request;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PipeRequestTypeTest {
+
+  @Test
+  public void testAllV13RequestTypesAreRecognized() {
+    assertV13RequestType((short) 0, PipeRequestType.HANDSHAKE_CONFIGNODE_V1);
+    assertV13RequestType((short) 1, PipeRequestType.HANDSHAKE_DATANODE_V1);
+    assertV13RequestType((short) 50, PipeRequestType.HANDSHAKE_CONFIGNODE_V2);
+    assertV13RequestType((short) 51, PipeRequestType.HANDSHAKE_DATANODE_V2);
+
+    assertV13RequestType((short) 2, 
PipeRequestType.TRANSFER_TABLET_INSERT_NODE);
+    assertV13RequestType((short) 3, PipeRequestType.TRANSFER_TABLET_RAW);
+    assertV13RequestType((short) 4, PipeRequestType.TRANSFER_TS_FILE_PIECE);
+    assertV13RequestType((short) 5, PipeRequestType.TRANSFER_TS_FILE_SEAL);
+    assertV13RequestType((short) 6, PipeRequestType.TRANSFER_TABLET_BATCH);
+    assertV13RequestType((short) 7, PipeRequestType.TRANSFER_TABLET_BINARY);
+    assertV13RequestType((short) 8, 
PipeRequestType.TRANSFER_TS_FILE_PIECE_WITH_MOD);
+    assertV13RequestType((short) 9, 
PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD);
+
+    // 1.3 named this request TRANSFER_SCHEMA_PLAN. 2.0 keeps the same wire 
type.
+    assertV13RequestType((short) 100, PipeRequestType.TRANSFER_PLAN_NODE);
+    assertV13RequestType((short) 101, 
PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_PIECE);
+    assertV13RequestType((short) 102, 
PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_SEAL);
+
+    assertV13RequestType((short) 200, PipeRequestType.TRANSFER_CONFIG_PLAN);
+    assertV13RequestType((short) 201, 
PipeRequestType.TRANSFER_CONFIG_SNAPSHOT_PIECE);
+    assertV13RequestType((short) 202, 
PipeRequestType.TRANSFER_CONFIG_SNAPSHOT_SEAL);
+
+    assertV13RequestType((short) 300, PipeRequestType.TRANSFER_COMPRESSED);
+    assertV13RequestType((short) 400, PipeRequestType.TRANSFER_SLICE);
+  }
+
+  private static void assertV13RequestType(
+      final short type, final PipeRequestType expectedRequestType) {
+    Assert.assertTrue(PipeRequestType.isValidatedRequestType(type));
+    Assert.assertEquals(expectedRequestType, PipeRequestType.valueOf(type));
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java
new file mode 100644
index 00000000000..ccdbc5086a5
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.request;
+
+import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressor;
+import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressorFactory;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class PipeTransferCompressedReqTest {
+
+  /** Compressing with the current format and then decompressing restores the original request. */
+  @Test
+  public void testPipeTransferCompressedReq() throws IOException {
+    final TPipeTransferReq originalReq = newTabletBinaryReq();
+
+    final TPipeTransferReq compressedReq =
+        PipeTransferCompressedReq.toTPipeTransferReq(originalReq, gzipCompressors());
+    final TPipeTransferReq decompressedReq =
+        PipeTransferCompressedReq.fromTPipeTransferReq(compressedReq);
+
+    // The wrapper itself is a VERSION_1 request of type TRANSFER_COMPRESSED.
+    Assert.assertEquals(IoTDBSinkRequestVersion.VERSION_1.getVersion(), compressedReq.version);
+    Assert.assertEquals(PipeRequestType.TRANSFER_COMPRESSED.getType(), compressedReq.type);
+    assertSameRequest(originalReq, decompressedReq);
+  }
+
+  /**
+   * A compressed request whose body was laid out by {@link #serializeLegacyCompressedBody} must
+   * decompress back to the original request.
+   */
+  @Test
+  public void testPipeTransferCompressedReqFromLegacyV13Body() throws IOException {
+    final TPipeTransferReq originalReq = newTabletBinaryReq();
+
+    final TPipeTransferReq compressedReq = new TPipeTransferReq();
+    compressedReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+    compressedReq.type = PipeRequestType.TRANSFER_COMPRESSED.getType();
+    compressedReq.body = serializeLegacyCompressedBody(originalReq, gzipCompressors());
+
+    final TPipeTransferReq decompressedReq =
+        PipeTransferCompressedReq.fromTPipeTransferReq(compressedReq);
+
+    assertSameRequest(originalReq, decompressedReq);
+  }
+
+  /** Builds the VERSION_1 TRANSFER_TABLET_BINARY request with body {1, 2, 3, 4} used as input. */
+  private static TPipeTransferReq newTabletBinaryReq() {
+    final TPipeTransferReq req = new TPipeTransferReq();
+    req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+    req.type = PipeRequestType.TRANSFER_TABLET_BINARY.getType();
+    req.body = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
+    return req;
+  }
+
+  /** Returns a single-element compressor list holding the GZIP compressor. */
+  private static List<PipeCompressor> gzipCompressors() {
+    return Collections.singletonList(
+        PipeCompressorFactory.getCompressor(PipeCompressor.PipeCompressionType.GZIP.getIndex()));
+  }
+
+  /** Asserts that version, type and body bytes of the two requests are equal. */
+  private static void assertSameRequest(
+      final TPipeTransferReq expected, final TPipeTransferReq actual) {
+    Assert.assertEquals(expected.version, actual.version);
+    Assert.assertEquals(expected.type, actual.type);
+    Assert.assertArrayEquals(expected.getBody(), actual.getBody());
+  }
+
+  /**
+   * Hand-serializes a compressed request body: the compressor count, then for each compressor its
+   * serialized tag and the payload length before that compressor runs, and finally the payload
+   * after all compressors have been applied.
+   *
+   * @param originalReq the request whose version, type and body form the payload
+   * @param compressors the compressors to apply, in order
+   * @return a buffer over the bytes written
+   * @throws IOException if writing or compressing fails
+   */
+  private static ByteBuffer serializeLegacyCompressedBody(
+      final TPipeTransferReq originalReq, final List<PipeCompressor> compressors)
+      throws IOException {
+    try (final PublicBAOS sink = new PublicBAOS();
+        final DataOutputStream out = new DataOutputStream(sink)) {
+      // Payload before compression: version byte, type bytes, then the request body.
+      final byte[] versionBytes = {originalReq.version};
+      final byte[] typeBytes = BytesUtils.shortToBytes(originalReq.type);
+      byte[] payload =
+          BytesUtils.concatByteArrayList(
+              Arrays.asList(versionBytes, typeBytes, originalReq.getBody()));
+
+      ReadWriteIOUtils.write((byte) compressors.size(), out);
+      for (final PipeCompressor compressor : compressors) {
+        // Each stage records its tag and the size of its input before compressing it.
+        ReadWriteIOUtils.write(compressor.serialize(), out);
+        ReadWriteIOUtils.write(payload.length, out);
+        payload = compressor.compress(payload);
+      }
+      out.write(payload);
+
+      return ByteBuffer.wrap(sink.getBuf(), 0, sink.size());
+    }
+  }
+}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java
new file mode 100644
index 00000000000..4ac0e5cb7c3
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.request;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PipeTransferFileSealReqV2Test {
+
+  /** An empty parameter map (the legacy 1.3 case, going by the test name) allows tree data only. */
+  @Test
+  public void testLegacyV13SnapshotSealCapturesTreeOnly() {
+    final Map<String, String> parameters = parametersWithKeys();
+
+    Assert.assertTrue(treeAllowed(parameters));
+    Assert.assertFalse(tableAllowed(parameters));
+  }
+
+  /** Only the TREE key present: tree data allowed, table data not. */
+  @Test
+  public void testExplicitTreeOnlySnapshotSealCapturesTreeOnly() {
+    final Map<String, String> parameters = parametersWithKeys(PipeTransferFileSealReqV2.TREE);
+
+    Assert.assertTrue(treeAllowed(parameters));
+    Assert.assertFalse(tableAllowed(parameters));
+  }
+
+  /** Only the TABLE key present: table data allowed, tree data not. */
+  @Test
+  public void testExplicitTableOnlySnapshotSealCapturesTableOnly() {
+    final Map<String, String> parameters = parametersWithKeys(PipeTransferFileSealReqV2.TABLE);
+
+    Assert.assertFalse(treeAllowed(parameters));
+    Assert.assertTrue(tableAllowed(parameters));
+  }
+
+  /** Both TREE and TABLE keys present: both kinds of data allowed. */
+  @Test
+  public void testExplicitTreeAndTableSnapshotSealCapturesBoth() {
+    final Map<String, String> parameters =
+        parametersWithKeys(PipeTransferFileSealReqV2.TREE, PipeTransferFileSealReqV2.TABLE);
+
+    Assert.assertTrue(treeAllowed(parameters));
+    Assert.assertTrue(tableAllowed(parameters));
+  }
+
+  /** Builds a mutable parameter map that maps each given key to the empty string. */
+  private static Map<String, String> parametersWithKeys(final String... keys) {
+    final Map<String, String> parameters = new HashMap<>();
+    for (final String key : keys) {
+      parameters.put(key, "");
+    }
+    return parameters;
+  }
+
+  /** Shorthand for {@code PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured}. */
+  private static boolean treeAllowed(final Map<String, String> parameters) {
+    return PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters);
+  }
+
+  /** Shorthand for {@code PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured}. */
+  private static boolean tableAllowed(final Map<String, String> parameters) {
+    return PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters);
+  }
+}

Reply via email to