This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c6b0e40594c Pipe: Fix async client returning self too early when
downgrade from handshake v2 to v1 (#12913)
c6b0e40594c is described below
commit c6b0e40594c6f08bbcfb3a312b1764d964d2c695
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jul 15 10:56:42 2024 +0800
Pipe: Fix async client returning self too early when downgrade from
handshake v2 to v1 (#12913)
---
.../client/IoTDBDataNodeAsyncClientManager.java | 71 ++++++++++++----------
.../async/IoTDBDataRegionAsyncConnector.java | 6 +-
2 files changed, 40 insertions(+), 37 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 8149a40add7..eff84ab5a5a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -199,43 +199,48 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
}
};
- // Try to handshake by PipeTransferHandshakeV2Req.
- final HashMap<String, String> params = new HashMap<>();
- params.put(
- PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
- IoTDBDescriptor.getInstance().getConfig().getClusterId());
- params.put(
- PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
- CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
-
-
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
-
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
callback);
- waitHandshakeFinished(isHandshakeFinished);
-
- // Retry to handshake by PipeTransferHandshakeV1Req.
- if (resp.get() != null
- && resp.get().getStatus().getCode() ==
TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
- LOGGER.info(
- "Handshake error by PipeTransferHandshakeV2Req with receiver {}:{} "
- + "retry to handshake by PipeTransferHandshakeV1Req.",
- targetNodeUrl.getIp(),
- targetNodeUrl.getPort());
-
- supportModsIfIsDataNodeReceiver = false;
- isHandshakeFinished.set(false);
- resp.set(null);
- exception.set(null);
+ try {
+ client.setShouldReturnSelf(false);
+ // Try to handshake by PipeTransferHandshakeV2Req.
+ final HashMap<String, String> params = new HashMap<>();
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
+ IoTDBDescriptor.getInstance().getConfig().getClusterId());
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
+ CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
- client.pipeTransfer(
- PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
-
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
- callback);
+
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params),
callback);
waitHandshakeFinished(isHandshakeFinished);
- }
- if (exception.get() != null) {
- throw new PipeConnectionException("Failed to handshake.",
exception.get());
+ // Retry to handshake by PipeTransferHandshakeV1Req.
+ if (resp.get() != null
+ && resp.get().getStatus().getCode() ==
TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
+ LOGGER.info(
+ "Handshake error by PipeTransferHandshakeV2Req with receiver {}:{}
"
+ + "retry to handshake by PipeTransferHandshakeV1Req.",
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort());
+
+ supportModsIfIsDataNodeReceiver = false;
+ isHandshakeFinished.set(false);
+ resp.set(null);
+ exception.set(null);
+
+
client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+ client.pipeTransfer(
+ PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+ callback);
+ waitHandshakeFinished(isHandshakeFinished);
+ }
+ if (exception.get() != null) {
+ throw new PipeConnectionException("Failed to handshake.",
exception.get());
+ }
+ } finally {
+ client.setShouldReturnSelf(true);
+ client.returnSelf();
}
return false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 7e3b4fb8a9f..b902a8b2f46 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -83,11 +83,9 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataRegionAsyncConnector.class);
private static final String THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT =
- "Failed to borrow client from client pool or exception occurred "
- + "when sending to receiver.";
+ "Failed to borrow client from client pool when sending to receiver.";
private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
- "Failed to borrow client from client pool or exception occurred "
- + "when sending to receiver %s:%s.";
+ "Exception occurred while sending to receiver %s:%s.";
private IoTDBDataNodeAsyncClientManager clientManager;