This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e8837465a5f KAFKA-18067: Kafka Streams can leak Producer client under
EOS (#17931)
e8837465a5f is described below
commit e8837465a5fc478f1c79d1ad475b43e00a39a5d7
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Dec 10 08:12:05 2024 +0800
KAFKA-18067: Kafka Streams can leak Producer client under EOS (#17931)
To avoid leaking producers, we should add a `closed` flag to `StreamsProducer`
indicating whether we should reset the producer.
Reviewers: Guozhang Wang <[email protected]>, Anna Sophie
Blee-Goldman <[email protected]>
---
.../streams/processor/internals/ActiveTaskCreator.java | 3 ++-
.../kafka/streams/processor/internals/StreamsProducer.java | 6 ++++++
.../streams/processor/internals/ActiveTaskCreatorTest.java | 14 ++++++++++++++
3 files changed, 22 insertions(+), 1 deletion(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 6c973e096fc..1dffc4ebbd3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -119,7 +119,8 @@ class ActiveTaskCreator {
}
public void reInitializeProducer() {
- streamsProducer.resetProducer(producer());
+ if (!streamsProducer.isClosed())
+ streamsProducer.resetProducer(producer());
}
StreamsProducer streamsProducer() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 1048b5a2ecf..546186b2dbd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -70,6 +70,7 @@ public class StreamsProducer {
private Producer<byte[], byte[]> producer;
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
+ private boolean closed = false;
private double oldProducerTotalBlockedTime = 0;
// we have a single `StreamsProducer` per thread, and thus a single
`sendException` instance,
// which we share across all tasks, ie, all `RecordCollectorImpl`
@@ -98,6 +99,10 @@ public class StreamsProducer {
return transactionInFlight;
}
+ boolean isClosed() {
+ return closed;
+ }
+
/**
* @throws IllegalStateException if EOS is disabled
*/
@@ -320,6 +325,7 @@ public class StreamsProducer {
void close() {
producer.close();
+ closed = true;
transactionInFlight = false;
transactionInitialized = false;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 6a4339a3ed7..73da4b7cc05 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -190,9 +190,23 @@ public class ActiveTaskCreatorTest {
activeTaskCreator.close();
+ assertThat(activeTaskCreator.streamsProducer().isClosed(), is(true));
assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
}
+ @Test
+ public void shouldNotReInitializeProducerOnClose() {
+ properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
+ mockClientSupplier.setApplicationIdForProducer("appId");
+ createTasks();
+
+ activeTaskCreator.streamsProducer().close();
+ activeTaskCreator.reInitializeProducer();
+ // If streamsProducer is not closed, clientSupplier will recreate a
producer,
+ // resulting in more than one producer being created.
+ assertThat(mockClientSupplier.producers.size(), is(1));
+ }
+
// error handling
@Test