This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e3127e6c61f Pipe: Ignore logging when `returnSelf` is called in the
event of an exception in `AsyncClient`. (#16827)
e3127e6c61f is described below
commit e3127e6c61f13f91c95c851d49253c4d668dff8e
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Dec 2 11:29:47 2025 +0800
Pipe: Ignore logging when `returnSelf` is called in the event of an
exception in `AsyncClient`. (#16827)
---
.../handler/PipeTransferTrackableHandler.java | 15 +++++++------
.../async/handler/PipeTransferTsFileHandler.java | 15 +++++++------
.../apache/iotdb/commons/client/ClientManager.java | 25 ++++++++++++++++++++++
.../async/AsyncPipeDataTransferServiceClient.java | 14 +++++++++++-
4 files changed, 56 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index d0d6d054299..8a552f0cbee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -97,12 +97,15 @@ public abstract class PipeTransferTrackableHandler
clearEventsReferenceCount();
connector.eliminateHandler(this, true);
client.setShouldReturnSelf(true);
- try {
- client.returnSelf();
- } catch (final IllegalStateException e) {
- LOGGER.info(
- "Illegal state when return the client to object pool, maybe the
pool is already cleared. Will ignore.");
- }
+ client.returnSelf(
+ (e) -> {
+ if (e instanceof IllegalStateException) {
+ LOGGER.info(
+ "Illegal state when return the client to object pool, maybe
the pool is already cleared. Will ignore.");
+ return true;
+ }
+ return false;
+ });
this.client = null;
return false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 3cdb2abda15..3ef46034554 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -431,12 +431,15 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
}
client.setShouldReturnSelf(true);
- try {
- client.returnSelf();
- } catch (final IllegalStateException e) {
- LOGGER.info(
- "Illegal state when return the client to object pool, maybe the pool
is already cleared. Will ignore.");
- }
+ client.returnSelf(
+ (e) -> {
+ if (e instanceof IllegalStateException) {
+ LOGGER.info(
+ "Illegal state when return the client to object pool, maybe
the pool is already cleared. Will ignore.");
+ return true;
+ }
+ return false;
+ });
client = null;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 0915e69802e..41d779c4398 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
+import java.util.function.Function;
public class ClientManager<K, V> implements IClientManager<K, V> {
@@ -79,6 +80,30 @@ public class ClientManager<K, V> implements
IClientManager<K, V> {
}
}
+ /**
+ * return a client V for node K to the {@link ClientManager}, and ignore
some exception
+ *
+ * <p>Note: We do not define this interface in {@link IClientManager} to
make you aware that the
+ * return of a client is automatic whenever a particular client is used.
+ */
+ public void returnClient(K node, V client, Function<Exception, Boolean>
ignoreError) {
+ if (node != null) {
+ try {
+ pool.returnObject(node, client);
+ } catch (Exception e) {
+ if (!Boolean.TRUE.equals(ignoreError.apply(e))) {
+ LOGGER.warn("Return client {} for node {} to pool failed.", client,
node, e);
+ }
+ }
+ } else if (client instanceof ThriftClient) {
+ ((ThriftClient) client).invalidateAll();
+ LOGGER.warn(
+ "Return client {} to pool failed because the node is null. "
+ + "This may cause resource leak, please check your code.",
+ client);
+ }
+ }
+
@Override
public void clear(K node) {
Optional.ofNullable(node)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index f662b697ebf..e5a89215817 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncClient
implements ThriftClient {
@@ -84,7 +85,8 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
public void onError(final Exception e) {
super.onError(e);
ThriftClient.resolveException(e, this);
- returnSelf();
+ returnSelf(
+ (i) -> i instanceof IllegalStateException && "Client has an
error!".equals(i.getMessage()));
}
@Override
@@ -114,6 +116,16 @@ public class AsyncPipeDataTransferServiceClient extends
IClientRPCService.AsyncC
}
}
+ /**
+ * return self, the method doesn't need to be called by the user and will be
triggered after the
+ * RPC is finished.
+ */
+ public void returnSelf(Function<Exception, Boolean> ignoreError) {
+ if (shouldReturnSelf.get()) {
+ clientManager.returnClient(endpoint, this, ignoreError);
+ }
+ }
+
public void setShouldReturnSelf(final boolean shouldReturnSelf) {
this.shouldReturnSelf.set(shouldReturnSelf);
}