This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e8837465a5f KAFKA-18067: Kafka Streams can leak Producer client under 
EOS (#17931)
e8837465a5f is described below

commit e8837465a5fc478f1c79d1ad475b43e00a39a5d7
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Dec 10 08:12:05 2024 +0800

    KAFKA-18067: Kafka Streams can leak Producer client under EOS (#17931)
    
    To avoid leaking producers, we should add a `closed` flag to `StreamsProducer` 
indicating whether we should reset the producer.
    
    Reviewers: Guozhang Wang <[email protected]>, Anna Sophie 
Blee-Goldman <[email protected]>
---
 .../streams/processor/internals/ActiveTaskCreator.java     |  3 ++-
 .../kafka/streams/processor/internals/StreamsProducer.java |  6 ++++++
 .../streams/processor/internals/ActiveTaskCreatorTest.java | 14 ++++++++++++++
 3 files changed, 22 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 6c973e096fc..1dffc4ebbd3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -119,7 +119,8 @@ class ActiveTaskCreator {
     }
 
     public void reInitializeProducer() {
-        streamsProducer.resetProducer(producer());
+        if (!streamsProducer.isClosed())
+            streamsProducer.resetProducer(producer());
     }
 
     StreamsProducer streamsProducer() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 1048b5a2ecf..546186b2dbd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -70,6 +70,7 @@ public class StreamsProducer {
     private Producer<byte[], byte[]> producer;
     private boolean transactionInFlight = false;
     private boolean transactionInitialized = false;
+    private boolean closed = false;
     private double oldProducerTotalBlockedTime = 0;
     // we have a single `StreamsProducer` per thread, and thus a single 
`sendException` instance,
     // which we share across all tasks, ie, all `RecordCollectorImpl`
@@ -98,6 +99,10 @@ public class StreamsProducer {
         return transactionInFlight;
     }
 
+    boolean isClosed() {
+        return closed;
+    }
+
     /**
      * @throws IllegalStateException if EOS is disabled
      */
@@ -320,6 +325,7 @@ public class StreamsProducer {
 
     void close() {
         producer.close();
+        closed = true;
         transactionInFlight = false;
         transactionInitialized = false;
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 6a4339a3ed7..73da4b7cc05 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -190,9 +190,23 @@ public class ActiveTaskCreatorTest {
 
         activeTaskCreator.close();
 
+        assertThat(activeTaskCreator.streamsProducer().isClosed(), is(true));
         assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
     }
 
+    @Test
+    public void shouldNotReInitializeProducerOnClose() {
+        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);
+        mockClientSupplier.setApplicationIdForProducer("appId");
+        createTasks();
+
+        activeTaskCreator.streamsProducer().close();
+        activeTaskCreator.reInitializeProducer();
+        // If streamsProducer is not closed, clientSupplier will recreate a 
producer,
+        // resulting in more than one producer being created.
+        assertThat(mockClientSupplier.producers.size(), is(1));
+    }
+
     // error handling
 
     @Test

Reply via email to