This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4230ad16afd Pipe: using clusterId to judge whether the target cluster
is source cluster (#11994)
4230ad16afd is described below
commit 4230ad16afd614897de4452e17a893d6d86e70c8
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Fri Feb 2 17:06:50 2024 +0800
Pipe: using clusterId to judge whether the target cluster is source cluster
(#11994)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 27 ++++++++-
.../common/PipeTransferHandshakeConstant.java | 30 ++++++++++
...akeReq.java => PipeTransferHandshakeV1Req.java} | 18 +++---
...akeReq.java => PipeTransferHandshakeV2Req.java} | 63 +++++++++++++-------
.../protocol/airgap/IoTDBAirGapConnector.java | 28 +++++++--
.../async/IoTDBThriftAsyncClientManager.java | 61 +++++++++++++++----
.../thrift/sync/IoTDBThriftSyncClientManager.java | 55 +++++++++++++----
.../receiver/thrift/IoTDBThriftReceiverV1.java | 66 +++++++++++++++++++--
.../connector/PipeDataNodeThriftRequestTest.java | 68 ++++++++++++++++++++--
.../iotdb/db/pipe/connector/PipeReceiverTest.java | 4 +-
.../connector/payload/request/PipeRequestType.java | 4 +-
11 files changed, 351 insertions(+), 73 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 75997e483e2..1f2eca0e5e8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -32,6 +32,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.pipe.progress.assigner.SimpleConsensusProgressIndexAssigner;
import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.service.ResourcesInformationHolder;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -41,6 +44,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
public class PipeRuntimeAgent implements IService {
@@ -48,13 +52,14 @@ public class PipeRuntimeAgent implements IService {
private static final int DATA_NODE_ID =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
-
- private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
- new PipePeriodicalJobExecutor();
+ private final AtomicReference<String> clusterId = new
AtomicReference<>(null);
private final SimpleConsensusProgressIndexAssigner
simpleConsensusProgressIndexAssigner =
new SimpleConsensusProgressIndexAssigner();
+ private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
+ new PipePeriodicalJobExecutor();
+
//////////////////////////// System Service Interface
////////////////////////////
public synchronized void preparePipeResources(
@@ -103,6 +108,22 @@ public class PipeRuntimeAgent implements IService {
return ServiceType.PIPE_RUNTIME_AGENT;
}
+ public String getClusterIdIfPossible() {
+ if (clusterId.get() == null) {
+ synchronized (clusterId) {
+ if (clusterId.get() == null) {
+ try (final ConfigNodeClient configNodeClient =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ clusterId.set(configNodeClient.getClusterId().getClusterId());
+ } catch (Exception e) {
+ LOGGER.warn("Unable to get clusterId, because: {}",
e.getMessage(), e);
+ }
+ }
+ }
+ }
+ return clusterId.get();
+ }
+
////////////////////// SimpleConsensus ProgressIndex Assigner
//////////////////////
public void assignSimpleProgressIndexIfNeeded(InsertNode insertNode) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/common/PipeTransferHandshakeConstant.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/common/PipeTransferHandshakeConstant.java
new file mode 100644
index 00000000000..3d2bf1cbb0f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/common/PipeTransferHandshakeConstant.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.common;
+
+public class PipeTransferHandshakeConstant {
+
+ public static final String HANDSHAKE_KEY_TIME_PRECISION =
"timestampPrecision";
+ public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID";
+
+ private PipeTransferHandshakeConstant() {
+ // Utility class
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV1Req.java
similarity index 83%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV1Req.java
index 80f9d1217eb..c829f531028 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV1Req.java
@@ -30,11 +30,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class PipeTransferHandshakeReq extends TPipeTransferReq {
+public class PipeTransferHandshakeV1Req extends TPipeTransferReq {
private transient String timestampPrecision;
- private PipeTransferHandshakeReq() {
+ private PipeTransferHandshakeV1Req() {
// Empty constructor
}
@@ -44,14 +44,14 @@ public class PipeTransferHandshakeReq extends
TPipeTransferReq {
/////////////////////////////// Thrift ///////////////////////////////
- public static PipeTransferHandshakeReq toTPipeTransferReq(String
timestampPrecision)
+ public static PipeTransferHandshakeV1Req toTPipeTransferReq(String
timestampPrecision)
throws IOException {
- final PipeTransferHandshakeReq handshakeReq = new
PipeTransferHandshakeReq();
+ final PipeTransferHandshakeV1Req handshakeReq = new
PipeTransferHandshakeV1Req();
handshakeReq.timestampPrecision = timestampPrecision;
handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
- handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
+ handshakeReq.type = PipeRequestType.HANDSHAKE_V1.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(timestampPrecision, outputStream);
@@ -62,8 +62,8 @@ public class PipeTransferHandshakeReq extends
TPipeTransferReq {
return handshakeReq;
}
- public static PipeTransferHandshakeReq fromTPipeTransferReq(TPipeTransferReq
transferReq) {
- final PipeTransferHandshakeReq handshakeReq = new
PipeTransferHandshakeReq();
+ public static PipeTransferHandshakeV1Req
fromTPipeTransferReq(TPipeTransferReq transferReq) {
+ final PipeTransferHandshakeV1Req handshakeReq = new
PipeTransferHandshakeV1Req();
handshakeReq.timestampPrecision =
ReadWriteIOUtils.readString(transferReq.body);
@@ -80,7 +80,7 @@ public class PipeTransferHandshakeReq extends
TPipeTransferReq {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
outputStream);
- ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE.getType(),
outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE_V1.getType(),
outputStream);
ReadWriteIOUtils.write(timestampPrecision, outputStream);
return byteArrayOutputStream.toByteArray();
}
@@ -96,7 +96,7 @@ public class PipeTransferHandshakeReq extends
TPipeTransferReq {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeTransferHandshakeReq that = (PipeTransferHandshakeReq) obj;
+ PipeTransferHandshakeV1Req that = (PipeTransferHandshakeV1Req) obj;
return timestampPrecision.equals(that.timestampPrecision)
&& version == that.version
&& type == that.type
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV2Req.java
similarity index 56%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV2Req.java
index 80f9d1217eb..f2c8a9233a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV2Req.java
@@ -25,47 +25,62 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.thrift.TException;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
-public class PipeTransferHandshakeReq extends TPipeTransferReq {
+public class PipeTransferHandshakeV2Req extends TPipeTransferReq {
- private transient String timestampPrecision;
+ private transient Map<String, String> params;
- private PipeTransferHandshakeReq() {
+ private PipeTransferHandshakeV2Req() {
// Empty constructor
}
- public String getTimestampPrecision() {
- return timestampPrecision;
+ public Map<String, String> getParams() {
+ return params;
}
/////////////////////////////// Thrift ///////////////////////////////
- public static PipeTransferHandshakeReq toTPipeTransferReq(String
timestampPrecision)
- throws IOException {
- final PipeTransferHandshakeReq handshakeReq = new
PipeTransferHandshakeReq();
-
- handshakeReq.timestampPrecision = timestampPrecision;
+ public static PipeTransferHandshakeV2Req toTPipeTransferReq(Map<String,
String> params)
+ throws TException, IOException {
+ final PipeTransferHandshakeV2Req handshakeReq = new
PipeTransferHandshakeV2Req();
handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
- handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
+ handshakeReq.type = PipeRequestType.HANDSHAKE_V2.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
- ReadWriteIOUtils.write(timestampPrecision, outputStream);
+ ReadWriteIOUtils.write(params.size(), outputStream);
+ for (final Map.Entry<String, String> entry : params.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
handshakeReq.body =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
}
+ handshakeReq.params = params;
+
return handshakeReq;
}
- public static PipeTransferHandshakeReq fromTPipeTransferReq(TPipeTransferReq
transferReq) {
- final PipeTransferHandshakeReq handshakeReq = new
PipeTransferHandshakeReq();
+ public static PipeTransferHandshakeV2Req
fromTPipeTransferReq(TPipeTransferReq transferReq) {
+ final PipeTransferHandshakeV2Req handshakeReq = new
PipeTransferHandshakeV2Req();
- handshakeReq.timestampPrecision =
ReadWriteIOUtils.readString(transferReq.body);
+ Map<String, String> params = new HashMap<>();
+ final int size = ReadWriteIOUtils.readInt(transferReq.body);
+ for (int i = 0; i < size; ++i) {
+ final String key = ReadWriteIOUtils.readString(transferReq.body);
+ final String value = ReadWriteIOUtils.readString(transferReq.body);
+ params.put(key, value);
+ }
+ handshakeReq.params = params;
handshakeReq.version = transferReq.version;
handshakeReq.type = transferReq.type;
@@ -76,12 +91,16 @@ public class PipeTransferHandshakeReq extends
TPipeTransferReq {
/////////////////////////////// Air Gap ///////////////////////////////
- public static byte[] toTransferHandshakeBytes(String timestampPrecision)
throws IOException {
+ public static byte[] toTransferHandshakeBytes(HashMap<String, String>
params) throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
outputStream);
- ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE.getType(),
outputStream);
- ReadWriteIOUtils.write(timestampPrecision, outputStream);
+ ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE_V2.getType(),
outputStream);
+ ReadWriteIOUtils.write(params.size(), outputStream);
+ for (final Map.Entry<String, String> entry : params.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
return byteArrayOutputStream.toByteArray();
}
}
@@ -96,15 +115,15 @@ public class PipeTransferHandshakeReq extends
TPipeTransferReq {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
- PipeTransferHandshakeReq that = (PipeTransferHandshakeReq) obj;
- return timestampPrecision.equals(that.timestampPrecision)
+ PipeTransferHandshakeV2Req that = (PipeTransferHandshakeV2Req) obj;
+ return Objects.equals(params, that.params)
&& version == that.version
&& type == that.type
- && body.equals(that.body);
+ && Objects.equals(body, that.body);
}
@Override
public int hashCode() {
- return Objects.hash(timestampPrecision, version, type, body);
+ return Objects.hash(params, version, type, body);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
index b9ffaa94a26..854f527222c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java
@@ -26,11 +26,14 @@ import
org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.IoTDBConnect
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
@@ -63,6 +66,7 @@ import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -194,11 +198,23 @@ public class IoTDBAirGapConnector extends IoTDBConnector {
continue;
}
- if (!send(
- socket,
- PipeTransferHandshakeReq.toTransferHandshakeBytes(
-
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()))) {
- throw new PipeException("Handshake error with target server ip: " + ip + ", port: " + port);
+ final HashMap<String, String> params = new HashMap<>();
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
+ PipeAgent.runtime().getClusterIdIfPossible());
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
+ CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+
+ // Try to handshake by PipeTransferHandshakeV2Req. If failed, retry to handshake by
+ // PipeTransferHandshakeV1Req. If failed again, throw PipeConnectionException.
+ if (!send(socket,
PipeTransferHandshakeV2Req.toTransferHandshakeBytes(params))
+ && !send(
+ socket,
+ PipeTransferHandshakeV1Req.toTransferHandshakeBytes(
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()))) {
+ throw new PipeConnectionException(
+ "Handshake error with target server ip: " + ip + ", port: " + port);
} else {
isSocketAlive.set(i, true);
socket.setSoTimeout((int)
PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
index 477165d2bd8..b70794667dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java
@@ -24,7 +24,10 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftClientManager;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -35,6 +38,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -122,14 +126,14 @@ public class IoTDBThriftAsyncClientManager extends
IoTDBThriftClientManager {
}
final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
+ final AtomicReference<TPipeTransferResp> resp = new AtomicReference<>();
final AtomicReference<Exception> exception = new AtomicReference<>();
-
- client.pipeTransfer(
- PipeTransferHandshakeReq.toTPipeTransferReq(
-
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+ final AsyncMethodCallback<TPipeTransferResp> callback =
new AsyncMethodCallback<TPipeTransferResp>() {
@Override
public void onComplete(TPipeTransferResp response) {
+ resp.set(response);
+
if (response.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
"Handshake error with receiver {}:{}, code: {}, message: {}.",
@@ -167,8 +171,47 @@ public class IoTDBThriftAsyncClientManager extends
IoTDBThriftClientManager {
isHandshakeFinished.set(true);
}
- });
+ };
+
+ // Try to handshake by PipeTransferHandshakeV2Req.
+ final HashMap<String, String> params = new HashMap<>();
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
+ PipeAgent.runtime().getClusterIdIfPossible());
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
+ CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+ client.pipeTransfer(PipeTransferHandshakeV2Req.toTPipeTransferReq(params),
callback);
+ waitHandshakeFinished(isHandshakeFinished);
+
+ // Retry to handshake by PipeTransferHandshakeV1Req.
+ if (resp.get() != null
+ && resp.get().getStatus().getCode() ==
TSStatusCode.PIPE_VERSION_ERROR.getStatusCode()) {
+ LOGGER.info(
+ "Handshake error by PipeTransferHandshakeV2Req with receiver {}:{} "
+ + "retry to handshake by PipeTransferHandshakeV1Req.",
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort());
+
+ isHandshakeFinished.set(false);
+ resp.set(null);
+ exception.set(null);
+
+ client.pipeTransfer(
+ PipeTransferHandshakeV1Req.toTPipeTransferReq(
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+ callback);
+ waitHandshakeFinished(isHandshakeFinished);
+ }
+
+ if (exception.get() != null) {
+ throw new PipeConnectionException("Failed to handshake.",
exception.get());
+ }
+
+ return false;
+ }
+ private void waitHandshakeFinished(AtomicBoolean isHandshakeFinished) {
try {
while (!isHandshakeFinished.get()) {
Thread.sleep(10);
@@ -177,12 +220,6 @@ public class IoTDBThriftAsyncClientManager extends
IoTDBThriftClientManager {
Thread.currentThread().interrupt();
throw new PipeException("Interrupted while waiting for handshake response.", e);
}
-
- if (exception.get() != null) {
- throw new PipeConnectionException("Failed to handshake.",
exception.get());
- }
-
- return false;
}
public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
index f3f83237768..6cc5c5ac683 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java
@@ -24,20 +24,22 @@ import
org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.connector.client.IoTDBThriftSyncConnectorClient;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftClientManager;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -95,7 +97,7 @@ public class IoTDBThriftSyncClientManager extends
IoTDBThriftClientManager imple
"All target servers %s are not available.",
endPoint2ClientAndStatus.keySet()));
}
- private void reconstructClient(TEndPoint endPoint) throws IOException {
+ private void reconstructClient(TEndPoint endPoint) {
final Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus =
endPoint2ClientAndStatus.get(endPoint);
@@ -111,6 +113,12 @@ public class IoTDBThriftSyncClientManager extends
IoTDBThriftClientManager imple
}
}
+ initClientAndStatus(clientAndStatus, endPoint);
+ sendHandshakeReq(clientAndStatus, endPoint);
+ }
+
+ private void initClientAndStatus(
+ Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus, TEndPoint
endPoint) {
try {
clientAndStatus.setLeft(
new IoTDBThriftSyncConnectorClient(
@@ -124,7 +132,7 @@ public class IoTDBThriftSyncClientManager extends
IoTDBThriftClientManager imple
useSSL,
trustStorePath,
trustStorePwd));
- } catch (TTransportException e) {
+ } catch (Exception e) {
throw new PipeConnectionException(
String.format(
PipeConnectionException.CONNECTION_ERROR_FORMATTER,
@@ -132,14 +140,41 @@ public class IoTDBThriftSyncClientManager extends
IoTDBThriftClientManager imple
endPoint.getPort()),
e);
}
+ }
+ public void sendHandshakeReq(
+ Pair<IoTDBThriftSyncConnectorClient, Boolean> clientAndStatus, TEndPoint
endPoint) {
try {
- final TPipeTransferResp resp =
+ final HashMap<String, String> params = new HashMap<>();
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
+ CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
+ PipeAgent.runtime().getClusterIdIfPossible());
+
+ // Try to handshake by PipeTransferHandshakeV2Req.
+ TPipeTransferResp resp =
clientAndStatus
.getLeft()
- .pipeTransfer(
- PipeTransferHandshakeReq.toTPipeTransferReq(
-
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+
.pipeTransfer(PipeTransferHandshakeV2Req.toTPipeTransferReq(params));
+ // Receiver may be an old version, so we need to retry to handshake by
+ // PipeTransferHandshakeV1Req.
+ if (resp.getStatus().getCode() ==
TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
+ LOGGER.info(
+ "Handshake error with target server ip: {}, port: {}, because: {}. "
+ + "Retry to handshake by PipeTransferHandshakeV1Req.",
+ endPoint.getIp(),
+ endPoint.getPort(),
+ resp.getStatus());
+ resp =
+ clientAndStatus
+ .getLeft()
+ .pipeTransfer(
+ PipeTransferHandshakeV1Req.toTPipeTransferReq(
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+ }
+
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
"Handshake error with target server ip: {}, port: {}, because: {}.",
@@ -156,7 +191,7 @@ public class IoTDBThriftSyncClientManager extends
IoTDBThriftClientManager imple
endPoint.getIp(),
endPoint.getPort());
}
- } catch (TException e) {
+ } catch (Exception e) {
LOGGER.warn(
"Handshake error with target server ip: {}, port: {}, because: {}.",
endPoint.getIp(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 67d2cbad1e8..da6f888bc57 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -31,11 +31,14 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaPlanReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
@@ -113,8 +116,10 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
final short rawRequestType = req.getType();
if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
switch (PipeRequestType.valueOf(rawRequestType)) {
- case HANDSHAKE:
- return
handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(req));
+ case HANDSHAKE_V1:
+ return
handleTransferHandshakeV1(PipeTransferHandshakeV1Req.fromTPipeTransferReq(req));
+ case HANDSHAKE_V2:
+ return
handleTransferHandshakeV2(PipeTransferHandshakeV2Req.fromTPipeTransferReq(req));
case TRANSFER_TABLET_INSERT_NODE:
return handleTransferTabletInsertNode(
PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req),
@@ -170,7 +175,7 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
}
}
- private TPipeTransferResp handleTransferHandshake(PipeTransferHandshakeReq
req) {
+ private TPipeTransferResp
handleTransferHandshakeV1(PipeTransferHandshakeV1Req req) {
if (!CommonDescriptor.getInstance()
.getConfig()
.getTimestampPrecision()
@@ -246,6 +251,59 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
}
+ private TPipeTransferResp
handleTransferHandshakeV2(PipeTransferHandshakeV2Req req)
+ throws IOException {
+ // Reject to handshake if the receiver can not take clusterId from config
node.
+ final String clusterIdFromConfigNode =
PipeAgent.runtime().getClusterIdIfPossible();
+ if (clusterIdFromConfigNode == null) {
+ final TSStatus status =
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_HANDSHAKE_ERROR,
+ "Receiver can not get clusterId from config node.");
+ LOGGER.warn("Handshake failed, response status = {}.", status);
+ return new TPipeTransferResp(status);
+ }
+
+ // Reject to handshake if the request does not contain sender's clusterId.
+ final String clusterIdFromHandshakeRequest =
+
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID);
+ if (clusterIdFromHandshakeRequest == null) {
+ final TSStatus status =
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_HANDSHAKE_ERROR, "Handshake request does not
contain clusterId.");
+ LOGGER.warn("Handshake failed, response status = {}.", status);
+ return new TPipeTransferResp(status);
+ }
+
+ // Reject to handshake if the receiver and sender are from the same
cluster.
+ if (Objects.equals(clusterIdFromConfigNode,
clusterIdFromHandshakeRequest)) {
+ final TSStatus status =
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_HANDSHAKE_ERROR,
+ String.format(
+ "Receiver and sender are from the same cluster %s.",
+ clusterIdFromHandshakeRequest));
+ LOGGER.warn("Handshake failed, response status = {}.", status);
+ return new TPipeTransferResp(status);
+ }
+
+ // Reject to handshake if the request does not contain timestampPrecision.
+ final String timestampPrecision =
+
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION);
+ if (timestampPrecision == null) {
+ final TSStatus status =
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_HANDSHAKE_ERROR,
+ "Handshake request does not contain timestampPrecision.");
+ LOGGER.warn("Handshake failed, response status = {}.", status);
+ return new TPipeTransferResp(status);
+ }
+
+ // Handle the handshake request as a v1 request.
+ return handleTransferHandshakeV1(
+ PipeTransferHandshakeV1Req.toTPipeTransferReq(timestampPrecision));
+ }
+
private TPipeTransferResp handleTransferTabletInsertNode(
PipeTransferTabletInsertNodeReq req,
IPartitionFetcher partitionFetcher,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
index 7fd72c51e20..0aba9930704 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java
@@ -20,13 +20,18 @@
package org.apache.iotdb.db.pipe.connector;
import org.apache.iotdb.commons.path.PartialPath;
+import
org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
import
org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotPieceReq;
import
org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotSealReq;
import
org.apache.iotdb.commons.pipe.connector.payload.response.PipeTransferSnapshotPieceResp;
+import
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaPlanReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
@@ -52,16 +57,19 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
public class PipeDataNodeThriftRequestTest {
private static final String TIME_PRECISION = "ms";
+ private static final String CLUSTER_ID = "abcde";
@Test
- public void testPipeValidateHandshakeReq() throws IOException {
- PipeTransferHandshakeReq req =
PipeTransferHandshakeReq.toTPipeTransferReq(TIME_PRECISION);
- PipeTransferHandshakeReq deserializeReq =
PipeTransferHandshakeReq.fromTPipeTransferReq(req);
+ public void testPipeValidateHandshakeV1Req() throws IOException {
+ PipeTransferHandshakeV1Req req =
PipeTransferHandshakeV1Req.toTPipeTransferReq(TIME_PRECISION);
+ PipeTransferHandshakeV1Req deserializeReq =
+ PipeTransferHandshakeV1Req.fromTPipeTransferReq(req);
Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
Assert.assertEquals(req.getType(), deserializeReq.getType());
@@ -70,6 +78,58 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertEquals(req.getTimestampPrecision(),
deserializeReq.getTimestampPrecision());
}
+ @Test
+ public void testPipeValidateHandshakeV2Req() throws Exception {
+ HashMap<String, String> params = new HashMap<>();
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
CLUSTER_ID);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
TIME_PRECISION);
+ params.put("Nullable", null);
+
+ PipeTransferHandshakeV2Req req =
PipeTransferHandshakeV2Req.toTPipeTransferReq(params);
+ PipeTransferHandshakeV2Req deserializeReq =
+ PipeTransferHandshakeV2Req.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody());
+
+ Assert.assertEquals(
+
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID),
+
deserializeReq.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID));
+ Assert.assertEquals(
+
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION),
+
deserializeReq.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION));
+ Assert.assertEquals(
+ req.getParams().get("Nullable"),
deserializeReq.getParams().get("Nullable"));
+ }
+
+ @Test
+ public void testPipeValidateHandshakeV2Req4AirGap() throws IOException {
+ // Construct byteBuffer.
+ HashMap<String, String> params = new HashMap<>();
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
CLUSTER_ID);
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
TIME_PRECISION);
+ params.put("Nullable", null);
+ ByteBuffer byteBuffer =
+
ByteBuffer.wrap(PipeTransferHandshakeV2Req.toTransferHandshakeBytes(params));
+
+ // Construct request.
+ byte version = ReadWriteIOUtils.readByte(byteBuffer);
+ short type = ReadWriteIOUtils.readShort(byteBuffer);
+ ByteBuffer body = byteBuffer.slice();
+ final AirGapPseudoTPipeTransferRequest req =
+ (AirGapPseudoTPipeTransferRequest)
+ new
AirGapPseudoTPipeTransferRequest().setVersion(version).setType(type).setBody(body);
+ final PipeTransferHandshakeV2Req deserializeReq =
+ PipeTransferHandshakeV2Req.fromTPipeTransferReq(req);
+
+ // Assert.
+ Assert.assertEquals(
+ IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
deserializeReq.getVersion());
+ Assert.assertEquals(PipeRequestType.HANDSHAKE_V2.getType(),
deserializeReq.getType());
+ Assert.assertEquals(params, deserializeReq.getParams());
+ }
+
@Test
public void testPipeTransferInsertNodeReq() {
PipeTransferTabletInsertNodeReq req =
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
index bc8a0c8e29d..d851219137b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.connector;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
@@ -43,7 +43,7 @@ public class PipeReceiverTest {
IoTDBThriftReceiverV1 receiver = new IoTDBThriftReceiverV1();
try {
receiver.receive(
- PipeTransferHandshakeReq.toTPipeTransferReq(
+ PipeTransferHandshakeV1Req.toTPipeTransferReq(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
mock(IPartitionFetcher.class),
mock(ISchemaFetcher.class));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
index 1d9528a4318..be35ae36a61 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
import java.util.Map;
public enum PipeRequestType {
- HANDSHAKE((short) 1),
+ HANDSHAKE_V1((short) 1),
TRANSFER_TABLET_INSERT_NODE((short) 2),
TRANSFER_TABLET_RAW((short) 3),
@@ -40,6 +40,8 @@ public enum PipeRequestType {
TRANSFER_SNAPSHOT_PIECE((short) 10),
TRANSFER_SNAPSHOT_SEAL((short) 11),
+
+ HANDSHAKE_V2((short) 12),
;
private final short type;