[ https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson resolved KAFKA-7548.
------------------------------------
    Resolution: Fixed
 Fix Version/s: 2.4.0

> KafkaConsumer should not throw away already fetched data for paused partitions
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-7548
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7548
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Mayuresh Gharat
>            Assignee: Sean Glover
>            Priority: Major
>             Fix For: 2.4.0
>
>         Attachments: image-2019-06-24-01-43-02-034.png
>
>
> Today, when we call KafkaConsumer.poll(), the consumer fetches data from Kafka asynchronously and puts it into a local buffer (completedFetches).
> If we then pause some TopicPartitions and call KafkaConsumer.poll() again, we may throw away any data buffered locally for those TopicPartitions. Generally, an application that pauses some TopicPartitions is likely to resume them in the near future, which would require the KafkaConsumer to re-issue a fetch for the same data it had buffered earlier for those TopicPartitions. From the application's point of view, this is wasted effort.
> At LinkedIn, we made a hotfix to see whether NOT throwing away the prefetched data would improve performance for stream applications like Samza. We ran a benchmark to compare the "before-fix" and "after-fix" versions.
> We had a consumer subscribed to 10 partitions of a high-volume topic and paused a predefined number of partitions before every poll() call. The partitions to pause were chosen randomly for each poll() call.
> * Time to run benchmark = 60 seconds
> * MaxPollRecords = 1
> * Number of TopicPartitions subscribed = 10
>
> ||Number of Partitions Paused||Number of Records Consumed (Before Fix)||Number of Records Consumed (After Fix)||
> |9|2087|4884693|
>
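The pause/poll pattern the benchmark exercises looks roughly like this with the plain Java consumer API. This is an illustrative sketch, not the benchmark's actual code: the bootstrap address, topic name, and single manually assigned partition are assumptions for the example.

```java
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

/** Sketch of the pause/poll pattern; broker address and topic are illustrative. */
public class PausePollSketch {
    static KafkaConsumer<String, String> newConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");               // as in the benchmark
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("high-volume-topic", 0); // hypothetical topic
        try (KafkaConsumer<String, String> consumer = newConsumer()) {
            consumer.assign(List.of(tp));
            // Pausing stops poll() from returning records for tp; before KAFKA-7548,
            // it could also cause already-buffered fetches for tp to be discarded,
            // forcing a refetch of the same data after resume().
            consumer.pause(Set.of(tp));
            // ... later, resume and continue consuming:
            consumer.resume(Set.of(tp));
        }
    }
}
```

assign(), pause(), and resume() are purely client-side bookkeeping, which is why discarding already-buffered fetches on pause was avoidable work rather than a protocol requirement.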
> h4. _Updated June 24, 2019_
> I followed up with [~mgharat] on the status of this work since the current [PR|https://github.com/apache/kafka/pull/5844] is stale. This work would also benefit the Alpakka Kafka connector, which frequently pauses partitions as a means of back-pressure from upstream Akka Streams graph stages. I've reviewed the PR feedback from [~hachikuji] and reimplemented the solution so that completed fetches belonging to paused partitions are added back to the queue. I also rebased against the latest trunk, which required further changes because the subscription event handlers had been removed from the Fetcher class.
> I created a [sample project|https://github.com/seglo/kafka-consumer-tests/tree/seglo/KAFKA-7548] that simulates the paused-partition scenario [~mgharat] described above. It uses only the Kafka client, rather than a stream processor like Samza or Alpakka Kafka. Even without setting max.poll.records to 1, there are significant gains in the number of records consumed and in the amount of traffic between the consumer and the brokers. I created two versions of the sample project: one based on the latest available Kafka Client Consumer (2.2.1) and one based on the new patch (2.4.0-SNAPSHOT). Each app has its own topic with its own producers and is constrained with cgroups. For full details of the experiment, see the K8s resources in this [branch|https://github.com/seglo/kafka-consumer-tests/tree/seglo/KAFKA-7548/KAFKA-7548].
> [I exported a Grafana snapshot for public viewing|https://snapshot.raintank.io/dashboard/snapshot/RDFTsgNvzP5bTmuc8X6hq7vLixp9tUtL?orgId=2]. I included a screenshot in the attachments.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
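The core idea of the patch (when draining the consumer's completedFetches buffer, fetches for paused partitions are put back on the queue rather than discarded) can be modeled with a small self-contained sketch. This is not Kafka's actual Fetcher code; the class, record, and method names here are invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/** Illustrative model of buffered-fetch handling; not actual Kafka Fetcher code. */
public class PausedFetchBuffer {
    /** A batch of records fetched for one partition (partition named as "topic-partition"). */
    record CompletedFetch(String topicPartition, List<String> records) {}

    private final Queue<CompletedFetch> completedFetches = new ArrayDeque<>();

    void add(CompletedFetch fetch) {
        completedFetches.add(fetch);
    }

    /**
     * Drain records for fetchable partitions. Fetches belonging to paused
     * partitions are re-queued instead of dropped, so the data is still
     * available locally once the partition is resumed (the KAFKA-7548 change);
     * discarding them here would model the pre-fix behavior.
     */
    List<String> drain(Set<String> pausedPartitions) {
        List<String> drained = new ArrayList<>();
        List<CompletedFetch> retained = new ArrayList<>();
        CompletedFetch fetch;
        while ((fetch = completedFetches.poll()) != null) {
            if (pausedPartitions.contains(fetch.topicPartition())) {
                retained.add(fetch); // keep buffered data for the paused partition
            } else {
                drained.addAll(fetch.records());
            }
        }
        completedFetches.addAll(retained); // add paused fetches back to the queue
        return drained;
    }

    int bufferedFetchCount() {
        return completedFetches.size();
    }
}
```

Under this model, pausing and then resuming a partition costs nothing extra: the buffered batch survives the drain, so no refetch of the same data is needed.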