This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 8140adff9f0 Pipe: Fix "waitHandshakeFinished" may wait forever when asyncClientManager is closed (#14373) (#14435)
8140adff9f0 is described below

commit 8140adff9f0cd0bb72e3fd5fd04f299d70f2bea6
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 16 12:09:20 2024 +0800

    Pipe: Fix "waitHandshakeFinished" may wait forever when asyncClientManager is closed (#14373) (#14435)
---
 .../client/IoTDBDataNodeAsyncClientManager.java    | 45 ++++++++++++++--------
 .../async/AsyncPipeDataTransferServiceClient.java  | 28 +++++++-------
 2 files changed, 43 insertions(+), 30 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 1bbf5273187..aff8926d0cb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -73,16 +73,18 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
 
   private final LoadBalancer loadBalancer;
 
+  private volatile boolean isClosed = false;
+
   public IoTDBDataNodeAsyncClientManager(
-      List<TEndPoint> endPoints,
+      final List<TEndPoint> endPoints,
       /* The following parameters are used locally. */
-      boolean useLeaderCache,
-      String loadBalanceStrategy,
+      final boolean useLeaderCache,
+      final String loadBalanceStrategy,
       /* The following parameters are used to handshake with the receiver. */
-      String username,
-      String password,
-      boolean shouldReceiverConvertOnTypeMismatch,
-      String loadTsFileStrategy) {
+      final String username,
+      final String password,
+      final boolean shouldReceiverConvertOnTypeMismatch,
+      final String loadTsFileStrategy) {
     super(
         endPoints,
         username,
@@ -135,7 +137,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
     return loadBalancer.borrowClient();
   }
 
-  public AsyncPipeDataTransferServiceClient borrowClient(String deviceId) 
throws Exception {
+  public AsyncPipeDataTransferServiceClient borrowClient(final String 
deviceId) throws Exception {
     if (!useLeaderCache || Objects.isNull(deviceId)) {
       return borrowClient();
     }
@@ -143,7 +145,8 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
     return borrowClient(LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId));
   }
 
-  public AsyncPipeDataTransferServiceClient borrowClient(TEndPoint endPoint) 
throws Exception {
+  public AsyncPipeDataTransferServiceClient borrowClient(final TEndPoint 
endPoint)
+      throws Exception {
     if (!useLeaderCache || Objects.isNull(endPoint)) {
       return borrowClient();
     }
@@ -153,7 +156,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
       if (handshakeIfNecessary(endPoint, client)) {
         return client;
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           "failed to borrow client {}:{} for cached leader.",
           endPoint.getIp(),
@@ -173,7 +176,8 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
    * @throws Exception if an error occurs.
    */
   private boolean handshakeIfNecessary(
-      TEndPoint targetNodeUrl, AsyncPipeDataTransferServiceClient client) 
throws Exception {
+      final TEndPoint targetNodeUrl, final AsyncPipeDataTransferServiceClient 
client)
+      throws Exception {
     if (client.isHandshakeFinished()) {
       return true;
     }
@@ -185,7 +189,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
     final AsyncMethodCallback<TPipeTransferResp> callback =
         new AsyncMethodCallback<TPipeTransferResp>() {
           @Override
-          public void onComplete(TPipeTransferResp response) {
+          public void onComplete(final TPipeTransferResp response) {
             resp.set(response);
 
             if (response.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -215,7 +219,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
           }
 
           @Override
-          public void onError(Exception e) {
+          public void onError(final Exception e) {
             LOGGER.warn(
                 "Handshake error with receiver {}:{}.",
                 targetNodeUrl.getIp(),
@@ -281,18 +285,24 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
     return false;
   }
 
-  private void waitHandshakeFinished(AtomicBoolean isHandshakeFinished) {
+  private void waitHandshakeFinished(final AtomicBoolean isHandshakeFinished) {
     try {
+      final long startTime = System.currentTimeMillis();
       while (!isHandshakeFinished.get()) {
+        if (isClosed
+            || System.currentTimeMillis() - startTime
+                > 
PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs() * 2L) {
+          throw new PipeConnectionException("Timed out when waiting for client 
handshake finish.");
+        }
         Thread.sleep(10);
       }
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new PipeException("Interrupted while waiting for handshake 
response.", e);
     }
   }
 
-  public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
+  public void updateLeaderCache(final String deviceId, final TEndPoint 
endPoint) {
     if (!useLeaderCache || deviceId == null || endPoint == null) {
       return;
     }
@@ -306,6 +316,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
   }
 
   public void close() {
+    isClosed = true;
     synchronized (IoTDBDataNodeAsyncClientManager.class) {
       RECEIVER_ATTRIBUTES_REF_COUNT.computeIfPresent(
           receiverAttributes,
@@ -316,7 +327,7 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
               if (clientManager != null) {
                 try {
                   clientManager.close();
-                } catch (Exception e) {
+                } catch (final Exception e) {
                   LOGGER.warn("Failed to close client manager.", e);
                 }
               }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 91fa97fc82c..f64c3f8d95d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -58,10 +58,10 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
   private final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
 
   public AsyncPipeDataTransferServiceClient(
-      ThriftClientProperty property,
-      TEndPoint endpoint,
-      TAsyncClientManager tClientManager,
-      ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> 
clientManager)
+      final ThriftClientProperty property,
+      final TEndPoint endpoint,
+      final TAsyncClientManager tClientManager,
+      final ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> 
clientManager)
       throws IOException {
     super(
         property.getProtocolFactory(),
@@ -81,7 +81,7 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
   }
 
   @Override
-  public void onError(Exception e) {
+  public void onError(final Exception e) {
     super.onError(e);
     ThriftClient.resolveException(e, this);
     returnSelf();
@@ -114,11 +114,11 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
     }
   }
 
-  public void setShouldReturnSelf(boolean shouldReturnSelf) {
+  public void setShouldReturnSelf(final boolean shouldReturnSelf) {
     this.shouldReturnSelf.set(shouldReturnSelf);
   }
 
-  public void setTimeoutDynamically(int timeout) {
+  public void setTimeoutDynamically(final int timeout) {
     try {
       ((TNonblockingSocket) ___transport).setTimeout(timeout);
     } catch (Exception e) {
@@ -178,20 +178,21 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
       extends AsyncThriftClientFactory<TEndPoint, 
AsyncPipeDataTransferServiceClient> {
 
     public Factory(
-        ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> 
clientManager,
-        ThriftClientProperty thriftClientProperty,
-        String threadName) {
+        final ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> 
clientManager,
+        final ThriftClientProperty thriftClientProperty,
+        final String threadName) {
       super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
     public void destroyObject(
-        TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient> 
pooledObject) {
+        final TEndPoint endPoint,
+        final PooledObject<AsyncPipeDataTransferServiceClient> pooledObject) {
       pooledObject.getObject().close();
     }
 
     @Override
-    public PooledObject<AsyncPipeDataTransferServiceClient> 
makeObject(TEndPoint endPoint)
+    public PooledObject<AsyncPipeDataTransferServiceClient> makeObject(final 
TEndPoint endPoint)
         throws Exception {
       return new DefaultPooledObject<>(
           new AsyncPipeDataTransferServiceClient(
@@ -203,7 +204,8 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
 
     @Override
     public boolean validateObject(
-        TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient> 
pooledObject) {
+        final TEndPoint endPoint,
+        final PooledObject<AsyncPipeDataTransferServiceClient> pooledObject) {
       return pooledObject.getObject().isReady();
     }
   }

Reply via email to