This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1e945dd3a99 Gracefully manage async client threads (#17683)
1e945dd3a99 is described below
commit 1e945dd3a9985ffb79ddaab940f2912507f9318b
Author: Yongzao <[email protected]>
AuthorDate: Sun May 17 17:07:36 2026 +0800
Gracefully manage async client threads (#17683)
---
.../async/AsyncAINodeHeartbeatClientPool.java | 15 +++++++++++-
.../async/AsyncConfigNodeHeartbeatClientPool.java | 15 +++++++++++-
.../async/AsyncDataNodeHeartbeatClientPool.java | 28 ++++++++++++++++++++--
.../client/sync/SyncConfigNodeClientPool.java | 5 ++++
.../client/request/AsyncRequestManager.java | 16 +++++++++++--
5 files changed, 73 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
index f3e1d1064ef..d15b45cbcd5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.client.async;
import org.apache.iotdb.ainode.rpc.thrift.TAIHeartbeatReq;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncAINodeInternalServiceClient;
@@ -47,10 +48,22 @@ public class AsyncAINodeHeartbeatClientPool {
*/
public void getAINodeHeartBeat(
TEndPoint endPoint, TAIHeartbeatReq req, AINodeHeartbeatHandler handler)
{
+ AsyncAINodeInternalServiceClient client = null;
+ boolean dispatched = false;
try {
- clientManager.borrowClient(endPoint).getAIHeartbeat(req, handler);
+ client = clientManager.borrowClient(endPoint);
+ client.getAIHeartbeat(req, handler);
+ dispatched = true;
} catch (Exception ignore) {
// Just ignore
+ } finally {
+ // After the async call is dispatched, the client's onComplete/onError
callback is
+ // responsible for returning the client. If the RPC was not dispatched
(exception
+ // before/during the call), the client must be returned here to prevent
pool leakage.
+ if (!dispatched && client != null && clientManager instanceof
ClientManager) {
+ ((ClientManager<TEndPoint, AsyncAINodeInternalServiceClient>)
clientManager)
+ .returnClient(endPoint, client);
+ }
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index a6dffbe0eef..5cf716d0dd1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
@@ -48,10 +49,22 @@ public class AsyncConfigNodeHeartbeatClientPool {
TEndPoint endPoint,
TConfigNodeHeartbeatReq heartbeatReq,
ConfigNodeHeartbeatHandler handler) {
+ AsyncConfigNodeInternalServiceClient client = null;
+ boolean dispatched = false;
try {
-
clientManager.borrowClient(endPoint).getConfigNodeHeartBeat(heartbeatReq,
handler);
+ client = clientManager.borrowClient(endPoint);
+ client.getConfigNodeHeartBeat(heartbeatReq, handler);
+ dispatched = true;
} catch (Exception ignore) {
// Just ignore
+ } finally {
+ // After the async call is dispatched, the client's onComplete/onError
callback is
+ // responsible for returning the client. If the RPC was not dispatched
(exception
+ // before/during the call), the client must be returned here to prevent
pool leakage.
+ if (!dispatched && client != null && clientManager instanceof
ClientManager) {
+ ((ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient>)
clientManager)
+ .returnClient(endPoint, client);
+ }
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
index f99a88ebc77..d31ec405e0c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
@@ -49,19 +50,42 @@ public class AsyncDataNodeHeartbeatClientPool {
*/
public void getDataNodeHeartBeat(
TEndPoint endPoint, TDataNodeHeartbeatReq req, DataNodeHeartbeatHandler
handler) {
+ AsyncDataNodeInternalServiceClient client = null;
+ boolean dispatched = false;
try {
- clientManager.borrowClient(endPoint).getDataNodeHeartBeat(req, handler);
+ client = clientManager.borrowClient(endPoint);
+ client.getDataNodeHeartBeat(req, handler);
+ dispatched = true;
} catch (Exception ignore) {
// Just ignore
+ } finally {
+ returnClientIfNotDispatched(endPoint, client, dispatched);
}
}
public void writeAuditLog(
TEndPoint endPoint, TAuditLogReq req, DataNodeWriteAuditLogHandler
handler) {
+ AsyncDataNodeInternalServiceClient client = null;
+ boolean dispatched = false;
try {
- clientManager.borrowClient(endPoint).writeAuditLog(req, handler);
+ client = clientManager.borrowClient(endPoint);
+ client.writeAuditLog(req, handler);
+ dispatched = true;
} catch (Exception e) {
// Just ignore
+ } finally {
+ returnClientIfNotDispatched(endPoint, client, dispatched);
+ }
+ }
+
+ // After the async call is dispatched, the client's onComplete/onError
callback is responsible
+ // for returning the client. If the RPC was not dispatched (exception
before/during the call),
+ // the client must be returned here to prevent pool leakage.
+ private void returnClientIfNotDispatched(
+ TEndPoint endPoint, AsyncDataNodeInternalServiceClient client, boolean
dispatched) {
+ if (!dispatched && client != null && clientManager instanceof
ClientManager) {
+ ((ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
clientManager)
+ .returnClient(endPoint, client);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index f87b686855e..8414519307b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -138,6 +138,11 @@ public class SyncConfigNodeClientPool {
while (status.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
TimeUnit.MILLISECONDS.sleep(2000);
updateConfigNodeLeader(status);
+ if (configNodeLeader == null) {
+ LOGGER.warn(
+ "Redirection recommended for removeConfigNode but no leader
endpoint provided, abort retry.");
+ break;
+ }
try (SyncConfigNodeIServiceClient clientLeader =
clientManager.borrowClient(configNodeLeader)) {
status = clientLeader.removeConfigNode(configNodeLocation);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index fa97fec57b0..1f147070acb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.client.request;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.i18n.ClientMessages;
@@ -178,6 +179,9 @@ public abstract class AsyncRequestManager<RequestType,
NodeLocation, Client> {
int requestId,
NodeLocation targetNode,
int retryCount) {
+ final TEndPoint endPoint = nodeLocationToEndPoint(targetNode);
+ Client client = null;
+ boolean dispatched = false;
try {
if (!actionMap.containsKey(requestContext.getRequestType())) {
throw new UnsupportedOperationException(
@@ -185,20 +189,28 @@ public abstract class AsyncRequestManager<RequestType,
NodeLocation, Client> {
+ requestContext.getRequestType()
+ ", please set it in
AsyncRequestManager::initActionMapBuilder()");
}
- Client client =
clientManager.borrowClient(nodeLocationToEndPoint(targetNode));
+ client = clientManager.borrowClient(endPoint);
adjustClientTimeoutIfNecessary(requestContext.getRequestType(), client);
Object req = requestContext.getRequest(requestId);
AsyncRequestRPCHandler<?, RequestType, NodeLocation> handler =
buildHandler(requestContext, requestId, targetNode);
Objects.requireNonNull(actionMap.get(requestContext.getRequestType()))
.accept(req, client, handler);
+ // After accept() returns, the async callback (onComplete/onError) takes
over the
+ // responsibility of returning the client to the pool. Before this
point, if any exception
+ // is thrown, the finally block below must return the client to prevent
pool leakage.
+ dispatched = true;
} catch (Exception e) {
LOGGER.warn(
ClientMessages.ASYNC_REQUEST_FAILED_ON_NODE,
requestContext.getRequestType(),
- nodeLocationToEndPoint(targetNode),
+ endPoint,
e.getMessage(),
retryCount);
+ } finally {
+ if (!dispatched && client != null && clientManager instanceof
ClientManager) {
+ ((ClientManager<TEndPoint, Client>)
clientManager).returnClient(endPoint, client);
+ }
}
}