This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c6b0e40594c Pipe: Fix async client returning self too early when downgrade from handshake v2 to v1 (#12913)
c6b0e40594c is described below

commit c6b0e40594c6f08bbcfb3a312b1764d964d2c695
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jul 15 10:56:42 2024 +0800

    Pipe: Fix async client returning self too early when downgrade from handshake v2 to v1 (#12913)
---
 .../client/IoTDBDataNodeAsyncClientManager.java    | 71 ++++++++++++----------
 .../async/IoTDBDataRegionAsyncConnector.java       |  6 +-
 2 files changed, 40 insertions(+), 37 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 8149a40add7..eff84ab5a5a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -199,43 +199,48 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
           }
         };
 
-    // Try to handshake by PipeTransferHandshakeV2Req.
-    final HashMap<String, String> params = new HashMap<>();
-    params.put(
-        PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
-        IoTDBDescriptor.getInstance().getConfig().getClusterId());
-    params.put(
-        PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
-        CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
-
-    client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
-    client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params), callback);
-    waitHandshakeFinished(isHandshakeFinished);
-
-    // Retry to handshake by PipeTransferHandshakeV1Req.
-    if (resp.get() != null
-        && resp.get().getStatus().getCode() == TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
-      LOGGER.info(
-          "Handshake error by PipeTransferHandshakeV2Req with receiver {}:{} "
-              + "retry to handshake by PipeTransferHandshakeV1Req.",
-          targetNodeUrl.getIp(),
-          targetNodeUrl.getPort());
-
-      supportModsIfIsDataNodeReceiver = false;
-      isHandshakeFinished.set(false);
-      resp.set(null);
-      exception.set(null);
+    try {
+      client.setShouldReturnSelf(false);
+      // Try to handshake by PipeTransferHandshakeV2Req.
+      final HashMap<String, String> params = new HashMap<>();
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
+          IoTDBDescriptor.getInstance().getConfig().getClusterId());
+      params.put(
+          PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
+          CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
 
       client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
-      client.pipeTransfer(
-          PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
-              CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
-          callback);
+      client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params), callback);
       waitHandshakeFinished(isHandshakeFinished);
-    }
 
-    if (exception.get() != null) {
-      throw new PipeConnectionException("Failed to handshake.", exception.get());
+      // Retry to handshake by PipeTransferHandshakeV1Req.
+      if (resp.get() != null
+          && resp.get().getStatus().getCode() == TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
+        LOGGER.info(
+            "Handshake error by PipeTransferHandshakeV2Req with receiver {}:{} "
+                + "retry to handshake by PipeTransferHandshakeV1Req.",
+            targetNodeUrl.getIp(),
+            targetNodeUrl.getPort());
+
+        supportModsIfIsDataNodeReceiver = false;
+        isHandshakeFinished.set(false);
+        resp.set(null);
+        exception.set(null);
+
+        client.setTimeoutDynamically(PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs());
+        client.pipeTransfer(
+            PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
+                CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
+            callback);
+        waitHandshakeFinished(isHandshakeFinished);
+      }
+      if (exception.get() != null) {
+        throw new PipeConnectionException("Failed to handshake.", exception.get());
+      }
+    } finally {
+      client.setShouldReturnSelf(true);
+      client.returnSelf();
     }
 
     return false;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 7e3b4fb8a9f..b902a8b2f46 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -83,11 +83,9 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
   private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionAsyncConnector.class);
 
   private static final String THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT =
-      "Failed to borrow client from client pool or exception occurred "
-          + "when sending to receiver.";
+      "Failed to borrow client from client pool when sending to receiver.";
   private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
-      "Failed to borrow client from client pool or exception occurred "
-          + "when sending to receiver %s:%s.";
+      "Exception occurred while sending to receiver %s:%s.";
 
   private IoTDBDataNodeAsyncClientManager clientManager;
 

Reply via email to