This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 8140adff9f0 Pipe: Fix "waitHandshakeFinished" may wait forever when
asyncClientManager is closed (#14373) (#14435)
8140adff9f0 is described below
commit 8140adff9f0cd0bb72e3fd5fd04f299d70f2bea6
Author: Caideyipi <[email protected]>
AuthorDate: Mon Dec 16 12:09:20 2024 +0800
Pipe: Fix "waitHandshakeFinished" may wait forever when asyncClientManager
is closed (#14373) (#14435)
---
.../client/IoTDBDataNodeAsyncClientManager.java | 45 ++++++++++++++--------
.../async/AsyncPipeDataTransferServiceClient.java | 28 +++++++-------
2 files changed, 43 insertions(+), 30 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 1bbf5273187..aff8926d0cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -73,16 +73,18 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
private final LoadBalancer loadBalancer;
+ private volatile boolean isClosed = false;
+
public IoTDBDataNodeAsyncClientManager(
- List<TEndPoint> endPoints,
+ final List<TEndPoint> endPoints,
/* The following parameters are used locally. */
- boolean useLeaderCache,
- String loadBalanceStrategy,
+ final boolean useLeaderCache,
+ final String loadBalanceStrategy,
/* The following parameters are used to handshake with the receiver. */
- String username,
- String password,
- boolean shouldReceiverConvertOnTypeMismatch,
- String loadTsFileStrategy) {
+ final String username,
+ final String password,
+ final boolean shouldReceiverConvertOnTypeMismatch,
+ final String loadTsFileStrategy) {
super(
endPoints,
username,
@@ -135,7 +137,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
return loadBalancer.borrowClient();
}
- public AsyncPipeDataTransferServiceClient borrowClient(String deviceId)
throws Exception {
+ public AsyncPipeDataTransferServiceClient borrowClient(final String
deviceId) throws Exception {
if (!useLeaderCache || Objects.isNull(deviceId)) {
return borrowClient();
}
@@ -143,7 +145,8 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
return borrowClient(LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId));
}
- public AsyncPipeDataTransferServiceClient borrowClient(TEndPoint endPoint)
throws Exception {
+ public AsyncPipeDataTransferServiceClient borrowClient(final TEndPoint
endPoint)
+ throws Exception {
if (!useLeaderCache || Objects.isNull(endPoint)) {
return borrowClient();
}
@@ -153,7 +156,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
if (handshakeIfNecessary(endPoint, client)) {
return client;
}
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn(
"failed to borrow client {}:{} for cached leader.",
endPoint.getIp(),
@@ -173,7 +176,8 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
* @throws Exception if an error occurs.
*/
private boolean handshakeIfNecessary(
- TEndPoint targetNodeUrl, AsyncPipeDataTransferServiceClient client)
throws Exception {
+ final TEndPoint targetNodeUrl, final AsyncPipeDataTransferServiceClient
client)
+ throws Exception {
if (client.isHandshakeFinished()) {
return true;
}
@@ -185,7 +189,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
final AsyncMethodCallback<TPipeTransferResp> callback =
new AsyncMethodCallback<TPipeTransferResp>() {
@Override
- public void onComplete(TPipeTransferResp response) {
+ public void onComplete(final TPipeTransferResp response) {
resp.set(response);
if (response.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -215,7 +219,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
}
@Override
- public void onError(Exception e) {
+ public void onError(final Exception e) {
LOGGER.warn(
"Handshake error with receiver {}:{}.",
targetNodeUrl.getIp(),
@@ -281,18 +285,24 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
return false;
}
- private void waitHandshakeFinished(AtomicBoolean isHandshakeFinished) {
+ private void waitHandshakeFinished(final AtomicBoolean isHandshakeFinished) {
try {
+ final long startTime = System.currentTimeMillis();
while (!isHandshakeFinished.get()) {
+ if (isClosed
+ || System.currentTimeMillis() - startTime
+ >
PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs() * 2L) {
+ throw new PipeConnectionException("Timed out when waiting for client
handshake finish.");
+ }
Thread.sleep(10);
}
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new PipeException("Interrupted while waiting for handshake
response.", e);
}
}
- public void updateLeaderCache(String deviceId, TEndPoint endPoint) {
+ public void updateLeaderCache(final String deviceId, final TEndPoint
endPoint) {
if (!useLeaderCache || deviceId == null || endPoint == null) {
return;
}
@@ -306,6 +316,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
}
public void close() {
+ isClosed = true;
synchronized (IoTDBDataNodeAsyncClientManager.class) {
RECEIVER_ATTRIBUTES_REF_COUNT.computeIfPresent(
receiverAttributes,
@@ -316,7 +327,7 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
if (clientManager != null) {
try {
clientManager.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
LOGGER.warn("Failed to close client manager.", e);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 91fa97fc82c..f64c3f8d95d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -58,10 +58,10 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
private final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false);
public AsyncPipeDataTransferServiceClient(
- ThriftClientProperty property,
- TEndPoint endpoint,
- TAsyncClientManager tClientManager,
- ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
clientManager)
+ final ThriftClientProperty property,
+ final TEndPoint endpoint,
+ final TAsyncClientManager tClientManager,
+ final ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
clientManager)
throws IOException {
super(
property.getProtocolFactory(),
@@ -81,7 +81,7 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
}
@Override
- public void onError(Exception e) {
+ public void onError(final Exception e) {
super.onError(e);
ThriftClient.resolveException(e, this);
returnSelf();
@@ -114,11 +114,11 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
}
}
- public void setShouldReturnSelf(boolean shouldReturnSelf) {
+ public void setShouldReturnSelf(final boolean shouldReturnSelf) {
this.shouldReturnSelf.set(shouldReturnSelf);
}
- public void setTimeoutDynamically(int timeout) {
+ public void setTimeoutDynamically(final int timeout) {
try {
((TNonblockingSocket) ___transport).setTimeout(timeout);
} catch (Exception e) {
@@ -178,20 +178,21 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
extends AsyncThriftClientFactory<TEndPoint,
AsyncPipeDataTransferServiceClient> {
public Factory(
- ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
clientManager,
- ThriftClientProperty thriftClientProperty,
- String threadName) {
+ final ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
clientManager,
+ final ThriftClientProperty thriftClientProperty,
+ final String threadName) {
super(clientManager, thriftClientProperty, threadName);
}
@Override
public void destroyObject(
- TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient>
pooledObject) {
+ final TEndPoint endPoint,
+ final PooledObject<AsyncPipeDataTransferServiceClient> pooledObject) {
pooledObject.getObject().close();
}
@Override
- public PooledObject<AsyncPipeDataTransferServiceClient>
makeObject(TEndPoint endPoint)
+ public PooledObject<AsyncPipeDataTransferServiceClient> makeObject(final
TEndPoint endPoint)
throws Exception {
return new DefaultPooledObject<>(
new AsyncPipeDataTransferServiceClient(
@@ -203,7 +204,8 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
@Override
public boolean validateObject(
- TEndPoint endPoint, PooledObject<AsyncPipeDataTransferServiceClient>
pooledObject) {
+ final TEndPoint endPoint,
+ final PooledObject<AsyncPipeDataTransferServiceClient> pooledObject) {
return pooledObject.getObject().isReady();
}
}