This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4230ad16afd Pipe: using clusterId to judge whether the target cluster is source cluster (#11994)
4230ad16afd is described below

commit 4230ad16afd614897de4452e17a893d6d86e70c8
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Fri Feb 2 17:06:50 2024 +0800

    Pipe: using clusterId to judge whether the target cluster is source cluster (#11994)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    | 27 ++++++++-
 .../common/PipeTransferHandshakeConstant.java      | 30 ++++++++++
 ...akeReq.java => PipeTransferHandshakeV1Req.java} | 18 +++---
 ...akeReq.java => PipeTransferHandshakeV2Req.java} | 63 +++++++++++++-------
 .../protocol/airgap/IoTDBAirGapConnector.java      | 28 +++++++--
 .../async/IoTDBThriftAsyncClientManager.java       | 61 +++++++++++++++----
 .../thrift/sync/IoTDBThriftSyncClientManager.java  | 55 +++++++++++++----
 .../receiver/thrift/IoTDBThriftReceiverV1.java     | 66 +++++++++++++++++++--
 .../connector/PipeDataNodeThriftRequestTest.java   | 68 ++++++++++++++++++++--
 .../iotdb/db/pipe/connector/PipeReceiverTest.java  |  4 +-
 .../connector/payload/request/PipeRequestType.java |  4 +-
 11 files changed, 351 insertions(+), 73 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 75997e483e2..1f2eca0e5e8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -32,6 +32,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.pipe.progress.assigner.SimpleConsensusProgressIndexAssigner;
 import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.service.ResourcesInformationHolder;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -41,6 +44,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeRuntimeAgent implements IService {
 
@@ -48,13 +52,14 @@ public class PipeRuntimeAgent implements IService {
   private static final int DATA_NODE_ID = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
 
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
-
-  private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
-      new PipePeriodicalJobExecutor();
+  private final AtomicReference<String> clusterId = new 
AtomicReference<>(null);
 
   private final SimpleConsensusProgressIndexAssigner 
simpleConsensusProgressIndexAssigner =
       new SimpleConsensusProgressIndexAssigner();
 
+  private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
+      new PipePeriodicalJobExecutor();
+
   //////////////////////////// System Service Interface 
////////////////////////////
 
   public synchronized void preparePipeResources(
@@ -103,6 +108,22 @@ public class PipeRuntimeAgent implements IService {
     return ServiceType.PIPE_RUNTIME_AGENT;
   }
 
+  public String getClusterIdIfPossible() {
+    if (clusterId.get() == null) {
+      synchronized (clusterId) {
+        if (clusterId.get() == null) {
+          try (final ConfigNodeClient configNodeClient =
+              
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
+            clusterId.set(configNodeClient.getClusterId().getClusterId());
+          } catch (Exception e) {
+            LOGGER.warn("Unable to get clusterId, because: {}", 
e.getMessage(), e);
+          }
+        }
+      }
+    }
+    return clusterId.get();
+  }
+
   ////////////////////// SimpleConsensus ProgressIndex Assigner 
//////////////////////
 
   public void assignSimpleProgressIndexIfNeeded(InsertNode insertNode) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/common/PipeTransferHandshakeConstant.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/common/PipeTransferHandshakeConstant.java
new file mode 100644
index 00000000000..3d2bf1cbb0f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/common/PipeTransferHandshakeConstant.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.common;
+
+public class PipeTransferHandshakeConstant {
+
+  public static final String HANDSHAKE_KEY_TIME_PRECISION = 
"timestampPrecision";
+  public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID";
+
+  private PipeTransferHandshakeConstant() {
+    // Utility class
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV1Req.java
similarity index 83%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV1Req.java
index 80f9d1217eb..c829f531028 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV1Req.java
@@ -30,11 +30,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-public class PipeTransferHandshakeReq extends TPipeTransferReq {
+public class PipeTransferHandshakeV1Req extends TPipeTransferReq {
 
   private transient String timestampPrecision;
 
-  private PipeTransferHandshakeReq() {
+  private PipeTransferHandshakeV1Req() {
     // Empty constructor
   }
 
@@ -44,14 +44,14 @@ public class PipeTransferHandshakeReq extends 
TPipeTransferReq {
 
   /////////////////////////////// Thrift ///////////////////////////////
 
-  public static PipeTransferHandshakeReq toTPipeTransferReq(String 
timestampPrecision)
+  public static PipeTransferHandshakeV1Req toTPipeTransferReq(String 
timestampPrecision)
       throws IOException {
-    final PipeTransferHandshakeReq handshakeReq = new 
PipeTransferHandshakeReq();
+    final PipeTransferHandshakeV1Req handshakeReq = new 
PipeTransferHandshakeV1Req();
 
     handshakeReq.timestampPrecision = timestampPrecision;
 
     handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
-    handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
+    handshakeReq.type = PipeRequestType.HANDSHAKE_V1.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       ReadWriteIOUtils.write(timestampPrecision, outputStream);
@@ -62,8 +62,8 @@ public class PipeTransferHandshakeReq extends 
TPipeTransferReq {
     return handshakeReq;
   }
 
-  public static PipeTransferHandshakeReq fromTPipeTransferReq(TPipeTransferReq 
transferReq) {
-    final PipeTransferHandshakeReq handshakeReq = new 
PipeTransferHandshakeReq();
+  public static PipeTransferHandshakeV1Req 
fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferHandshakeV1Req handshakeReq = new 
PipeTransferHandshakeV1Req();
 
     handshakeReq.timestampPrecision = 
ReadWriteIOUtils.readString(transferReq.body);
 
@@ -80,7 +80,7 @@ public class PipeTransferHandshakeReq extends 
TPipeTransferReq {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
-      ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE.getType(), 
outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE_V1.getType(), 
outputStream);
       ReadWriteIOUtils.write(timestampPrecision, outputStream);
       return byteArrayOutputStream.toByteArray();
     }
@@ -96,7 +96,7 @@ public class PipeTransferHandshakeReq extends 
TPipeTransferReq {
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeTransferHandshakeReq that = (PipeTransferHandshakeReq) obj;
+    PipeTransferHandshakeV1Req that = (PipeTransferHandshakeV1Req) obj;
     return timestampPrecision.equals(that.timestampPrecision)
         && version == that.version
         && type == that.type
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV2Req.java
similarity index 56%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV2Req.java
index 80f9d1217eb..f2c8a9233a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV2Req.java
@@ -25,47 +25,62 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import org.apache.thrift.TException;
+
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 
-public class PipeTransferHandshakeReq extends TPipeTransferReq {
+public class PipeTransferHandshakeV2Req extends TPipeTransferReq {
 
-  private transient String timestampPrecision;
+  private transient Map<String, String> params;
 
-  private PipeTransferHandshakeReq() {
+  private PipeTransferHandshakeV2Req() {
     // Empty constructor
   }
 
-  public String getTimestampPrecision() {
-    return timestampPrecision;
+  public Map<String, String> getParams() {
+    return params;
   }
 
   /////////////////////////////// Thrift ///////////////////////////////
 
-  public static PipeTransferHandshakeReq toTPipeTransferReq(String 
timestampPrecision)
-      throws IOException {
-    final PipeTransferHandshakeReq handshakeReq = new 
PipeTransferHandshakeReq();
-
-    handshakeReq.timestampPrecision = timestampPrecision;
+  public static PipeTransferHandshakeV2Req toTPipeTransferReq(Map<String, 
String> params)
+      throws TException, IOException {
+    final PipeTransferHandshakeV2Req handshakeReq = new 
PipeTransferHandshakeV2Req();
 
     handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
-    handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
+    handshakeReq.type = PipeRequestType.HANDSHAKE_V2.getType();
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-      ReadWriteIOUtils.write(timestampPrecision, outputStream);
+      ReadWriteIOUtils.write(params.size(), outputStream);
+      for (final Map.Entry<String, String> entry : params.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue(), outputStream);
+      }
       handshakeReq.body =
           ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
     }
 
+    handshakeReq.params = params;
+
     return handshakeReq;
   }
 
-  public static PipeTransferHandshakeReq fromTPipeTransferReq(TPipeTransferReq 
transferReq) {
-    final PipeTransferHandshakeReq handshakeReq = new 
PipeTransferHandshakeReq();
+  public static PipeTransferHandshakeV2Req 
fromTPipeTransferReq(TPipeTransferReq transferReq) {
+    final PipeTransferHandshakeV2Req handshakeReq = new 
PipeTransferHandshakeV2Req();
 
-    handshakeReq.timestampPrecision = 
ReadWriteIOUtils.readString(transferReq.body);
+    Map<String, String> params = new HashMap<>();
+    final int size = ReadWriteIOUtils.readInt(transferReq.body);
+    for (int i = 0; i < size; ++i) {
+      final String key = ReadWriteIOUtils.readString(transferReq.body);
+      final String value = ReadWriteIOUtils.readString(transferReq.body);
+      params.put(key, value);
+    }
+    handshakeReq.params = params;
 
     handshakeReq.version = transferReq.version;
     handshakeReq.type = transferReq.type;
@@ -76,12 +91,16 @@ public class PipeTransferHandshakeReq extends 
TPipeTransferReq {
 
   /////////////////////////////// Air Gap ///////////////////////////////
 
-  public static byte[] toTransferHandshakeBytes(String timestampPrecision) 
throws IOException {
+  public static byte[] toTransferHandshakeBytes(HashMap<String, String> 
params) throws IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
outputStream);
-      ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE.getType(), 
outputStream);
-      ReadWriteIOUtils.write(timestampPrecision, outputStream);
+      ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE_V2.getType(), 
outputStream);
+      ReadWriteIOUtils.write(params.size(), outputStream);
+      for (final Map.Entry<String, String> entry : params.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue(), outputStream);
+      }
       return byteArrayOutputStream.toByteArray();
     }
   }
@@ -96,15 +115,15 @@ public class PipeTransferHandshakeReq extends 
TPipeTransferReq {
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    PipeTransferHandshakeReq that = (PipeTransferHandshakeReq) obj;
-    return timestampPrecision.equals(that.timestampPrecision)
+    PipeTransferHandshakeV2Req that = (PipeTransferHandshakeV2Req) obj;
+    return Objects.equals(params, that.params)
         && version == that.version
         && type == that.type
-        && body.equals(that.body);
+        && Objects.equals(body, that.body);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(timestampPrecision, version, type, body);
+    return Objects.hash(params, version, type, body);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index b9ffaa94a26..854f527222c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -26,11 +26,14 @@ import 
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.IoTDBConnect
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant;
 import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -63,6 +66,7 @@ import java.net.Socket;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -194,11 +198,23 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
         continue;
       }
 
-      if (!send(
-          socket,
-          PipeTransferHandshakeReq.toTransferHandshakeBytes(
-              
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()))) {
-        throw new PipeException("Handshake error with target server ip: " + ip 
+ ", port: " + port);
+      final HashMap<String, String> params = new HashMap<>();
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
+          PipeAgent.runtime().getClusterIdIfPossible());
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
+          CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+
+      // Try to handshake by PipeTransferHandshakeV2Req. If failed, retry to 
handshake by
+      // PipeTransferHandshakeV1Req. If failed again, throw 
PipeConnectionException.
+      if (!send(socket, 
PipeTransferHandshakeV2Req.toTransferHandshakeBytes(params))
+          && !send(
+              socket,
+              PipeTransferHandshakeV1Req.toTransferHandshakeBytes(
+                  
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()))) {
+        throw new PipeConnectionException(
+            "Handshake error with target server ip: " + ip + ", port: " + 
port);
       } else {
         isSocketAlive.set(i, true);
         socket.setSoTimeout((int) 
PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
index 477165d2bd8..b70794667dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
@@ -24,7 +24,10 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftClientManager;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -35,6 +38,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -122,14 +126,14 @@ public class IoTDBThriftAsyncClientManager extends 
IoTDBThriftClientManager {
     }
 
     final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
+    final AtomicReference<TPipeTransferResp> resp = new AtomicReference<>();
     final AtomicReference<Exception> exception = new AtomicReference<>();
-
-    client.pipeTransfer(
-        PipeTransferHandshakeReq.toTPipeTransferReq(
-            
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+    final AsyncMethodCallback<TPipeTransferResp> callback =
         new AsyncMethodCallback<TPipeTransferResp>() {
           @Override
           public void onComplete(TPipeTransferResp response) {
+            resp.set(response);
+
             if (response.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
               LOGGER.warn(
                   "Handshake error with receiver {}:{}, code: {}, message: 
{}.",
@@ -167,8 +171,47 @@ public class IoTDBThriftAsyncClientManager extends 
IoTDBThriftClientManager {
 
             isHandshakeFinished.set(true);
           }
-        });
+        };
+
+    // Try to handshake by PipeTransferHandshakeV2Req.
+    final HashMap<String, String> params = new HashMap<>();
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
+        PipeAgent.runtime().getClusterIdIfPossible());
+    params.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
+        CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+    client.pipeTransfer(PipeTransferHandshakeV2Req.toTPipeTransferReq(params), 
callback);
+    waitHandshakeFinished(isHandshakeFinished);
+
+    // Retry to handshake by PipeTransferHandshakeV1Req.
+    if (resp.get() != null
+        && resp.get().getStatus().getCode() == 
TSStatusCode.PIPE_VERSION_ERROR.getStatusCode()) {
+      LOGGER.info(
+          "Handshake error by PipeTransferHandshakeV2Req with receiver {}:{} "
+              + "retry to handshake by PipeTransferHandshakeV1Req.",
+          targetNodeUrl.getIp(),
+          targetNodeUrl.getPort());
+
+      isHandshakeFinished.set(false);
+      resp.set(null);
+      exception.set(null);
+
+      client.pipeTransfer(
+          PipeTransferHandshakeV1Req.toTPipeTransferReq(
+              
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+          callback);
+      waitHandshakeFinished(isHandshakeFinished);
+    }
+
+    if (exception.get() != null) {
+      throw new PipeConnectionException("Failed to handshake.", 
exception.get());
+    }
+
+    return false;
+  }
 
+  private void waitHandshakeFinished(AtomicBoolean isHandshakeFinished) {
     try {
       while (!isHandshakeFinished.get()) {
         Thread.sleep(10);
@@ -177,12 +220,6 @@ public class IoTDBThriftAsyncClientManager extends 
IoTDBThriftClientManager {
       Thread.currentThread().interrupt();
       throw new PipeException("Interrupted while waiting for handshake 
response.", e);
     }
-
-    if (exception.get() != null) {
-      throw new PipeConnectionException("Failed to handshake.", 
exception.get());
-    }
-
-    return false;
   }
 
   public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
index f3f83237768..6cc5c5ac683 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
@@ -24,20 +24,22 @@ import 
org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.commons.pipe.connector.client.IoTDBThriftSyncConnectorClient;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftClientManager;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -95,7 +97,7 @@ public class IoTDBThriftSyncClientManager extends 
IoTDBThriftClientManager imple
             "All target servers %s are not available.", 
endPoint2ClientAndStatus.keySet()));
   }
 
-  private void reconstructClient(TEndPoint endPoint) throws IOException {
+  private void reconstructClient(TEndPoint endPoint) {
     final Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus =
         endPoint2ClientAndStatus.get(endPoint);
 
@@ -111,6 +113,12 @@ public class IoTDBThriftSyncClientManager extends 
IoTDBThriftClientManager imple
       }
     }
 
+    initClientAndStatus(clientAndStatus, endPoint);
+    sendHandshakeReq(clientAndStatus, endPoint);
+  }
+
+  private void initClientAndStatus(
+      Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus, TEndPoint 
endPoint) {
     try {
       clientAndStatus.setLeft(
           new IoTDBThriftSyncConnectorClient(
@@ -124,7 +132,7 @@ public class IoTDBThriftSyncClientManager extends 
IoTDBThriftClientManager imple
               useSSL,
               trustStorePath,
               trustStorePwd));
-    } catch (TTransportException e) {
+    } catch (Exception e) {
       throw new PipeConnectionException(
           String.format(
               PipeConnectionException.CONNECTION_ERROR_FORMATTER,
@@ -132,14 +140,41 @@ public class IoTDBThriftSyncClientManager extends 
IoTDBThriftClientManager imple
               endPoint.getPort()),
           e);
     }
+  }
 
+  public void sendHandshakeReq(
+      Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus, TEndPoint 
endPoint) {
     try {
-      final TPipeTransferResp resp =
+      final HashMap<String, String> params = new HashMap<>();
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
+          CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
+          PipeAgent.runtime().getClusterIdIfPossible());
+
+      // Try to handshake by PipeTransferHandshakeV2Req.
+      TPipeTransferResp resp =
           clientAndStatus
               .getLeft()
-              .pipeTransfer(
-                  PipeTransferHandshakeReq.toTPipeTransferReq(
-                      
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+              
.pipeTransfer(PipeTransferHandshakeV2Req.toTPipeTransferReq(params));
+      // Receiver may be an old version, so we need to retry to handshake by
+      // PipeTransferHandshakeV1Req.
+      if (resp.getStatus().getCode() == 
TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
+        LOGGER.info(
+            "Handshake error with target server ip: {}, port: {}, because: {}. 
"
+                + "Retry to handshake by PipeTransferHandshakeV1Req.",
+            endPoint.getIp(),
+            endPoint.getPort(),
+            resp.getStatus());
+        resp =
+            clientAndStatus
+                .getLeft()
+                .pipeTransfer(
+                    PipeTransferHandshakeV1Req.toTPipeTransferReq(
+                        
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+      }
+
       if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         LOGGER.warn(
             "Handshake error with target server ip: {}, port: {}, because: 
{}.",
@@ -156,7 +191,7 @@ public class IoTDBThriftSyncClientManager extends 
IoTDBThriftClientManager imple
             endPoint.getIp(),
             endPoint.getPort());
       }
-    } catch (TException e) {
+    } catch (Exception e) {
       LOGGER.warn(
           "Handshake error with target server ip: {}, port: {}, because: {}.",
           endPoint.getIp(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 67d2cbad1e8..da6f888bc57 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -31,11 +31,14 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaPlanReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
@@ -113,8 +116,10 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
       final short rawRequestType = req.getType();
       if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
         switch (PipeRequestType.valueOf(rawRequestType)) {
-          case HANDSHAKE:
-            return 
handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(req));
+          case HANDSHAKE_V1:
+            return 
handleTransferHandshakeV1(PipeTransferHandshakeV1Req.fromTPipeTransferReq(req));
+          case HANDSHAKE_V2:
+            return 
handleTransferHandshakeV2(PipeTransferHandshakeV2Req.fromTPipeTransferReq(req));
           case TRANSFER_TABLET_INSERT_NODE:
             return handleTransferTabletInsertNode(
                 PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req),
@@ -170,7 +175,7 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
     }
   }
 
-  private TPipeTransferResp handleTransferHandshake(PipeTransferHandshakeReq 
req) {
+  private TPipeTransferResp 
handleTransferHandshakeV1(PipeTransferHandshakeV1Req req) {
     if (!CommonDescriptor.getInstance()
         .getConfig()
         .getTimestampPrecision()
@@ -246,6 +251,59 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
     return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
   }
 
+  private TPipeTransferResp 
handleTransferHandshakeV2(PipeTransferHandshakeV2Req req)
+      throws IOException {
+    // Reject to handshake if the receiver can not take clusterId from config 
node.
+    final String clusterIdFromConfigNode = 
PipeAgent.runtime().getClusterIdIfPossible();
+    if (clusterIdFromConfigNode == null) {
+      final TSStatus status =
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_HANDSHAKE_ERROR,
+              "Receiver can not get clusterId from config node.");
+      LOGGER.warn("Handshake failed, response status = {}.", status);
+      return new TPipeTransferResp(status);
+    }
+
+    // Reject to handshake if the request does not contain sender's clusterId.
+    final String clusterIdFromHandshakeRequest =
+        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID);
+    if (clusterIdFromHandshakeRequest == null) {
+      final TSStatus status =
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_HANDSHAKE_ERROR, "Handshake request does not 
contain clusterId.");
+      LOGGER.warn("Handshake failed, response status = {}.", status);
+      return new TPipeTransferResp(status);
+    }
+
+    // Reject to handshake if the receiver and sender are from the same 
cluster.
+    if (Objects.equals(clusterIdFromConfigNode, 
clusterIdFromHandshakeRequest)) {
+      final TSStatus status =
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_HANDSHAKE_ERROR,
+              String.format(
+                  "Receiver and sender are from the same cluster %s.",
+                  clusterIdFromHandshakeRequest));
+      LOGGER.warn("Handshake failed, response status = {}.", status);
+      return new TPipeTransferResp(status);
+    }
+
+    // Reject to handshake if the request does not contain timestampPrecision.
+    final String timestampPrecision =
+        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION);
+    if (timestampPrecision == null) {
+      final TSStatus status =
+          RpcUtils.getStatus(
+              TSStatusCode.PIPE_HANDSHAKE_ERROR,
+              "Handshake request does not contain timestampPrecision.");
+      LOGGER.warn("Handshake failed, response status = {}.", status);
+      return new TPipeTransferResp(status);
+    }
+
+    // Handle the handshake request as a v1 request.
+    return handleTransferHandshakeV1(
+        PipeTransferHandshakeV1Req.toTPipeTransferReq(timestampPrecision));
+  }
+
   private TPipeTransferResp handleTransferTabletInsertNode(
       PipeTransferTabletInsertNodeReq req,
       IPartitionFetcher partitionFetcher,
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
index 7fd72c51e20..0aba9930704 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
@@ -20,13 +20,18 @@
 package org.apache.iotdb.db.pipe.connector;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import 
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
 import 
org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotPieceReq;
 import 
org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotSealReq;
 import 
org.apache.iotdb.commons.pipe.connector.payload.response.PipeTransferSnapshotPieceResp;
+import 
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaPlanReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
@@ -52,16 +57,19 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 public class PipeDataNodeThriftRequestTest {
 
   private static final String TIME_PRECISION = "ms";
+  private static final String CLUSTER_ID = "abcde";
 
   @Test
-  public void testPipeValidateHandshakeReq() throws IOException {
-    PipeTransferHandshakeReq req = 
PipeTransferHandshakeReq.toTPipeTransferReq(TIME_PRECISION);
-    PipeTransferHandshakeReq deserializeReq = 
PipeTransferHandshakeReq.fromTPipeTransferReq(req);
+  public void testPipeValidateHandshakeV1Req() throws IOException {
+    PipeTransferHandshakeV1Req req = 
PipeTransferHandshakeV1Req.toTPipeTransferReq(TIME_PRECISION);
+    PipeTransferHandshakeV1Req deserializeReq =
+        PipeTransferHandshakeV1Req.fromTPipeTransferReq(req);
 
     Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
     Assert.assertEquals(req.getType(), deserializeReq.getType());
@@ -70,6 +78,58 @@ public class PipeDataNodeThriftRequestTest {
     Assert.assertEquals(req.getTimestampPrecision(), 
deserializeReq.getTimestampPrecision());
   }
 
+  @Test
+  public void testPipeValidateHandshakeV2Req() throws Exception {
+    HashMap<String, String> params = new HashMap<>();
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, 
CLUSTER_ID);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, 
TIME_PRECISION);
+    params.put("Nullable", null);
+
+    PipeTransferHandshakeV2Req req = 
PipeTransferHandshakeV2Req.toTPipeTransferReq(params);
+    PipeTransferHandshakeV2Req deserializeReq =
+        PipeTransferHandshakeV2Req.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+    Assert.assertEquals(
+        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID),
+        
deserializeReq.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID));
+    Assert.assertEquals(
+        
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION),
+        
deserializeReq.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION));
+    Assert.assertEquals(
+        req.getParams().get("Nullable"), 
deserializeReq.getParams().get("Nullable"));
+  }
+
+  @Test
+  public void testPipeValidateHandshakeV2Req4AirGap() throws IOException {
+    // Construct byteBuffer.
+    HashMap<String, String> params = new HashMap<>();
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, 
CLUSTER_ID);
+    params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, 
TIME_PRECISION);
+    params.put("Nullable", null);
+    ByteBuffer byteBuffer =
+        
ByteBuffer.wrap(PipeTransferHandshakeV2Req.toTransferHandshakeBytes(params));
+
+    // Construct request.
+    byte version = ReadWriteIOUtils.readByte(byteBuffer);
+    short type = ReadWriteIOUtils.readShort(byteBuffer);
+    ByteBuffer body = byteBuffer.slice();
+    final AirGapPseudoTPipeTransferRequest req =
+        (AirGapPseudoTPipeTransferRequest)
+            new 
AirGapPseudoTPipeTransferRequest().setVersion(version).setType(type).setBody(body);
+    final PipeTransferHandshakeV2Req deserializeReq =
+        PipeTransferHandshakeV2Req.fromTPipeTransferReq(req);
+
+    // Assert.
+    Assert.assertEquals(
+        IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
deserializeReq.getVersion());
+    Assert.assertEquals(PipeRequestType.HANDSHAKE_V2.getType(), 
deserializeReq.getType());
+    Assert.assertEquals(params, deserializeReq.getParams());
+  }
+
   @Test
   public void testPipeTransferInsertNodeReq() {
     PipeTransferTabletInsertNodeReq req =
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
index bc8a0c8e29d..d851219137b 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.connector;
 
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
 import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
@@ -43,7 +43,7 @@ public class PipeReceiverTest {
     IoTDBThriftReceiverV1 receiver = new IoTDBThriftReceiverV1();
     try {
       receiver.receive(
-          PipeTransferHandshakeReq.toTPipeTransferReq(
+          PipeTransferHandshakeV1Req.toTPipeTransferReq(
               
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
           mock(IPartitionFetcher.class),
           mock(ISchemaFetcher.class));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
index 1d9528a4318..be35ae36a61 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public enum PipeRequestType {
-  HANDSHAKE((short) 1),
+  HANDSHAKE_V1((short) 1),
 
   TRANSFER_TABLET_INSERT_NODE((short) 2),
   TRANSFER_TABLET_RAW((short) 3),
@@ -40,6 +40,8 @@ public enum PipeRequestType {
 
   TRANSFER_SNAPSHOT_PIECE((short) 10),
   TRANSFER_SNAPSHOT_SEAL((short) 11),
+
+  HANDSHAKE_V2((short) 12),
   ;
 
   private final short type;


Reply via email to