Niclas Lockner created KAFKA-12870:
--------------------------------------
Summary: RecordAccumulator stuck in a flushing state
Key: KAFKA-12870
URL: https://issues.apache.org/jira/browse/KAFKA-12870
Project: Kafka
Issue Type: Bug
Components: producer, streams
Affects Versions: 2.8.0, 2.6.1
Reporter: Niclas Lockner
After a Kafka Streams application with exactly-once semantics (EOS) enabled has
performed its first commit, the RecordAccumulator inside the application's
internal producer gets stuck in a state where every subsequently allocated
ProducerBatch is flushed immediately instead of being held in memory until it
expires (i.e. until linger.ms has elapsed), regardless of the stream's linger
or batch size configuration.
The issue can be reproduced with the example code at <GitHub link to be added>,
which can be run with ./gradlew run --args=<bootstrap servers>.
The example has a producer that sends 1 record/sec to one topic, and a Kafka
Streams application with EOS enabled that forwards the records from that topic
to another topic, configured with linger.ms = 5000 (5 s) and
commit.interval.ms = 10000 (10 s).
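The reproducer's source is not included here, so the following is only a sketch of how the described configuration might be expressed. It uses plain string keys so it compiles without kafka-streams on the classpath; with the library available, the same keys are exposed as StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG and StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG). The class name and application id are hypothetical.

```java
import java.util.Properties;

/**
 * Hypothetical sketch of the configuration described in the report:
 * exactly-once processing, a 5 s producer linger and a 10 s commit interval.
 * Plain string keys are used so the sketch compiles without kafka-streams.
 */
public class ReproConfig {

    static Properties streamsProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", "kafka-12870-repro"); // hypothetical application id
        props.put("bootstrap.servers", bootstrapServers);
        // Exactly-once semantics; "exactly_once" is accepted by Kafka Streams 2.6 and 2.8.
        props.put("processing.guarantee", "exactly_once");
        // Commit every 10 seconds (with EOS the Streams default would otherwise be 100 ms).
        props.put("commit.interval.ms", "10000");
        // The "producer." prefix routes linger.ms to the Streams-internal producer.
        props.put("producer.linger.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps(args.length > 0 ? args[0] : "localhost:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Setting commit.interval.ms explicitly matters here: under EOS, Kafka Streams lowers its default commit interval to 100 ms, which would mask the 5 s linger entirely.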
The expected behavior when running the example is that the stream's
ProducerBatches expire (or are flushed by the commit) every 5 seconds, and that
the stream's producer sends one ProduceRequest every 5 seconds, each carrying
an expired ProducerBatch that contains 5 records.
The actual behavior is that each ProducerBatch is made available to the Sender
immediately, and the Sender sends one ProduceRequest per record.
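To make the expected and reported numbers concrete, here is a toy single-partition model in whole seconds. The class is hypothetical and is not the real RecordAccumulator; it assumes one record is appended per second, that a batch becomes sendable once it has lingered for lingerSec seconds or while a flush is in progress, and, following the report, that the flush flag stays set from the first commit (t = 10 s) onward.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of the accumulator's send decision (hypothetical, not Kafka code).
 * A batch is sendable once it has lingered long enough OR while a flush is in progress.
 */
public class LingerModel {

    /**
     * @param seconds       how long to simulate; one record is appended per second
     * @param lingerSec     producer linger, in seconds
     * @param stuckAfterSec second from which flushInProgress stays true; -1 means never
     * @return the size of each batch handed to the sender, in order
     */
    static List<Integer> batchSizes(int seconds, int lingerSec, int stuckAfterSec) {
        List<Integer> sent = new ArrayList<>();
        int batchSize = 0;    // records in the open batch (0 = no open batch)
        int batchCreated = 0; // second at which the open batch was created
        for (int t = 0; t < seconds; t++) {
            boolean flushInProgress = stuckAfterSec >= 0 && t >= stuckAfterSec;
            // Sender side: drain the open batch if it is sendable.
            if (batchSize > 0 && (flushInProgress || t - batchCreated >= lingerSec)) {
                sent.add(batchSize);
                batchSize = 0;
            }
            // Producer side: append this second's record, opening a batch if needed.
            if (batchSize == 0) {
                batchCreated = t;
            }
            batchSize++;
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("expected: " + batchSizes(20, 5, -1)); // expected: [5, 5, 5]
        System.out.println("reported: " + batchSizes(20, 5, 10)); // reported: [5, 5, 1, 1, 1, 1, 1, 1, 1, 1, 1]
    }
}
```

In the healthy case every batch waits the full linger and carries 5 records; once the flush flag sticks, each record leaves in its own batch, one ProduceRequest per record.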
The example code contains a copy of the RecordAccumulator class (copied from
kafka-clients 2.8.0) with additional logging added to:
* RecordAccumulator#ready(Cluster, long)
* RecordAccumulator#beginFlush()
* RecordAccumulator#awaitFlushCompletion()
These log entries show:
* that the batches are considered sendable because a flush is in progress
* that Sender.maybeSendAndPollTransactionalRequest() calls
RecordAccumulator#beginFlush() without also calling awaitFlushCompletion(),
which makes RecordAccumulator's flushesInProgress counter alternate between 1
and 2 instead of the expected 0 and 1, so it never returns to 0 and a flush
always appears to be in progress.
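The counter imbalance can be illustrated with a minimal model. The class below is hypothetical: it mirrors only the names flushesInProgress, beginFlush(), awaitFlushCompletion() and flushInProgress() and omits the waiting on in-flight batches. It assumes each later commit performs one balanced beginFlush()/awaitFlushCompletion() pair (as KafkaProducer.flush() does) and that one unmatched beginFlush() occurred, as the log entries indicate.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical model of RecordAccumulator's flush bookkeeping: only the counter
 * is modeled; waiting for incomplete batches is left out.
 */
public class FlushCounterModel {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    void beginFlush() {
        flushesInProgress.getAndIncrement();
    }

    void awaitFlushCompletion() {
        // The real method first waits for every incomplete batch; this model only
        // performs the decrement, which RecordAccumulator does in a finally block.
        flushesInProgress.decrementAndGet();
    }

    /** While this returns true, ready() treats every batch as sendable. */
    boolean flushInProgress() {
        return flushesInProgress.get() > 0;
    }

    int count() {
        return flushesInProgress.get();
    }

    public static void main(String[] args) {
        FlushCounterModel acc = new FlushCounterModel();

        // Balanced pair, as KafkaProducer.flush() performs: 0 -> 1 -> 0.
        acc.beginFlush();
        acc.awaitFlushCompletion();
        System.out.println("balanced: " + acc.count()); // balanced: 0

        // Unmatched beginFlush(), as reported for the Sender: 0 -> 1, never decremented.
        acc.beginFlush();

        // Every later balanced pair now moves the counter 1 -> 2 -> 1.
        for (int commit = 1; commit <= 3; commit++) {
            acc.beginFlush();
            int peak = acc.count();
            acc.awaitFlushCompletion();
            // prints: commit N: peak=2 after=1 flushInProgress=true
            System.out.println("commit " + commit + ": peak=" + peak + " after=" + acc.count()
                    + " flushInProgress=" + acc.flushInProgress());
        }
    }
}
```

Because the counter never drops back to 0, flushInProgress() stays true between commits, which is consistent with the per-record ProduceRequests described in the report.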
This issue is not reproducible in version 2.3.1.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)