This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch check-shit
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 87acc743324875fbd5b052e6c2bd50c1c4e48a5a Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Jul 10 10:00:59 2025 +0800 c --- .../client/IoTDBDataNodeAsyncClientManager.java | 2 ++ .../async/AsyncPipeDataTransferServiceClient.java | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index 15a044abae4..520772a9fdd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -347,7 +347,9 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(receiverAttributes); if (clientManager != null) { try { + LOGGER.warn("AsyncPipeDataTransferServiceClient closed. 1", new Exception()); clientManager.close(); + LOGGER.warn("AsyncPipeDataTransferServiceClient closed. 
2", new Exception()); } catch (final Exception e) { LOGGER.warn("Failed to close client manager.", e); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index 06d9ecbe12f..fa11bd2c27d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -72,12 +72,14 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); this.endpoint = endpoint; this.clientManager = clientManager; + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, constructor", id); } @Override public void onComplete() { super.onComplete(); returnSelf(); + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, onComplete", id); } @Override @@ -85,18 +87,22 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC super.onError(e); ThriftClient.resolveException(e, this); returnSelf(); + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, onError", id, e); } @Override public void invalidate() { if (!hasError()) { super.onError(new Exception(String.format("This client %d has been invalidated", id))); + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, invalidate 1", id); } + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, invalidate 2", id, new Exception()); } @Override public void invalidateAll() { clientManager.clear(endpoint); + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, invalidateAll", id, new Exception()); } @Override @@ -111,7 +117,9 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC public void 
returnSelf() { if (shouldReturnSelf.get()) { clientManager.returnClient(endpoint, this); + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, returnSelf 1", id); } + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, returnSelf 2", id); } public void setShouldReturnSelf(final boolean shouldReturnSelf) { @@ -125,11 +133,13 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC setTimeout(timeout); LOGGER.error("Failed to set timeout dynamically, set it statically", e); } + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, timeout dynamically", id); } private void close() { ___transport.close(); ___currentMethod = null; + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, closed", id); } private boolean isReady() { @@ -165,10 +175,13 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC if (___transport != null && ___transport.isOpen()) { ___transport.close(); LOGGER.warn("Manually closing transport to prevent resource leakage."); + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, resetMethodState 1", id); } ___currentMethod = null; LOGGER.info("Method state has been reset due to manager not running."); + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, resetMethodState 2", id); } + LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, resetMethodState 3", id); } public String getIp() { @@ -203,11 +216,14 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC final TEndPoint endPoint, final PooledObject<AsyncPipeDataTransferServiceClient> pooledObject) { pooledObject.getObject().close(); + LOGGER.warn( + "AsyncPipeDataTransferServiceClient#Factory endpoint = {}, destroyObject", endPoint); } @Override public PooledObject<AsyncPipeDataTransferServiceClient> makeObject(final TEndPoint endPoint) throws Exception { + LOGGER.warn("AsyncPipeDataTransferServiceClient#Factory endpoint = {}, makeObject", endPoint); return new 
DefaultPooledObject<>( new AsyncPipeDataTransferServiceClient( thriftClientProperty, @@ -220,6 +236,8 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC public boolean validateObject( final TEndPoint endPoint, final PooledObject<AsyncPipeDataTransferServiceClient> pooledObject) { + LOGGER.warn( + "AsyncPipeDataTransferServiceClient#Factory endpoint = {}, validateObject", endPoint); return pooledObject.getObject().isReady(); } }
