This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f50516a0384 Adjust verify connection rpc time limit to handle
abandoned ports (#12855)
f50516a0384 is described below
commit f50516a038492e65fb46dfe08244b9b1883b2508
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Jul 5 19:16:52 2024 +0800
Adjust verify connection rpc time limit to handle abandoned ports
(#12855)
* don't retry
* ??
* ConfigNodeClient
* add log
* recover timeout in a better way
* tan review
* synchronized
---
.../CnToCnInternalServiceAsyncRequestManager.java | 10 ++++++
.../CnToDnInternalServiceAsyncRequestManager.java | 10 ++++++
.../iotdb/confignode/manager/ClusterManager.java | 12 +++----
.../iotdb/db/protocol/client/ConfigNodeClient.java | 38 ++++++++++++++++------
.../DnToCnInternalServiceAsyncRequestManager.java | 10 ++++++
.../impl/DataNodeInternalRPCServiceImpl.java | 13 +++-----
.../AsyncConfigNodeInternalServiceClient.java | 28 ++++++++++++++++
.../async/AsyncDataNodeInternalServiceClient.java | 28 ++++++++++++++++
.../client/request/AsyncRequestManager.java | 9 ++++-
.../{Utils.java => TestConnectionUtils.java} | 37 ++++++++++++++++++++-
10 files changed, 168 insertions(+), 27 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
index bd5ee3fe499..19eaf9d9a40 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.confignode.client.async;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import
org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
import
org.apache.iotdb.commons.client.request.ConfigNodeInternalServiceAsyncRequestManager;
+import org.apache.iotdb.commons.client.request.TestConnectionUtils;
import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeAsyncRequestRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeTSStatusRPCHandler;
@@ -59,6 +61,14 @@ public class CnToCnInternalServiceAsyncRequestManager
return ConfigNodeAsyncRequestRPCHandler.buildHandler(requestContext,
requestId, targetNode);
}
+ @Override
+ protected void adjustClientTimeoutIfNecessary(
+ CnToCnNodeRequestType cnToCnNodeRequestType,
AsyncConfigNodeInternalServiceClient client) {
+ if
(CnToCnNodeRequestType.SUBMIT_TEST_CONNECTION_TASK.equals(cnToCnNodeRequestType))
{
+
client.setTimeoutTemporarily(TestConnectionUtils.calculateCnLeaderToAllCnMaxTime());
+ }
+ }
+
private static class ClientPoolHolder {
private static final CnToCnInternalServiceAsyncRequestManager INSTANCE =
new CnToCnInternalServiceAsyncRequestManager();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index 8f3e7c720fc..7ac3bbcd961 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -27,9 +27,11 @@ import
org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
+import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
import
org.apache.iotdb.commons.client.request.DataNodeInternalServiceRequestManager;
+import org.apache.iotdb.commons.client.request.TestConnectionUtils;
import org.apache.iotdb.confignode.client.CnToDnRequestType;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.CheckTimeSeriesExistenceRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler;
@@ -360,6 +362,14 @@ public class CnToDnInternalServiceAsyncRequestManager
return DataNodeAsyncRequestRPCHandler.buildHandler(requestContext,
requestId, targetNode);
}
+ @Override
+ protected void adjustClientTimeoutIfNecessary(
+ CnToDnRequestType cnToDnRequestType, AsyncDataNodeInternalServiceClient
client) {
+ if
(CnToDnRequestType.SUBMIT_TEST_CONNECTION_TASK.equals(cnToDnRequestType)) {
+
client.setTimeoutTemporarily(TestConnectionUtils.calculateCnLeaderToAllDnMaxTime());
+ }
+ }
+
private static class ClientPoolHolder {
private static final CnToDnInternalServiceAsyncRequestManager INSTANCE =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
index 808112cff4c..8dba6addffd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
@@ -31,7 +31,7 @@ import org.apache.iotdb.common.rpc.thrift.TServiceType;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
-import org.apache.iotdb.commons.client.request.Utils;
+import org.apache.iotdb.commons.client.request.TestConnectionUtils;
import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
import org.apache.iotdb.confignode.client.CnToDnRequestType;
import
org.apache.iotdb.confignode.client.async.CnToCnInternalServiceAsyncRequestManager;
@@ -196,7 +196,7 @@ public class ClusterManager {
new TSender()
.setConfigNodeLocation(
ConfigNodeDescriptor.getInstance().getConf().generateLocalConfigNodeLocation());
- return Utils.testConnectionsImpl(
+ return TestConnectionUtils.testConnectionsImpl(
configNodeLocations,
sender,
TConfigNodeLocation::getConfigNodeId,
@@ -205,8 +205,7 @@ public class ClusterManager {
CnToCnNodeRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, CnToCnNodeRequestType,
TConfigNodeLocation>
handler) ->
- CnToCnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithRetry(handler));
+
CnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
}
private List<TTestConnectionResult> badConfigNodeConnectionResult(
@@ -221,7 +220,7 @@ public class ClusterManager {
new TSender()
.setConfigNodeLocation(
ConfigNodeDescriptor.getInstance().getConf().generateLocalConfigNodeLocation());
- return Utils.testConnectionsImpl(
+ return TestConnectionUtils.testConnectionsImpl(
dataNodeLocations,
sender,
TDataNodeLocation::getDataNodeId,
@@ -229,8 +228,7 @@ public class ClusterManager {
TServiceType.DataNodeInternalService,
CnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, CnToDnRequestType,
TDataNodeLocation> handler) ->
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithRetry(handler));
+
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
}
private List<TTestConnectionResult> badDataNodeConnectionResult(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 43e30e515e8..5fa7bad27b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.client.request.TestConnectionUtils;
import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
@@ -211,12 +212,12 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
connectAndSync();
}
- public void connect(TEndPoint endpoint) throws TException {
+ public void connect(TEndPoint endpoint, int timeoutMs) throws TException {
try {
transport =
DeepCopyRpcTransportFactory.INSTANCE.getTransport(
// As there is a try-catch already, we do not need to use
TSocket.wrap
- endpoint.getIp(), endpoint.getPort(),
property.getConnectionTimeoutMs());
+ endpoint.getIp(), endpoint.getPort(), timeoutMs);
if (!transport.isOpen()) {
transport.open();
}
@@ -230,18 +231,28 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
private void connectAndSync() throws TException {
try {
- tryToConnect();
+ tryToConnect(property.getConnectionTimeoutMs());
} catch (TException e) {
// Can not connect to each config node
syncLatestConfigNodeList();
- tryToConnect();
+ tryToConnect(property.getConnectionTimeoutMs());
}
}
- private void tryToConnect() throws TException {
+ private void connectAndSync(int timeoutMs) throws TException {
+ try {
+ tryToConnect(timeoutMs);
+ } catch (TException e) {
+ // Can not connect to each config node
+ syncLatestConfigNodeList();
+ tryToConnect(timeoutMs);
+ }
+ }
+
+ private void tryToConnect(int timeoutMs) throws TException {
if (configLeader != null) {
try {
- connect(configLeader);
+ connect(configLeader, timeoutMs);
return;
} catch (TException ignore) {
logger.warn("The current node may have been down {},try next node",
configLeader);
@@ -266,7 +277,7 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
TEndPoint tryEndpoint = configNodes.get(cursor);
try {
- connect(tryEndpoint);
+ connect(tryEndpoint, timeoutMs);
return;
} catch (TException ignore) {
logger.warn("The current node may have been down {},try next node",
tryEndpoint);
@@ -744,9 +755,16 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
@Override
public TTestConnectionResp submitTestConnectionTaskToLeader() throws
TException {
- return executeRemoteCallWithRetry(
- () -> client.submitTestConnectionTaskToLeader(),
- resp -> !updateConfigNodeLeader(resp.getStatus()));
+ try {
+ // this rpc need special timeout
+ connectAndSync(TestConnectionUtils.calculateDnToCnLeaderMaxTime());
+ return executeRemoteCallWithRetry(
+ () -> client.submitTestConnectionTaskToLeader(),
+ resp -> !updateConfigNodeLeader(resp.getStatus()));
+ } finally {
+ // reset timeout to default
+ connectAndSync();
+ }
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
index 3f676367240..b6a218a732e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
@@ -20,9 +20,11 @@
package org.apache.iotdb.db.protocol.client.cn;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import
org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
import
org.apache.iotdb.commons.client.request.ConfigNodeInternalServiceAsyncRequestManager;
+import org.apache.iotdb.commons.client.request.TestConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +51,14 @@ public class DnToCnInternalServiceAsyncRequestManager
return ConfigNodeAsyncRequestRPCHandler.buildHandler(requestContext,
requestId, targetNode);
}
+ @Override
+ protected void adjustClientTimeoutIfNecessary(
+ DnToCnRequestType dnToCnRequestType,
AsyncConfigNodeInternalServiceClient client) {
+ if
(DnToCnRequestType.SUBMIT_TEST_CONNECTION_TASK.equals(dnToCnRequestType)) {
+
client.setTimeoutTemporarily(TestConnectionUtils.calculateCnLeaderToAllNodeMaxTime());
+ }
+ }
+
private static class ClientPoolHolder {
private static final DnToCnInternalServiceAsyncRequestManager INSTANCE =
new DnToCnInternalServiceAsyncRequestManager();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 2c427a35ca3..2effa00ce89 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -281,7 +281,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static
org.apache.iotdb.commons.client.request.Utils.testConnectionsImpl;
+import static
org.apache.iotdb.commons.client.request.TestConnectionUtils.testConnectionsImpl;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static
org.apache.iotdb.db.service.RegionMigrateService.REGION_MIGRATE_PROCESS;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
@@ -1466,8 +1466,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.ConfigNodeInternalService,
DnToCnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToCnRequestType,
TConfigNodeLocation> handler) ->
- DnToCnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithRetry(handler));
+
DnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
}
private List<TTestConnectionResult> testAllDataNodeInternalServiceConnection(
@@ -1479,8 +1478,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.DataNodeInternalService,
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
- DnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithRetry(handler));
+
DnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
}
private List<TTestConnectionResult> testAllDataNodeMPPServiceConnection(
@@ -1492,7 +1490,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.DataNodeMPPService,
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
-
DataNodeMPPServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(handler));
+
DataNodeMPPServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
}
private List<TTestConnectionResult> testAllDataNodeExternalServiceConnection(
@@ -1504,8 +1502,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
TServiceType.DataNodeExternalService,
DnToDnRequestType.TEST_CONNECTION,
(AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
- DataNodeExternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequestWithRetry(handler));
+
DataNodeExternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
index b5adc961136..1b3d6a21a1f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
@@ -42,6 +42,8 @@ public class AsyncConfigNodeInternalServiceClient extends
IConfigNodeRPCService.
private static final Logger logger =
LoggerFactory.getLogger(AsyncConfigNodeInternalServiceClient.class);
+ private long originalTimeout = -1;
+
private final boolean printLogWhenEncounterException;
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient>
clientManager;
@@ -98,9 +100,35 @@ public class AsyncConfigNodeInternalServiceClient extends
IConfigNodeRPCService.
* RPC is finished.
*/
private void returnSelf() {
+ if (originalTimeout != -1) {
+ recoverTimeout();
+ }
clientManager.returnClient(endpoint, this);
}
+ /**
+ * Call this method when needed to temporarily modify the timeout period.
The original timeout
+ * will be saved and automatically restored when the client is returned.
+ */
+ public synchronized void setTimeoutTemporarily(long timeout) {
+ if (originalTimeout != -1) {
+ logger.warn(
+ "This client's timeout has been set to {}. If you need to set it to
{}, please call the recoverTimeout() first.",
+ originalTimeout,
+ timeout);
+ }
+ originalTimeout = getTimeout();
+ setTimeout(timeout);
+ }
+
+ private synchronized void recoverTimeout() {
+ if (originalTimeout == -1) {
+ logger.warn("This client's timeout has not been modified, cannot reset");
+ }
+ setTimeout(originalTimeout);
+ originalTimeout = -1;
+ }
+
private void close() {
___transport.close();
___currentMethod = null;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 7b617788c3e..e56aab91c0d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -43,6 +43,8 @@ public class AsyncDataNodeInternalServiceClient extends
IDataNodeRPCService.Asyn
private static final Logger logger =
LoggerFactory.getLogger(AsyncDataNodeInternalServiceClient.class);
+ public long originalTimeout = -1;
+
private final boolean printLogWhenEncounterException;
private final TEndPoint endpoint;
@@ -110,9 +112,35 @@ public class AsyncDataNodeInternalServiceClient extends
IDataNodeRPCService.Asyn
* RPC is finished.
*/
private void returnSelf() {
+ if (originalTimeout != -1) {
+ recoverTimeout();
+ }
clientManager.returnClient(endpoint, this);
}
+ /**
+ * Call this method when needed to temporarily modify the timeout period.
The original timeout
+ * will be saved and automatically restored when the client is returned.
+ */
+ public synchronized void setTimeoutTemporarily(long timeout) {
+ if (originalTimeout != -1) {
+ logger.warn(
+ "This client's timeout has been set to {}. If you need to set it to
{}, please call the recoverTimeout() first.",
+ originalTimeout,
+ timeout);
+ }
+ originalTimeout = getTimeout();
+ setTimeout(timeout);
+ }
+
+ private synchronized void recoverTimeout() {
+ if (originalTimeout == -1) {
+ logger.warn("This client's timeout has not been modified, cannot reset");
+ }
+ setTimeout(originalTimeout);
+ originalTimeout = -1;
+ }
+
private void close() {
___transport.close();
___currentMethod = null;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index ed3feb6598b..aa49355b31f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -152,9 +152,12 @@ public abstract class AsyncRequestManager<RequestType,
NodeLocation, Client> {
try {
if (!actionMap.containsKey(requestContext.getRequestType())) {
throw new UnsupportedOperationException(
- "unsupported request type: " + requestContext.getRequestType());
+ "unsupported request type "
+ + requestContext.getRequestType()
+ + ", please set it in
AsyncRequestManager::initActionMapBuilder()");
}
Client client =
clientManager.borrowClient(nodeLocationToEndPoint(targetNode));
+ adjustClientTimeoutIfNecessary(requestContext.getRequestType(), client);
Object req = requestContext.getRequest(requestId);
AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler =
buildHandler(requestContext, requestId, targetNode);
@@ -170,6 +173,10 @@ public abstract class AsyncRequestManager<RequestType,
NodeLocation, Client> {
}
}
+ protected void adjustClientTimeoutIfNecessary(RequestType type, Client
client) {
+ // In default, no need to do this
+ }
+
protected abstract TEndPoint nodeLocationToEndPoint(NodeLocation location);
protected abstract AsyncRequestRPCHandler<?, RequestType, NodeLocation>
buildHandler(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/Utils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
similarity index 71%
rename from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/Utils.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
index 6d4704f059b..99590df4542 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/Utils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSender;
import org.apache.iotdb.common.rpc.thrift.TServiceProvider;
import org.apache.iotdb.common.rpc.thrift.TServiceType;
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.rpc.TSStatusCode;
import java.util.ArrayList;
@@ -34,7 +35,12 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
-public class Utils {
+public class TestConnectionUtils {
+ private static int dataNodeServiceRequestTimeout =
+ CommonDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
+ private static int configNodeServiceRequestTimeout =
+ CommonDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
+
public static <ServiceProviderLocation, RequestType>
List<TTestConnectionResult> testConnectionsImpl(
List<ServiceProviderLocation> nodeLocations,
@@ -75,4 +81,33 @@ public class Utils {
});
return results;
}
+
+ public static int calculateCnLeaderToAllCnMaxTime() {
+ return
+ // SUBMIT_TEST_CONNECTION_TASK rpc timeout
+ configNodeServiceRequestTimeout
+ // cn internal service
+ + configNodeServiceRequestTimeout
+ // dn internal service
+ + dataNodeServiceRequestTimeout;
+ }
+
+ public static int calculateCnLeaderToAllDnMaxTime() {
+ return
+ // SUBMIT_TEST_CONNECTION_TASK rpc timeout
+ configNodeServiceRequestTimeout
+ // cn internal service
+ + configNodeServiceRequestTimeout
+ // dn internal, external, mpp service
+ + 3 * dataNodeServiceRequestTimeout;
+ }
+
+ public static int calculateCnLeaderToAllNodeMaxTime() {
+ return (int) ((calculateCnLeaderToAllCnMaxTime() +
calculateCnLeaderToAllDnMaxTime()) * 1.1);
+ }
+
+ public static int calculateDnToCnLeaderMaxTime() {
+ return calculateCnLeaderToAllDnMaxTime()
+ +
CommonDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
+ }
}