This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bcc3b3dd4a1 Pipe schema: RPC request & response datastructures (#11680)
bcc3b3dd4a1 is described below

commit bcc3b3dd4a10cab1db9e5b6fa187f547800104b5
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Dec 13 13:24:49 2023 +0800

    Pipe schema: RPC request & response datastructures (#11680)
---
 .../payload/request/PipeTransferConfigPlanReq.java |  78 +++++++++++++++
 .../request/PipeTransferFilePieceReq.java          |   4 +-
 .../evolvable/request/PipeTransferFileSealReq.java |   4 +-
 .../request/PipeTransferHandshakeReq.java          |   4 +-
 .../request/PipeTransferSchemaPlanReq.java         | 105 +++++++++++++++++++++
 .../request/PipeTransferTabletBatchReq.java        |   4 +-
 .../request/PipeTransferTabletBinaryReq.java       |   4 +-
 .../request/PipeTransferTabletInsertNodeReq.java   |   4 +-
 .../request/PipeTransferTabletRawReq.java          |   4 +-
 .../pipe/receiver/thrift/IoTDBThriftReceiver.java  |   2 +-
 .../receiver/thrift/IoTDBThriftReceiverAgent.java  |   2 +-
 .../receiver/thrift/IoTDBThriftReceiverV1.java     |  36 ++++++-
 ...est.java => PipeDataNodeThriftRequestTest.java} |  80 +++++++++++++++-
 .../request}/IoTDBConnectorRequestVersion.java     |   2 +-
 .../payload/request}/PipeRequestType.java          |   8 +-
 .../request/PipeTransferSnapshotPieceReq.java}     |  78 ++++++++-------
 .../request/PipeTransferSnapshotSealReq.java}      |  74 +++++++--------
 .../payload/request/TransferConfigPlanReq.java}    |  17 +---
 .../response/PipeTransferSnapshotPieceResp.java    |  81 ++++++++++++++++
 19 files changed, 478 insertions(+), 113 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/request/PipeTransferConfigPlanReq.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/request/PipeTransferConfigPlanReq.java
new file mode 100644
index 00000000000..3de3f585307
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/payload/request/PipeTransferConfigPlanReq.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.connector.payload.request;
+
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.TransferConfigPlanReq;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class PipeTransferConfigPlanReq extends TransferConfigPlanReq {
+
+  private PipeTransferConfigPlanReq() {
+    // Empty constructor
+  }
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  public static PipeTransferConfigPlanReq toTPipeTransferReq(IConsensusRequest 
consensusRequest) {
+    final PipeTransferConfigPlanReq req = new PipeTransferConfigPlanReq();
+
+    req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    req.type = PipeRequestType.TRANSFER_CONFIG_PLAN.getType();
+    req.body = consensusRequest.serializeToByteBuffer();
+
+    return req;
+  }
+
+  public static PipeTransferConfigPlanReq 
fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferConfigPlanReq configPlanReq = new 
PipeTransferConfigPlanReq();
+
+    configPlanReq.version = transferReq.version;
+    configPlanReq.type = transferReq.type;
+    configPlanReq.body = transferReq.body;
+
+    return configPlanReq;
+  }
+
+  /////////////////////////////// Air Gap ///////////////////////////////
+
+  public static byte[] toTransferInsertNodeBytes(IConsensusRequest 
consensusRequest)
+      throws IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_CONFIG_PLAN.getType(), 
outputStream);
+      return BytesUtils.concatByteArray(
+          byteArrayOutputStream.toByteArray(), 
consensusRequest.serializeToByteBuffer().array());
+    }
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  // do not overwrite equals() and hashCode() methods for there is no extra 
member variable
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
index 1b9bf9bd1b2..255f84a468b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
index 6314068fe50..3b8603f6bd8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
index 435726bfcf1..80f9d1217eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaPlanReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaPlanReq.java
new file mode 100644
index 00000000000..17d6503a05f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferSchemaPlanReq.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
+
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Objects;
+
+public class PipeTransferSchemaPlanReq extends TPipeTransferReq {
+
+  private transient PlanNode planNode;
+
+  private PipeTransferSchemaPlanReq() {
+    // Do nothing
+  }
+
+  public PlanNode getPlanNode() {
+    return planNode;
+  }
+
+  /////////////////////////////// Thrift ///////////////////////////////
+
+  public static PipeTransferSchemaPlanReq toTPipeTransferReq(PlanNode 
planNode) {
+    final PipeTransferSchemaPlanReq req = new PipeTransferSchemaPlanReq();
+
+    req.planNode = planNode;
+
+    req.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    req.type = PipeRequestType.TRANSFER_SCHEMA_PLAN.getType();
+    req.body = planNode.serializeToByteBuffer();
+
+    return req;
+  }
+
+  public static PipeTransferSchemaPlanReq 
fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferSchemaPlanReq planNodeReq = new 
PipeTransferSchemaPlanReq();
+
+    planNodeReq.planNode = PlanNodeType.deserialize(transferReq.body);
+
+    planNodeReq.version = transferReq.version;
+    planNodeReq.type = transferReq.type;
+    planNodeReq.body = transferReq.body;
+
+    return planNodeReq;
+  }
+
+  /////////////////////////////// Air Gap ///////////////////////////////
+  public static byte[] toTransferInsertNodeBytes(PlanNode planNode) throws 
IOException {
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_SCHEMA_PLAN.getType(), 
outputStream);
+      return BytesUtils.concatByteArray(
+          byteArrayOutputStream.toByteArray(), 
planNode.serializeToByteBuffer().array());
+    }
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PipeTransferSchemaPlanReq that = (PipeTransferSchemaPlanReq) obj;
+    return planNode.equals(that.planNode)
+        && version == that.version
+        && type == that.type
+        && body.equals(that.body);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(planNode, version, type, body);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 710e3cb437e..bacd7a4dda6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
index 3ab9c948081..28dbf072f5c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBinaryReq.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
index 80b9f92042b..e112d29e3bf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletInsertNodeReq.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
index 71babe20982..71875f7f87e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
 
 import org.apache.iotdb.commons.exception.MetadataException;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
 import org.apache.iotdb.commons.utils.PathUtils;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
index b34b5c123da..409faa69176 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiver.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.receiver.thrift;
 
-import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
index 848e31d3400..a112b27b817 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverAgent.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.receiver.thrift;
 
-import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.rpc.RpcUtils;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index e0cbae58464..03c58874292 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -21,22 +21,26 @@ package org.apache.iotdb.db.pipe.receiver.thrift;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotPieceReq;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotSealReq;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.TransferConfigPlanReq;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import 
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaPlanReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
-import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -139,6 +143,14 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
           case TRANSFER_FILE_SEAL:
             return handleTransferFileSeal(
                 PipeTransferFileSealReq.fromTPipeTransferReq(req), 
partitionFetcher, schemaFetcher);
+          case TRANSFER_CONFIG_PLAN:
+            return handleTransferConfigPlan((TransferConfigPlanReq) req);
+          case TRANSFER_SCHEMA_PLAN:
+            return handleTransferSchemaPlan((PipeTransferSchemaPlanReq) req);
+          case TRANSFER_SNAPSHOT_PIECE:
+            return handleTransferSnapshotPiece((PipeTransferSnapshotPieceReq) 
req);
+          case TRANSFER_SNAPSHOT_SEAL:
+            return handleTransferSnapshotSeal((PipeTransferSnapshotSealReq) 
req);
           default:
             break;
         }
@@ -503,6 +515,26 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
     }
   }
 
+  private TPipeTransferResp handleTransferConfigPlan(TransferConfigPlanReq 
req) {
+    // TODO
+    return new TPipeTransferResp();
+  }
+
+  private TPipeTransferResp handleTransferSchemaPlan(PipeTransferSchemaPlanReq 
req) {
+    // TODO
+    return new TPipeTransferResp();
+  }
+
+  private TPipeTransferResp 
handleTransferSnapshotPiece(PipeTransferSnapshotPieceReq req) {
+    // TODO
+    return new TPipeTransferResp();
+  }
+
+  private TPipeTransferResp 
handleTransferSnapshotSeal(PipeTransferSnapshotSealReq req) {
+    // TODO
+    return new TPipeTransferResp();
+  }
+
   private boolean isWritingFileAvailable() {
     final boolean isWritingFileAvailable =
         writingFile != null && writingFile.exists() && writingFileWriter != 
null;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
similarity index 67%
rename from 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java
rename to 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
index 15d3ee4cd80..03972592efd 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeThriftRequestTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
@@ -20,17 +20,24 @@
 package org.apache.iotdb.db.pipe.connector;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotPieceReq;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotSealReq;
+import 
org.apache.iotdb.commons.pipe.connector.payload.response.PipeTransferSnapshotPieceResp;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaPlanReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -39,9 +46,10 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
-public class PipeThriftRequestTest {
+public class PipeDataNodeThriftRequestTest {
 
   private static final String TIME_PRECISION = "ms";
 
@@ -85,6 +93,30 @@ public class PipeThriftRequestTest {
     Assert.assertEquals(statement.getPaths(), paths);
   }
 
+  @Test
+  public void testPipeTransferSchemaPlanReq() {
+    PipeTransferSchemaPlanReq req =
+        PipeTransferSchemaPlanReq.toTPipeTransferReq(
+            new CreateAlignedTimeSeriesNode(
+                new PlanNodeId(""),
+                new PartialPath(new String[] {"root", "sg", "d"}),
+                Collections.singletonList("s"),
+                Collections.singletonList(TSDataType.INT32),
+                Collections.singletonList(TSEncoding.PLAIN),
+                Collections.singletonList(CompressionType.UNCOMPRESSED),
+                null,
+                null,
+                null));
+
+    PipeTransferSchemaPlanReq deserializeReq = 
PipeTransferSchemaPlanReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+    Assert.assertEquals(req.getPlanNode(), deserializeReq.getPlanNode());
+  }
+
   @Test
   public void testPipeTransferTabletReq() {
     try {
@@ -142,6 +174,25 @@ public class PipeThriftRequestTest {
     Assert.assertArrayEquals(req.getFilePiece(), 
deserializeReq.getFilePiece());
   }
 
+  @Test
+  public void testPipeTransferSnapshotPieceReq() throws IOException {
+    byte[] body = "testPipeTransferSnapshotPieceReq".getBytes();
+    String fileName = "1.temp";
+
+    PipeTransferSnapshotPieceReq req =
+        PipeTransferSnapshotPieceReq.toTPipeTransferReq(fileName, 0, body);
+    PipeTransferSnapshotPieceReq deserializeReq =
+        PipeTransferSnapshotPieceReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+    Assert.assertEquals(req.getSnapshotName(), 
deserializeReq.getSnapshotName());
+    Assert.assertEquals(req.getStartWritingOffset(), 
deserializeReq.getStartWritingOffset());
+    Assert.assertArrayEquals(req.getSnapshotPiece(), 
deserializeReq.getSnapshotPiece());
+  }
+
   @Test
   public void testPipeTransferFileSealReq() throws IOException {
     String fileName = "1.tsfile";
@@ -157,6 +208,22 @@ public class PipeThriftRequestTest {
     Assert.assertEquals(req.getFileLength(), deserializeReq.getFileLength());
   }
 
+  @Test
+  public void testPipeTransferSnapshotSealReq() throws IOException {
+    String fileName = "1.temp";
+
+    PipeTransferSnapshotSealReq req = 
PipeTransferSnapshotSealReq.toTPipeTransferReq(fileName, 100);
+    PipeTransferSnapshotSealReq deserializeReq =
+        PipeTransferSnapshotSealReq.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+    Assert.assertEquals(req.getSnapshotName(), 
deserializeReq.getSnapshotName());
+    Assert.assertEquals(req.getSnapshotLength(), 
deserializeReq.getSnapshotLength());
+  }
+
   @Test
   public void testPIpeTransferFilePieceResp() throws IOException {
     PipeTransferFilePieceResp resp =
@@ -167,4 +234,15 @@ public class PipeThriftRequestTest {
     Assert.assertEquals(resp.getStatus(), deserializeResp.getStatus());
     Assert.assertEquals(resp.getEndWritingOffset(), 
deserializeResp.getEndWritingOffset());
   }
+
+  @Test
+  public void testPipeTransferSnapshotPieceResp() throws IOException {
+    PipeTransferSnapshotPieceResp resp =
+        
PipeTransferSnapshotPieceResp.toTPipeTransferResp(RpcUtils.SUCCESS_STATUS, 100);
+    PipeTransferSnapshotPieceResp deserializeResp =
+        PipeTransferSnapshotPieceResp.fromTPipeTransferResp(resp);
+
+    Assert.assertEquals(resp.getStatus(), deserializeResp.getStatus());
+    Assert.assertEquals(resp.getEndWritingOffset(), 
deserializeResp.getEndWritingOffset());
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/IoTDBConnectorRequestVersion.java
similarity index 94%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/IoTDBConnectorRequestVersion.java
index 941a372a498..a4eaabee95b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/IoTDBConnectorRequestVersion.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector.protocol;
+package org.apache.iotdb.commons.pipe.connector.payload.request;
 
 public enum IoTDBConnectorRequestVersion {
   VERSION_1((byte) 1),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
similarity index 89%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
index 029ee7fdb0d..1d9528a4318 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/PipeRequestType.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector.payload.evolvable;
+package org.apache.iotdb.commons.pipe.connector.payload.request;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -34,6 +34,12 @@ public enum PipeRequestType {
 
   TRANSFER_TABLET_BATCH((short) 6),
   TRANSFER_TABLET_BINARY((short) 7),
+
+  TRANSFER_SCHEMA_PLAN((short) 8),
+  TRANSFER_CONFIG_PLAN((short) 9),
+
+  TRANSFER_SNAPSHOT_PIECE((short) 10),
+  TRANSFER_SNAPSHOT_SEAL((short) 11),
   ;
 
   private final short type;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeTransferSnapshotPieceReq.java
similarity index 53%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeTransferSnapshotPieceReq.java
index 1b9bf9bd1b2..b397d2f7e8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFilePieceReq.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeTransferSnapshotPieceReq.java
@@ -17,10 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
+package org.apache.iotdb.commons.pipe.connector.payload.request;
 
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -32,77 +30,77 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
 
-public class PipeTransferFilePieceReq extends TPipeTransferReq {
+public class PipeTransferSnapshotPieceReq extends TPipeTransferReq {
 
-  private transient String fileName;
+  private transient String snapshotName;
   private transient long startWritingOffset;
-  private transient byte[] filePiece;
+  private transient byte[] snapshotPiece;
 
-  private PipeTransferFilePieceReq() {
+  private PipeTransferSnapshotPieceReq() {
     // Empty constructor
   }
 
-  public String getFileName() {
-    return fileName;
+  public String getSnapshotName() {
+    return snapshotName;
   }
 
   public long getStartWritingOffset() {
     return startWritingOffset;
   }
 
-  public byte[] getFilePiece() {
-    return filePiece;
+  public byte[] getSnapshotPiece() {
+    return snapshotPiece;
   }
 
   /////////////////////////////// Thrift ///////////////////////////////
 
-  public static PipeTransferFilePieceReq toTPipeTransferReq(
-      String fileName, long startWritingOffset, byte[] filePiece) throws 
IOException {
-    final PipeTransferFilePieceReq filePieceReq = new 
PipeTransferFilePieceReq();
+  public static PipeTransferSnapshotPieceReq toTPipeTransferReq(
+      String snapshotName, long startWritingOffset, byte[] snapshotPiece) 
throws IOException {
+    final PipeTransferSnapshotPieceReq snapshotPieceReq = new 
PipeTransferSnapshotPieceReq();
 
-    filePieceReq.fileName = fileName;
-    filePieceReq.startWritingOffset = startWritingOffset;
-    filePieceReq.filePiece = filePiece;
+    snapshotPieceReq.snapshotName = snapshotName;
+    snapshotPieceReq.startWritingOffset = startWritingOffset;
+    snapshotPieceReq.snapshotPiece = snapshotPiece;
 
-    filePieceReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
-    filePieceReq.type = PipeRequestType.TRANSFER_FILE_PIECE.getType();
+    snapshotPieceReq.version = 
IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    snapshotPieceReq.type = PipeRequestType.TRANSFER_SNAPSHOT_PIECE.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-      ReadWriteIOUtils.write(fileName, outputStream);
+      ReadWriteIOUtils.write(snapshotName, outputStream);
       ReadWriteIOUtils.write(startWritingOffset, outputStream);
-      ReadWriteIOUtils.write(new Binary(filePiece), outputStream);
-      filePieceReq.body =
+      ReadWriteIOUtils.write(new Binary(snapshotPiece), outputStream);
+      snapshotPieceReq.body =
           ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
     }
 
-    return filePieceReq;
+    return snapshotPieceReq;
   }
 
-  public static PipeTransferFilePieceReq fromTPipeTransferReq(TPipeTransferReq 
transferReq) {
-    final PipeTransferFilePieceReq filePieceReq = new 
PipeTransferFilePieceReq();
+  public static PipeTransferSnapshotPieceReq 
fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferSnapshotPieceReq snapshotPieceReq = new 
PipeTransferSnapshotPieceReq();
 
-    filePieceReq.fileName = ReadWriteIOUtils.readString(transferReq.body);
-    filePieceReq.startWritingOffset = 
ReadWriteIOUtils.readLong(transferReq.body);
-    filePieceReq.filePiece = 
ReadWriteIOUtils.readBinary(transferReq.body).getValues();
+    snapshotPieceReq.snapshotName = 
ReadWriteIOUtils.readString(transferReq.body);
+    snapshotPieceReq.startWritingOffset = 
ReadWriteIOUtils.readLong(transferReq.body);
+    snapshotPieceReq.snapshotPiece = 
ReadWriteIOUtils.readBinary(transferReq.body).getValues();
 
-    filePieceReq.version = transferReq.version;
-    filePieceReq.type = transferReq.type;
-    filePieceReq.body = transferReq.body;
+    snapshotPieceReq.version = transferReq.version;
+    snapshotPieceReq.type = transferReq.type;
+    snapshotPieceReq.body = transferReq.body;
 
-    return filePieceReq;
+    return snapshotPieceReq;
   }
 
   /////////////////////////////// Air Gap ///////////////////////////////
 
   public static byte[] toTPipeTransferBytes(
-      String fileName, long startWritingOffset, byte[] filePiece) throws 
IOException {
+      String snapshotName, long startWritingOffset, byte[] snapshotPiece) 
throws IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
-      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_FILE_PIECE.getType(), 
outputStream);
-      ReadWriteIOUtils.write(fileName, outputStream);
+      
ReadWriteIOUtils.write(PipeRequestType.TRANSFER_SNAPSHOT_PIECE.getType(), 
outputStream);
+      ReadWriteIOUtils.write(snapshotName, outputStream);
       ReadWriteIOUtils.write(startWritingOffset, outputStream);
-      ReadWriteIOUtils.write(new Binary(filePiece), outputStream);
+      ReadWriteIOUtils.write(new Binary(snapshotPiece), outputStream);
       return byteArrayOutputStream.toByteArray();
     }
   }
@@ -117,10 +115,10 @@ public class PipeTransferFilePieceReq extends 
TPipeTransferReq {
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeTransferFilePieceReq that = (PipeTransferFilePieceReq) obj;
-    return fileName.equals(that.fileName)
+    PipeTransferSnapshotPieceReq that = (PipeTransferSnapshotPieceReq) obj;
+    return snapshotName.equals(that.snapshotName)
         && startWritingOffset == that.startWritingOffset
-        && Arrays.equals(filePiece, that.filePiece)
+        && Arrays.equals(snapshotPiece, that.snapshotPiece)
         && version == that.version
         && type == that.type
         && body.equals(that.body);
@@ -129,6 +127,6 @@ public class PipeTransferFilePieceReq extends 
TPipeTransferReq {
   @Override
   public int hashCode() {
     return Objects.hash(
-        fileName, startWritingOffset, Arrays.hashCode(filePiece), version, 
type, body);
+        snapshotName, startWritingOffset, Arrays.hashCode(snapshotPiece), 
version, type, body);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeTransferSnapshotSealReq.java
similarity index 52%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeTransferSnapshotSealReq.java
index 6314068fe50..b4ffbc2cd03 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferFileSealReq.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeTransferSnapshotSealReq.java
@@ -17,10 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;
+package org.apache.iotdb.commons.pipe.connector.payload.request;
 
-import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
-import 
org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnectorRequestVersion;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -30,68 +28,68 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-public class PipeTransferFileSealReq extends TPipeTransferReq {
+public class PipeTransferSnapshotSealReq extends TPipeTransferReq {
 
-  private transient String fileName;
-  private transient long fileLength;
+  private transient String snapshotName;
+  private transient long snapshotLength;
 
-  private PipeTransferFileSealReq() {
+  private PipeTransferSnapshotSealReq() {
     // Empty constructor
   }
 
-  public String getFileName() {
-    return fileName;
+  public String getSnapshotName() {
+    return snapshotName;
   }
 
-  public long getFileLength() {
-    return fileLength;
+  public long getSnapshotLength() {
+    return snapshotLength;
   }
 
   /////////////////////////////// Thrift ///////////////////////////////
 
-  public static PipeTransferFileSealReq toTPipeTransferReq(String fileName, 
long fileLength)
-      throws IOException {
-    final PipeTransferFileSealReq fileSealReq = new PipeTransferFileSealReq();
+  public static PipeTransferSnapshotSealReq toTPipeTransferReq(
+      String snapshotName, long snapshotLength) throws IOException {
+    final PipeTransferSnapshotSealReq snapshotSealReq = new 
PipeTransferSnapshotSealReq();
 
-    fileSealReq.fileName = fileName;
-    fileSealReq.fileLength = fileLength;
+    snapshotSealReq.snapshotName = snapshotName;
+    snapshotSealReq.snapshotLength = snapshotLength;
 
-    fileSealReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
-    fileSealReq.type = PipeRequestType.TRANSFER_FILE_SEAL.getType();
+    snapshotSealReq.version = 
IoTDBConnectorRequestVersion.VERSION_1.getVersion();
+    snapshotSealReq.type = PipeRequestType.TRANSFER_SNAPSHOT_SEAL.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-      ReadWriteIOUtils.write(fileName, outputStream);
-      ReadWriteIOUtils.write(fileLength, outputStream);
-      fileSealReq.body =
+      ReadWriteIOUtils.write(snapshotName, outputStream);
+      ReadWriteIOUtils.write(snapshotLength, outputStream);
+      snapshotSealReq.body =
           ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
     }
 
-    return fileSealReq;
+    return snapshotSealReq;
   }
 
-  public static PipeTransferFileSealReq fromTPipeTransferReq(TPipeTransferReq 
req) {
-    final PipeTransferFileSealReq fileSealReq = new PipeTransferFileSealReq();
+  public static PipeTransferSnapshotSealReq 
fromTPipeTransferReq(TPipeTransferReq req) {
+    final PipeTransferSnapshotSealReq snapshotSealReq = new 
PipeTransferSnapshotSealReq();
 
-    fileSealReq.fileName = ReadWriteIOUtils.readString(req.body);
-    fileSealReq.fileLength = ReadWriteIOUtils.readLong(req.body);
+    snapshotSealReq.snapshotName = ReadWriteIOUtils.readString(req.body);
+    snapshotSealReq.snapshotLength = ReadWriteIOUtils.readLong(req.body);
 
-    fileSealReq.version = req.version;
-    fileSealReq.type = req.type;
-    fileSealReq.body = req.body;
+    snapshotSealReq.version = req.version;
+    snapshotSealReq.type = req.type;
+    snapshotSealReq.body = req.body;
 
-    return fileSealReq;
+    return snapshotSealReq;
   }
 
   /////////////////////////////// Air Gap ///////////////////////////////
 
-  public static byte[] toTPipeTransferFileSealBytes(String fileName, long 
fileLength)
+  public static byte[] toTPipeTransferSnapshotSealBytes(String snapshotName, 
long snapshotLength)
       throws IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
-      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_FILE_SEAL.getType(), 
outputStream);
-      ReadWriteIOUtils.write(fileName, outputStream);
-      ReadWriteIOUtils.write(fileLength, outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.TRANSFER_SNAPSHOT_SEAL.getType(), 
outputStream);
+      ReadWriteIOUtils.write(snapshotName, outputStream);
+      ReadWriteIOUtils.write(snapshotLength, outputStream);
       return byteArrayOutputStream.toByteArray();
     }
   }
@@ -106,9 +104,9 @@ public class PipeTransferFileSealReq extends 
TPipeTransferReq {
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeTransferFileSealReq that = (PipeTransferFileSealReq) obj;
-    return fileName.equals(that.fileName)
-        && fileLength == that.fileLength
+    PipeTransferSnapshotSealReq that = (PipeTransferSnapshotSealReq) obj;
+    return snapshotName.equals(that.snapshotName)
+        && snapshotLength == that.snapshotLength
         && version == that.version
         && type == that.type
         && body.equals(that.body);
@@ -116,6 +114,6 @@ public class PipeTransferFileSealReq extends 
TPipeTransferReq {
 
   @Override
   public int hashCode() {
-    return Objects.hash(fileName, fileLength, version, type, body);
+    return Objects.hash(snapshotName, snapshotLength, version, type, body);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/TransferConfigPlanReq.java
similarity index 74%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/TransferConfigPlanReq.java
index 941a372a498..3520ddfa835 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/IoTDBConnectorRequestVersion.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/TransferConfigPlanReq.java
@@ -17,19 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.connector.protocol;
+package org.apache.iotdb.commons.pipe.connector.payload.request;
 
-public enum IoTDBConnectorRequestVersion {
-  VERSION_1((byte) 1),
-  ;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
-  private final byte version;
-
-  IoTDBConnectorRequestVersion(byte type) {
-    this.version = type;
-  }
-
-  public byte getVersion() {
-    return version;
-  }
-}
+public abstract class TransferConfigPlanReq extends TPipeTransferReq {}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/response/PipeTransferSnapshotPieceResp.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/response/PipeTransferSnapshotPieceResp.java
new file mode 100644
index 00000000000..5df196a7b5c
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/response/PipeTransferSnapshotPieceResp.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.connector.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class PipeTransferSnapshotPieceResp extends TPipeTransferResp {
+
+  public static final long ERROR_END_OFFSET = -1;
+
+  private long endWritingOffset;
+
+  private PipeTransferSnapshotPieceResp() {}
+
+  public long getEndWritingOffset() {
+    return endWritingOffset;
+  }
+
+  public static PipeTransferSnapshotPieceResp toTPipeTransferResp(
+      TSStatus status, long endWritingOffset) throws IOException {
+    final PipeTransferSnapshotPieceResp snapshotPieceResp = new 
PipeTransferSnapshotPieceResp();
+
+    snapshotPieceResp.status = status;
+
+    snapshotPieceResp.endWritingOffset = endWritingOffset;
+    try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(endWritingOffset, outputStream);
+      snapshotPieceResp.body =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+
+    return snapshotPieceResp;
+  }
+
+  public static PipeTransferSnapshotPieceResp toTPipeTransferResp(TSStatus 
status) {
+    final PipeTransferSnapshotPieceResp snapshotPieceResp = new 
PipeTransferSnapshotPieceResp();
+
+    snapshotPieceResp.status = status;
+
+    return snapshotPieceResp;
+  }
+
+  public static PipeTransferSnapshotPieceResp fromTPipeTransferResp(
+      TPipeTransferResp transferResp) {
+    final PipeTransferSnapshotPieceResp snapshotPieceResp = new 
PipeTransferSnapshotPieceResp();
+
+    snapshotPieceResp.status = transferResp.status;
+
+    if (transferResp.isSetBody()) {
+      snapshotPieceResp.endWritingOffset = 
ReadWriteIOUtils.readLong(transferResp.body);
+      snapshotPieceResp.body = transferResp.body;
+    }
+
+    return snapshotPieceResp;
+  }
+}

Reply via email to