This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7d1c11742108e77bb2e6a488306c46639807e602 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Jul 11 15:10:14 2025 +0800 Pipe: Fix connection leak caused by clients not closed after task dropped (2 situations) (#15910) (cherry picked from commit bf8329b15179bead9bb50c731274fae3dad816e9) --- .../protocol/IoTDBConfigRegionConnector.java | 8 ++++---- .../subtask/connector/PipeConnectorSubtask.java | 5 +++++ .../client/IoTDBDataNodeAsyncClientManager.java | 20 +++++++++++++++++++- .../async/IoTDBDataRegionAsyncConnector.java | 4 +++- .../handler/PipeTransferTrackableHandler.java | 2 ++ .../async/handler/PipeTransferTsFileHandler.java | 22 ++++++++++++++++++---- .../apache/iotdb/commons/client/ClientManager.java | 22 +++++++++++++--------- .../async/AsyncPipeDataTransferServiceClient.java | 2 +- .../connector/protocol/IoTDBSslSyncConnector.java | 14 +++++++++++--- 9 files changed, 76 insertions(+), 23 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java index c3a1c0dd522..351887d2cfc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java @@ -149,7 +149,7 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent) throws PipeException { - final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient(); + final Pair<IoTDBSyncClient, Boolean> clientAndStatus = getClientManager().getClient(); final TPipeTransferResp resp; try { @@ -175,7 +175,7 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { final TSStatus status = resp.getStatus(); // Send handshake req and then re-transfer the event if (status.getCode() == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) { - clientManager.sendHandshakeReq(clientAndStatus); + getClientManager().sendHandshakeReq(clientAndStatus); } // Only handle the failed statuses to avoid string format performance overhead if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() @@ -214,7 +214,7 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { final long creationTime = snapshotEvent.getCreationTime(); final File snapshotFile = snapshotEvent.getSnapshotFile(); final File templateFile = snapshotEvent.getTemplateFile(); - final Pair<IoTDBSyncClient, Boolean> clientAndStatus = clientManager.getClient(); + final Pair<IoTDBSyncClient, Boolean> clientAndStatus = getClientManager().getClient(); // 1. Transfer snapshotFile, and template File if exists transferFilePieces( @@ -265,7 +265,7 @@ public class IoTDBConfigRegionConnector extends IoTDBSslSyncConnector { final TSStatus status = resp.getStatus(); // Send handshake req and then re-transfer the event if (status.getCode() == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) { - clientManager.sendHandshakeReq(clientAndStatus); + getClientManager().sendHandshakeReq(clientAndStatus); } // Only handle the failed statuses to avoid string format performance overhead if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java index 5d8d622a74a..2415ea6e36e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java @@ -164,6 +164,11 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask { } private void transferHeartbeatEvent(final PipeHeartbeatEvent event) { + // DO NOT call heartbeat or transfer after closed, or will cause connection leak + if (isClosed.get()) { + return; + } + try { outputPipeConnector.heartbeat(); outputPipeConnector.transfer(event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index 15a044abae4..a2f75bf110a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -299,6 +299,18 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager client.resetMethodStateIfStopped(); throw e; } finally { + if (isClosed) { + try { + client.close(); + client.invalidateAll(); + } catch (final Exception e) { + LOGGER.warn( + "Failed to close client {}:{} after handshake failure when the manager is closed.", + targetNodeUrl.getIp(), + targetNodeUrl.getPort(), + e); + } + } client.setShouldReturnSelf(true); client.returnSelf(); } @@ -348,8 +360,14 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager if (clientManager != null) { try { clientManager.close(); + LOGGER.info( + "Closed AsyncPipeDataTransferServiceClientManager for receiver attributes: {}", + receiverAttributes); } catch (final Exception e) { - LOGGER.warn("Failed to close client manager.", e); + LOGGER.warn( + "Failed to close AsyncPipeDataTransferServiceClientManager for receiver attributes: {}", + receiverAttributes, + e); } } return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index af10360a036..176bcd94cfb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -198,7 +198,9 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { @Override public void heartbeat() throws Exception { - syncConnector.heartbeat(); + if (!isClosed()) { + syncConnector.heartbeat(); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index 4908adba8c6..e78cd9adbf3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -82,6 +82,8 @@ public abstract class PipeTransferTrackableHandler if (connector.isClosed()) { clearEventsReferenceCount(); connector.eliminateHandler(this); + client.setShouldReturnSelf(true); + client.returnSelf(); return false; } doTransfer(client, req); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index d43a9799116..46ff84276b0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -411,11 +411,25 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { } private void returnClientIfNecessary() { - if (client != null) { - client.setShouldReturnSelf(true); - client.returnSelf(); - client = null; + if (client == null) { + return; + } + + if (connector.isClosed()) { + try { + client.close(); + client.invalidateAll(); + } catch (final Exception e) { + LOGGER.warn( + "Failed to close or invalidate client when connector is closed. Client: {}, Exception: {}", + client, + e.getMessage(), + e); + } } + client.setShouldReturnSelf(true); + client.returnSelf(); + client = null; } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index 79fcc799ae9..0915e69802e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -64,15 +64,19 @@ public class ClientManager<K, V> implements IClientManager<K, V> { * return of a client is automatic whenever a particular client is used. */ public void returnClient(K node, V client) { - Optional.ofNullable(node) - .ifPresent( - x -> { - try { - pool.returnObject(node, client); - } catch (Exception e) { - LOGGER.warn("Return client {} for node {} to pool failed.", client, node, e); - } - }); + if (node != null) { + try { + pool.returnObject(node, client); + } catch (Exception e) { + LOGGER.warn("Return client {} for node {} to pool failed.", client, node, e); + } + } else if (client instanceof ThriftClient) { + ((ThriftClient) client).invalidateAll(); + LOGGER.warn( + "Return client {} to pool failed because the node is null. " + + "This may cause resource leak, please check your code.", + client); + } } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index 06d9ecbe12f..f8b2bfb08c4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -127,7 +127,7 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC } } - private void close() { + public void close() { ___transport.close(); ___currentMethod = null; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java index eeb32ee5810..44e91874641 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java @@ -66,7 +66,14 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSslSyncConnector.class); - protected IoTDBSyncClientManager clientManager; + private volatile IoTDBSyncClientManager clientManager; + + protected IoTDBSyncClientManager getClientManager() { + if (clientManager == null) { + throw new IllegalStateException("IoTDB sync client manager has been closed"); + } + return clientManager; + } @Override public void validate(final PipeParameterValidator validator) throws Exception { @@ -153,7 +160,7 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { @Override public void handshake() throws Exception { - clientManager.checkClientStatusAndTryReconstructIfNecessary(); + getClientManager().checkClientStatusAndTryReconstructIfNecessary(); } @Override @@ -229,7 +236,7 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { // Send handshake req and then re-transfer the event if (status.getCode() == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) { - clientManager.sendHandshakeReq(clientAndStatus); + getClientManager().sendHandshakeReq(clientAndStatus); } // Only handle the failed statuses to avoid string format performance overhead if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() @@ -255,6 +262,7 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector { public void close() { if (clientManager != null) { clientManager.close(); + clientManager = null; } super.close();
