This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f50516a0384 Adjust verify connection rpc time limit to handle 
abandoned port (#12855)
f50516a0384 is described below

commit f50516a038492e65fb46dfe08244b9b1883b2508
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Jul 5 19:16:52 2024 +0800

    Adjust verify connection rpc time limit to handle abandoned port 
(#12855)
    
    * don't retry
    
    * ??
    
    * ConfigNodeClient
    
    * add log
    
    * recover timeout in a better way
    
    * tan review
    
    * synchronized
---
 .../CnToCnInternalServiceAsyncRequestManager.java  | 10 ++++++
 .../CnToDnInternalServiceAsyncRequestManager.java  | 10 ++++++
 .../iotdb/confignode/manager/ClusterManager.java   | 12 +++----
 .../iotdb/db/protocol/client/ConfigNodeClient.java | 38 ++++++++++++++++------
 .../DnToCnInternalServiceAsyncRequestManager.java  | 10 ++++++
 .../impl/DataNodeInternalRPCServiceImpl.java       | 13 +++-----
 .../AsyncConfigNodeInternalServiceClient.java      | 28 ++++++++++++++++
 .../async/AsyncDataNodeInternalServiceClient.java  | 28 ++++++++++++++++
 .../client/request/AsyncRequestManager.java        |  9 ++++-
 .../{Utils.java => TestConnectionUtils.java}       | 37 ++++++++++++++++++++-
 10 files changed, 168 insertions(+), 27 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
index bd5ee3fe499..19eaf9d9a40 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.confignode.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import 
org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.request.AsyncRequestContext;
 import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
 import 
org.apache.iotdb.commons.client.request.ConfigNodeInternalServiceAsyncRequestManager;
+import org.apache.iotdb.commons.client.request.TestConnectionUtils;
 import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeAsyncRequestRPCHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeTSStatusRPCHandler;
@@ -59,6 +61,14 @@ public class CnToCnInternalServiceAsyncRequestManager
     return ConfigNodeAsyncRequestRPCHandler.buildHandler(requestContext, 
requestId, targetNode);
   }
 
+  @Override
+  protected void adjustClientTimeoutIfNecessary(
+      CnToCnNodeRequestType cnToCnNodeRequestType, 
AsyncConfigNodeInternalServiceClient client) {
+    if 
(CnToCnNodeRequestType.SUBMIT_TEST_CONNECTION_TASK.equals(cnToCnNodeRequestType))
 {
+      
client.setTimeoutTemporarily(TestConnectionUtils.calculateCnLeaderToAllCnMaxTime());
+    }
+  }
+
   private static class ClientPoolHolder {
     private static final CnToCnInternalServiceAsyncRequestManager INSTANCE =
         new CnToCnInternalServiceAsyncRequestManager();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index 8f3e7c720fc..7ac3bbcd961 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -27,9 +27,11 @@ import 
org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
 import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
+import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.request.AsyncRequestContext;
 import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
 import 
org.apache.iotdb.commons.client.request.DataNodeInternalServiceRequestManager;
+import org.apache.iotdb.commons.client.request.TestConnectionUtils;
 import org.apache.iotdb.confignode.client.CnToDnRequestType;
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.CheckTimeSeriesExistenceRPCHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler;
@@ -360,6 +362,14 @@ public class CnToDnInternalServiceAsyncRequestManager
     return DataNodeAsyncRequestRPCHandler.buildHandler(requestContext, 
requestId, targetNode);
   }
 
+  @Override
+  protected void adjustClientTimeoutIfNecessary(
+      CnToDnRequestType cnToDnRequestType, AsyncDataNodeInternalServiceClient 
client) {
+    if 
(CnToDnRequestType.SUBMIT_TEST_CONNECTION_TASK.equals(cnToDnRequestType)) {
+      
client.setTimeoutTemporarily(TestConnectionUtils.calculateCnLeaderToAllDnMaxTime());
+    }
+  }
+
   private static class ClientPoolHolder {
 
     private static final CnToDnInternalServiceAsyncRequestManager INSTANCE =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
index 808112cff4c..8dba6addffd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.common.rpc.thrift.TServiceType;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
 import org.apache.iotdb.commons.client.request.AsyncRequestContext;
-import org.apache.iotdb.commons.client.request.Utils;
+import org.apache.iotdb.commons.client.request.TestConnectionUtils;
 import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
 import org.apache.iotdb.confignode.client.CnToDnRequestType;
 import 
org.apache.iotdb.confignode.client.async.CnToCnInternalServiceAsyncRequestManager;
@@ -196,7 +196,7 @@ public class ClusterManager {
         new TSender()
             .setConfigNodeLocation(
                 
ConfigNodeDescriptor.getInstance().getConf().generateLocalConfigNodeLocation());
-    return Utils.testConnectionsImpl(
+    return TestConnectionUtils.testConnectionsImpl(
         configNodeLocations,
         sender,
         TConfigNodeLocation::getConfigNodeId,
@@ -205,8 +205,7 @@ public class ClusterManager {
         CnToCnNodeRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, CnToCnNodeRequestType, 
TConfigNodeLocation>
                 handler) ->
-            CnToCnInternalServiceAsyncRequestManager.getInstance()
-                .sendAsyncRequestWithRetry(handler));
+            
CnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
   }
 
   private List<TTestConnectionResult> badConfigNodeConnectionResult(
@@ -221,7 +220,7 @@ public class ClusterManager {
         new TSender()
             .setConfigNodeLocation(
                 
ConfigNodeDescriptor.getInstance().getConf().generateLocalConfigNodeLocation());
-    return Utils.testConnectionsImpl(
+    return TestConnectionUtils.testConnectionsImpl(
         dataNodeLocations,
         sender,
         TDataNodeLocation::getDataNodeId,
@@ -229,8 +228,7 @@ public class ClusterManager {
         TServiceType.DataNodeInternalService,
         CnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, CnToDnRequestType, 
TDataNodeLocation> handler) ->
-            CnToDnInternalServiceAsyncRequestManager.getInstance()
-                .sendAsyncRequestWithRetry(handler));
+            
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
   }
 
   private List<TTestConnectionResult> badDataNodeConnectionResult(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 43e30e515e8..5fa7bad27b3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.client.request.TestConnectionUtils;
 import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
 import org.apache.iotdb.commons.consensus.ConfigRegionId;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
@@ -211,12 +212,12 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
     connectAndSync();
   }
 
-  public void connect(TEndPoint endpoint) throws TException {
+  public void connect(TEndPoint endpoint, int timeoutMs) throws TException {
     try {
       transport =
           DeepCopyRpcTransportFactory.INSTANCE.getTransport(
               // As there is a try-catch already, we do not need to use 
TSocket.wrap
-              endpoint.getIp(), endpoint.getPort(), 
property.getConnectionTimeoutMs());
+              endpoint.getIp(), endpoint.getPort(), timeoutMs);
       if (!transport.isOpen()) {
         transport.open();
       }
@@ -230,18 +231,28 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
 
   private void connectAndSync() throws TException {
     try {
-      tryToConnect();
+      tryToConnect(property.getConnectionTimeoutMs());
     } catch (TException e) {
       // Can not connect to each config node
       syncLatestConfigNodeList();
-      tryToConnect();
+      tryToConnect(property.getConnectionTimeoutMs());
     }
   }
 
-  private void tryToConnect() throws TException {
+  private void connectAndSync(int timeoutMs) throws TException {
+    try {
+      tryToConnect(timeoutMs);
+    } catch (TException e) {
+      // Can not connect to each config node
+      syncLatestConfigNodeList();
+      tryToConnect(timeoutMs);
+    }
+  }
+
+  private void tryToConnect(int timeoutMs) throws TException {
     if (configLeader != null) {
       try {
-        connect(configLeader);
+        connect(configLeader, timeoutMs);
         return;
       } catch (TException ignore) {
         logger.warn("The current node may have been down {},try next node", 
configLeader);
@@ -266,7 +277,7 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
       TEndPoint tryEndpoint = configNodes.get(cursor);
 
       try {
-        connect(tryEndpoint);
+        connect(tryEndpoint, timeoutMs);
         return;
       } catch (TException ignore) {
         logger.warn("The current node may have been down {},try next node", 
tryEndpoint);
@@ -744,9 +755,16 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
 
   @Override
   public TTestConnectionResp submitTestConnectionTaskToLeader() throws 
TException {
-    return executeRemoteCallWithRetry(
-        () -> client.submitTestConnectionTaskToLeader(),
-        resp -> !updateConfigNodeLeader(resp.getStatus()));
+    try {
+      // this RPC needs a special timeout
+      connectAndSync(TestConnectionUtils.calculateDnToCnLeaderMaxTime());
+      return executeRemoteCallWithRetry(
+          () -> client.submitTestConnectionTaskToLeader(),
+          resp -> !updateConfigNodeLeader(resp.getStatus()));
+    } finally {
+      // reset timeout to default
+      connectAndSync();
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
index 3f676367240..b6a218a732e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
@@ -20,9 +20,11 @@
 package org.apache.iotdb.db.protocol.client.cn;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import 
org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.request.AsyncRequestContext;
 import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
 import 
org.apache.iotdb.commons.client.request.ConfigNodeInternalServiceAsyncRequestManager;
+import org.apache.iotdb.commons.client.request.TestConnectionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +51,14 @@ public class DnToCnInternalServiceAsyncRequestManager
     return ConfigNodeAsyncRequestRPCHandler.buildHandler(requestContext, 
requestId, targetNode);
   }
 
+  @Override
+  protected void adjustClientTimeoutIfNecessary(
+      DnToCnRequestType dnToCnRequestType, 
AsyncConfigNodeInternalServiceClient client) {
+    if 
(DnToCnRequestType.SUBMIT_TEST_CONNECTION_TASK.equals(dnToCnRequestType)) {
+      
client.setTimeoutTemporarily(TestConnectionUtils.calculateCnLeaderToAllNodeMaxTime());
+    }
+  }
+
   private static class ClientPoolHolder {
     private static final DnToCnInternalServiceAsyncRequestManager INSTANCE =
         new DnToCnInternalServiceAsyncRequestManager();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 2c427a35ca3..2effa00ce89 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -281,7 +281,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.iotdb.commons.client.request.Utils.testConnectionsImpl;
+import static 
org.apache.iotdb.commons.client.request.TestConnectionUtils.testConnectionsImpl;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static 
org.apache.iotdb.db.service.RegionMigrateService.REGION_MIGRATE_PROCESS;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
@@ -1466,8 +1466,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.ConfigNodeInternalService,
         DnToCnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToCnRequestType, 
TConfigNodeLocation> handler) ->
-            DnToCnInternalServiceAsyncRequestManager.getInstance()
-                .sendAsyncRequestWithRetry(handler));
+            
DnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
   }
 
   private List<TTestConnectionResult> testAllDataNodeInternalServiceConnection(
@@ -1479,8 +1478,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeInternalService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            DnToDnInternalServiceAsyncRequestManager.getInstance()
-                .sendAsyncRequestWithRetry(handler));
+            
DnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
   }
 
   private List<TTestConnectionResult> testAllDataNodeMPPServiceConnection(
@@ -1492,7 +1490,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeMPPService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            
DataNodeMPPServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(handler));
+            
DataNodeMPPServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
   }
 
   private List<TTestConnectionResult> testAllDataNodeExternalServiceConnection(
@@ -1504,8 +1502,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         TServiceType.DataNodeExternalService,
         DnToDnRequestType.TEST_CONNECTION,
         (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
-            DataNodeExternalServiceAsyncRequestManager.getInstance()
-                .sendAsyncRequestWithRetry(handler));
+            
DataNodeExternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
index b5adc961136..1b3d6a21a1f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
@@ -42,6 +42,8 @@ public class AsyncConfigNodeInternalServiceClient extends 
IConfigNodeRPCService.
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncConfigNodeInternalServiceClient.class);
 
+  private long originalTimeout = -1;
+
   private final boolean printLogWhenEncounterException;
   private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient> 
clientManager;
@@ -98,9 +100,35 @@ public class AsyncConfigNodeInternalServiceClient extends 
IConfigNodeRPCService.
    * RPC is finished.
    */
   private void returnSelf() {
+    if (originalTimeout != -1) {
+      recoverTimeout();
+    }
     clientManager.returnClient(endpoint, this);
   }
 
+  /**
+   * Call this method when needed to temporarily modify the timeout period. 
The original timeout
+   * will be saved and automatically restored when the client is returned.
+   */
+  public synchronized void setTimeoutTemporarily(long timeout) {
+    if (originalTimeout != -1) {
+      logger.warn(
+          "This client's timeout has been set to {}. If you need to set it to 
{}, please call the recoverTimeout() first.",
+          originalTimeout,
+          timeout);
+    }
+    originalTimeout = getTimeout();
+    setTimeout(timeout);
+  }
+
+  private synchronized void recoverTimeout() {
+    if (originalTimeout == -1) {
+      logger.warn("This client's timeout has not been modified, cannot reset");
+    }
+    setTimeout(originalTimeout);
+    originalTimeout = -1;
+  }
+
   private void close() {
     ___transport.close();
     ___currentMethod = null;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 7b617788c3e..e56aab91c0d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -43,6 +43,8 @@ public class AsyncDataNodeInternalServiceClient extends 
IDataNodeRPCService.Asyn
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncDataNodeInternalServiceClient.class);
 
+  public long originalTimeout = -1;
+
   private final boolean printLogWhenEncounterException;
 
   private final TEndPoint endpoint;
@@ -110,9 +112,35 @@ public class AsyncDataNodeInternalServiceClient extends 
IDataNodeRPCService.Asyn
    * RPC is finished.
    */
   private void returnSelf() {
+    if (originalTimeout != -1) {
+      recoverTimeout();
+    }
     clientManager.returnClient(endpoint, this);
   }
 
+  /**
+   * Call this method when needed to temporarily modify the timeout period. 
The original timeout
+   * will be saved and automatically restored when the client is returned.
+   */
+  public synchronized void setTimeoutTemporarily(long timeout) {
+    if (originalTimeout != -1) {
+      logger.warn(
+          "This client's timeout has been set to {}. If you need to set it to 
{}, please call the recoverTimeout() first.",
+          originalTimeout,
+          timeout);
+    }
+    originalTimeout = getTimeout();
+    setTimeout(timeout);
+  }
+
+  private synchronized void recoverTimeout() {
+    if (originalTimeout == -1) {
+      logger.warn("This client's timeout has not been modified, cannot reset");
+    }
+    setTimeout(originalTimeout);
+    originalTimeout = -1;
+  }
+
   private void close() {
     ___transport.close();
     ___currentMethod = null;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index ed3feb6598b..aa49355b31f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -152,9 +152,12 @@ public abstract class AsyncRequestManager<RequestType, 
NodeLocation, Client> {
     try {
       if (!actionMap.containsKey(requestContext.getRequestType())) {
         throw new UnsupportedOperationException(
-            "unsupported request type: " + requestContext.getRequestType());
+            "unsupported request type "
+                + requestContext.getRequestType()
+                + ", please set it in 
AsyncRequestManager::initActionMapBuilder()");
       }
       Client client = 
clientManager.borrowClient(nodeLocationToEndPoint(targetNode));
+      adjustClientTimeoutIfNecessary(requestContext.getRequestType(), client);
       Object req = requestContext.getRequest(requestId);
       AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler =
           buildHandler(requestContext, requestId, targetNode);
@@ -170,6 +173,10 @@ public abstract class AsyncRequestManager<RequestType, 
NodeLocation, Client> {
     }
   }
 
+  protected void adjustClientTimeoutIfNecessary(RequestType type, Client 
client) {
+    // By default, no adjustment is needed
+  }
+
   protected abstract TEndPoint nodeLocationToEndPoint(NodeLocation location);
 
   protected abstract AsyncRequestRPCHandler<?, RequestType, NodeLocation> 
buildHandler(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/Utils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
similarity index 71%
rename from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/Utils.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
index 6d4704f059b..99590df4542 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/Utils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSender;
 import org.apache.iotdb.common.rpc.thrift.TServiceProvider;
 import org.apache.iotdb.common.rpc.thrift.TServiceType;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import java.util.ArrayList;
@@ -34,7 +35,12 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-public class Utils {
+public class TestConnectionUtils {
+  private static int dataNodeServiceRequestTimeout =
+      CommonDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
+  private static int configNodeServiceRequestTimeout =
+      CommonDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
+
   public static <ServiceProviderLocation, RequestType>
       List<TTestConnectionResult> testConnectionsImpl(
           List<ServiceProviderLocation> nodeLocations,
@@ -75,4 +81,33 @@ public class Utils {
             });
     return results;
   }
+
+  public static int calculateCnLeaderToAllCnMaxTime() {
+    return
+    // SUBMIT_TEST_CONNECTION_TASK rpc timeout
+    configNodeServiceRequestTimeout
+        // cn internal service
+        + configNodeServiceRequestTimeout
+        // dn internal service
+        + dataNodeServiceRequestTimeout;
+  }
+
+  public static int calculateCnLeaderToAllDnMaxTime() {
+    return
+    // SUBMIT_TEST_CONNECTION_TASK rpc timeout
+    configNodeServiceRequestTimeout
+        // cn internal service
+        + configNodeServiceRequestTimeout
+        // dn internal, external, mpp service
+        + 3 * dataNodeServiceRequestTimeout;
+  }
+
+  public static int calculateCnLeaderToAllNodeMaxTime() {
+    return (int) ((calculateCnLeaderToAllCnMaxTime() + 
calculateCnLeaderToAllDnMaxTime()) * 1.1);
+  }
+
+  public static int calculateDnToCnLeaderMaxTime() {
+    return calculateCnLeaderToAllDnMaxTime()
+        + 
CommonDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
+  }
 }

Reply via email to