This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1b3b1a14d664d99cf9e7e6991e2e1a4e51ce80aa Author: Zhenyu Luo <[email protected]> AuthorDate: Tue Dec 2 11:29:47 2025 +0800 Pipe: Ignore logging when `returnSelf` is called in the event of an exception in `AsyncClient`. (#16827) (cherry picked from commit e3127e6c61f13f91c95c851d49253c4d668dff8e) --- .../handler/PipeTransferTrackableHandler.java | 15 +++++++------ .../async/handler/PipeTransferTsFileHandler.java | 15 +++++++------ .../apache/iotdb/commons/client/ClientManager.java | 25 ++++++++++++++++++++++ .../async/AsyncPipeDataTransferServiceClient.java | 14 +++++++++++- 4 files changed, 56 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index d0d6d054299..8a552f0cbee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -97,12 +97,15 @@ public abstract class PipeTransferTrackableHandler clearEventsReferenceCount(); connector.eliminateHandler(this, true); client.setShouldReturnSelf(true); - try { - client.returnSelf(); - } catch (final IllegalStateException e) { - LOGGER.info( - "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); - } + client.returnSelf( + (e) -> { + if (e instanceof IllegalStateException) { + LOGGER.info( + "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); + return true; + } + return false; + }); this.client = null; return false; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 3cdb2abda15..3ef46034554 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -431,12 +431,15 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { } client.setShouldReturnSelf(true); - try { - client.returnSelf(); - } catch (final IllegalStateException e) { - LOGGER.info( - "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); - } + client.returnSelf( + (e) -> { + if (e instanceof IllegalStateException) { + LOGGER.info( + "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); + return true; + } + return false; + }); client = null; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index 0915e69802e..41d779c4398 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Optional; +import java.util.function.Function; public class ClientManager<K, V> implements IClientManager<K, V> { @@ -79,6 +80,30 @@ public class ClientManager<K, V> implements IClientManager<K, V> { } } + /** + * return a client V for node K to the {@link ClientManager}, and ignore some exception + * + * <p>Note: We do not define this interface in {@link IClientManager} to make you aware that the + * return of a client is automatic whenever a particular client is used. + */ + public void returnClient(K node, V client, Function<Exception, Boolean> ignoreError) { + if (node != null) { + try { + pool.returnObject(node, client); + } catch (Exception e) { + if (!Boolean.TRUE.equals(ignoreError.apply(e))) { + LOGGER.warn("Return client {} for node {} to pool failed.", client, node, e); + } + } + } else if (client instanceof ThriftClient) { + ((ThriftClient) client).invalidateAll(); + LOGGER.warn( + "Return client {} to pool failed because the node is null. " + + "This may cause resource leak, please check your code.", + client); + } + } + @Override public void clear(K node) { Optional.ofNullable(node) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index f662b697ebf..e5a89215817 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncClient implements ThriftClient { @@ -84,7 +85,8 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC public void onError(final Exception e) { super.onError(e); ThriftClient.resolveException(e, this); - returnSelf(); + returnSelf( + (i) -> i instanceof IllegalStateException && "Client has an error!".equals(i.getMessage())); } @Override @@ -114,6 +116,16 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC } } + /** + * return self, the method doesn't need to be called by the user and will be triggered after the + * RPC is finished. + */ + public void returnSelf(Function<Exception, Boolean> ignoreError) { + if (shouldReturnSelf.get()) { + clientManager.returnClient(endpoint, this, ignoreError); + } + } + public void setShouldReturnSelf(final boolean shouldReturnSelf) { this.shouldReturnSelf.set(shouldReturnSelf); }
