This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff94c44e707 HOTFIX: Revert "KAFKA-18067: Kafka Streams can leak
Producer client under EOS (#17931)" (#19078)
ff94c44e707 is described below
commit ff94c44e7077d3de98fd9467aae898de80d91d88
Author: Bruno Cadonna <[email protected]>
AuthorDate: Mon Mar 3 11:58:56 2025 +0100
HOTFIX: Revert "KAFKA-18067: Kafka Streams can leak Producer client under
EOS (#17931)" (#19078)
This reverts commit e8837465a5fc478f1c79d1ad475b43e00a39a5d7.
The reverted commit prevents Kafka Streams from re-initializing its
transactional producer. If an exception that fences the transactional
producer occurs, the producer is not re-initialized while that exception
is being handled. This causes an infinite loop of
ProducerFencedExceptions with corresponding rebalances.
Reviewers: Lucas Brutschy <[email protected]>, David Jacot
<[email protected]>
---
.../streams/processor/internals/ActiveTaskCreator.java | 3 +--
.../kafka/streams/processor/internals/StreamsProducer.java | 6 ------
.../streams/processor/internals/ActiveTaskCreatorTest.java | 14 --------------
3 files changed, 1 insertion(+), 22 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 1dffc4ebbd3..6c973e096fc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -119,8 +119,7 @@ class ActiveTaskCreator {
}
public void reInitializeProducer() {
- if (!streamsProducer.isClosed())
- streamsProducer.resetProducer(producer());
+ streamsProducer.resetProducer(producer());
}
StreamsProducer streamsProducer() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 546186b2dbd..1048b5a2ecf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -70,7 +70,6 @@ public class StreamsProducer {
private Producer<byte[], byte[]> producer;
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
- private boolean closed = false;
private double oldProducerTotalBlockedTime = 0;
// we have a single `StreamsProducer` per thread, and thus a single
`sendException` instance,
// which we share across all tasks, ie, all `RecordCollectorImpl`
@@ -99,10 +98,6 @@ public class StreamsProducer {
return transactionInFlight;
}
- boolean isClosed() {
- return closed;
- }
-
/**
* @throws IllegalStateException if EOS is disabled
*/
@@ -325,7 +320,6 @@ public class StreamsProducer {
void close() {
producer.close();
- closed = true;
transactionInFlight = false;
transactionInitialized = false;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 73da4b7cc05..6a4339a3ed7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -190,23 +190,9 @@ public class ActiveTaskCreatorTest {
activeTaskCreator.close();
- assertThat(activeTaskCreator.streamsProducer().isClosed(), is(true));
assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
}
- @Test
- public void shouldNotReInitializeProducerOnClose() {
- properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
- mockClientSupplier.setApplicationIdForProducer("appId");
- createTasks();
-
- activeTaskCreator.streamsProducer().close();
- activeTaskCreator.reInitializeProducer();
- // If streamsProducer is not closed, clientSupplier will recreate a
producer,
- // resulting in more than one producer being created.
- assertThat(mockClientSupplier.producers.size(), is(1));
- }
-
// error handling
@Test