This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff94c44e707 HOTFIX: Revert "KAFKA-18067: Kafka Streams can leak Producer client under EOS (#17931)" (#19078)
ff94c44e707 is described below

commit ff94c44e7077d3de98fd9467aae898de80d91d88
Author: Bruno Cadonna <[email protected]>
AuthorDate: Mon Mar 3 11:58:56 2025 +0100

    HOTFIX: Revert "KAFKA-18067: Kafka Streams can leak Producer client under EOS (#17931)" (#19078)
    
    This reverts commit e8837465a5fc478f1c79d1ad475b43e00a39a5d7.
    
    The commit that is reverted prevents Kafka Streams from re-initializing
    its transactional producer. If an exception that fences the
    transactional producer occurs, the producer is not re-initialized during
    the handling of the exception. That causes an infinite loop of
    ProducerFencedExceptions with corresponding rebalances.
    
    Reviewers: Lucas Brutschy <[email protected]>, David Jacot
    <[email protected]>
---
 .../streams/processor/internals/ActiveTaskCreator.java     |  3 +--
 .../kafka/streams/processor/internals/StreamsProducer.java |  6 ------
 .../streams/processor/internals/ActiveTaskCreatorTest.java | 14 --------------
 3 files changed, 1 insertion(+), 22 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 1dffc4ebbd3..6c973e096fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -119,8 +119,7 @@ class ActiveTaskCreator {
     }
 
     public void reInitializeProducer() {
-        if (!streamsProducer.isClosed())
-            streamsProducer.resetProducer(producer());
+        streamsProducer.resetProducer(producer());
     }
 
     StreamsProducer streamsProducer() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 546186b2dbd..1048b5a2ecf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -70,7 +70,6 @@ public class StreamsProducer {
     private Producer<byte[], byte[]> producer;
     private boolean transactionInFlight = false;
     private boolean transactionInitialized = false;
-    private boolean closed = false;
     private double oldProducerTotalBlockedTime = 0;
     // we have a single `StreamsProducer` per thread, and thus a single `sendException` instance,
     // which we share across all tasks, ie, all `RecordCollectorImpl`
@@ -99,10 +98,6 @@ public class StreamsProducer {
         return transactionInFlight;
     }
 
-    boolean isClosed() {
-        return closed;
-    }
-
     /**
      * @throws IllegalStateException if EOS is disabled
      */
@@ -325,7 +320,6 @@ public class StreamsProducer {
 
     void close() {
         producer.close();
-        closed = true;
         transactionInFlight = false;
         transactionInitialized = false;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 73da4b7cc05..6a4339a3ed7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -190,23 +190,9 @@ public class ActiveTaskCreatorTest {
 
         activeTaskCreator.close();
 
-        assertThat(activeTaskCreator.streamsProducer().isClosed(), is(true));
         assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
     }
 
-    @Test
-    public void shouldNotReInitializeProducerOnClose() {
-        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
-        mockClientSupplier.setApplicationIdForProducer("appId");
-        createTasks();
-
-        activeTaskCreator.streamsProducer().close();
-        activeTaskCreator.reInitializeProducer();
-        // If streamsProducer is not closed, clientSupplier will recreate a producer,
-        // resulting in more than one producer being created.
-        assertThat(mockClientSupplier.producers.size(), is(1));
-    }
-
     // error handling
 
     @Test

Reply via email to