This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 48f6d7680ef KAFKA-19898: Close ConsumerNetworkThread on failed start (#20930)
48f6d7680ef is described below

commit 48f6d7680ef0a1dd075d34c71b8ad9de87719697
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Nov 19 23:16:38 2025 +0000

    KAFKA-19898: Close ConsumerNetworkThread on failed start (#20930)
    
    Client unit tests are failing because of OOM. If the
    `ConsumerNetworkThread` instantiated in the constructor for
    `ApplicationEventHandler` fails to start in a timely fashion, it can be
    leaked.
    
    Reviewers: Lianet Magrans <[email protected]>, Kirk True
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../internals/events/ApplicationEventHandler.java         | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
index 645b121483f..5e6a25ed31a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
@@ -62,7 +62,7 @@ public class ApplicationEventHandler implements Closeable {
         this.time = time;
         this.applicationEventQueue = applicationEventQueue;
         this.asyncConsumerMetrics = asyncConsumerMetrics;
-        this.networkThread = new ConsumerNetworkThread(logContext,
+        ConsumerNetworkThread networkThread = new ConsumerNetworkThread(logContext,
                 time,
                 applicationEventQueue,
                 applicationEventReaper,
@@ -71,7 +71,18 @@ public class ApplicationEventHandler implements Closeable {
                 requestManagersSupplier,
                 asyncConsumerMetrics);
 
-        this.networkThread.start(initializationTimeoutMs);
+        try {
+            networkThread.start(initializationTimeoutMs);
+        } catch (Exception e) {
+            try {
+                networkThread.close();
+            } finally {
+                networkThread = null;
+            }
+            throw e;
+        } finally {
+            this.networkThread = networkThread;
+        }
     }
 
     /**

Reply via email to