This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 48f6d7680ef KAFKA-19898: Close ConsumerNetworkThread on failed start
(#20930)
48f6d7680ef is described below
commit 48f6d7680ef0a1dd075d34c71b8ad9de87719697
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Nov 19 23:16:38 2025 +0000
KAFKA-19898: Close ConsumerNetworkThread on failed start (#20930)
Client unit tests are failing because of OOM. If the
`ConsumerNetworkThread` instantiated in the constructor for
`ApplicationEventHandler` fails to start in a timely fashion, it can be
leaked.
Reviewers: Lianet Magrans <[email protected]>, Kirk True
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../internals/events/ApplicationEventHandler.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
index 645b121483f..5e6a25ed31a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
@@ -62,7 +62,7 @@ public class ApplicationEventHandler implements Closeable {
this.time = time;
this.applicationEventQueue = applicationEventQueue;
this.asyncConsumerMetrics = asyncConsumerMetrics;
- this.networkThread = new ConsumerNetworkThread(logContext,
+ ConsumerNetworkThread networkThread = new
ConsumerNetworkThread(logContext,
time,
applicationEventQueue,
applicationEventReaper,
@@ -71,7 +71,18 @@ public class ApplicationEventHandler implements Closeable {
requestManagersSupplier,
asyncConsumerMetrics);
- this.networkThread.start(initializationTimeoutMs);
+ try {
+ networkThread.start(initializationTimeoutMs);
+ } catch (Exception e) {
+ try {
+ networkThread.close();
+ } finally {
+ networkThread = null;
+ }
+ throw e;
+ } finally {
+ this.networkThread = networkThread;
+ }
}
/**