[
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jason Gustafson resolved KAFKA-7548.
------------------------------------
Resolution: Fixed
Fix Version/s: 2.4.0
> KafkaConsumer should not throw away already fetched data for paused
> partitions.
> -------------------------------------------------------------------------------
>
> Key: KAFKA-7548
> URL: https://issues.apache.org/jira/browse/KAFKA-7548
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Reporter: Mayuresh Gharat
> Assignee: Sean Glover
> Priority: Major
> Fix For: 2.4.0
>
> Attachments: image-2019-06-24-01-43-02-034.png
>
>
> Today, when we call KafkaConsumer.poll(), data is fetched from Kafka
> asynchronously and put into a local buffer (completedFetches).
> If we then pause some TopicPartitions and call KafkaConsumer.poll() again,
> we may throw away any data buffered locally for those TopicPartitions.
> Generally, an application that pauses some TopicPartitions is likely to
> resume them in the near future, which forces the KafkaConsumer to re-issue
> a fetch for the same data it had already buffered for those
> TopicPartitions. From the application's point of view, this is wasted
> effort.
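The pre-fix versus post-fix behaviour can be sketched with a small stdlib-only model. This is an illustrative toy (the class, method names, and string-keyed partitions are all invented for this sketch), not the real Fetcher/completedFetches internals:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the consumer's local fetch buffer, showing the difference
// between discarding and retaining prefetched data for paused partitions.
class PausedFetchBuffer {
    // Buffered (already fetched) records, keyed by partition name.
    private final Map<String, Deque<String>> completedFetches = new HashMap<>();

    void addFetch(String partition, String record) {
        completedFetches.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(record);
    }

    // Drain records for non-paused partitions. If retainPaused is false
    // (pre-fix behaviour), buffered data for paused partitions is thrown
    // away and must be fetched again after resume().
    List<String> poll(Set<String> paused, boolean retainPaused) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Deque<String>> e : completedFetches.entrySet()) {
            if (paused.contains(e.getKey())) {
                if (!retainPaused) {
                    e.getValue().clear(); // pre-fix: discard prefetched data
                }
                // post-fix: leave the buffered data in place for resume()
            } else {
                out.addAll(e.getValue());
                e.getValue().clear();
            }
        }
        return out;
    }

    int buffered(String partition) {
        Deque<String> q = completedFetches.get(partition);
        return q == null ? 0 : q.size();
    }
}
```

With retainPaused=false, a pause/resume cycle loses the buffered records and the broker must be asked for them again; with retainPaused=true they are served from the buffer on resume.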
> At LinkedIn, we made a hotfix to see whether NOT throwing away the
> prefetched data would improve performance for stream applications like
> Samza. We ran a benchmark comparing the "before-fix" and "after-fix"
> versions.
> We had a consumer subscribed to 10 partitions of a high-volume topic and
> paused a predefined number of partitions before every poll() call. The
> partitions to pause were chosen at random for each poll() call.
> * Time to run Benchmark = 60 seconds.
> * MaxPollRecords = 1
> * Number of TopicPartitions subscribed = 10
> ||Number Of Partitions Paused||Number of Records consumed (Before fix)||Number of Records consumed (After fix)||
> |9|2087|4884693|
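The per-poll pause-selection step of the benchmark above can be sketched as follows. The helper and partition names are hypothetical; the surrounding consumer calls are described in comments rather than executed, since they need a live broker:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of the benchmark's pause selection: before each poll(), a fixed
// number of the subscribed partitions is chosen at random to be paused.
class PausePicker {
    // Return `count` randomly chosen partitions out of `partitions`.
    static List<String> pickPartitionsToPause(List<String> partitions, int count) {
        List<String> shuffled = new ArrayList<>(partitions);
        Collections.shuffle(shuffled);
        return shuffled.subList(0, count);
    }
    // Each benchmark iteration would then (against a real KafkaConsumer):
    //   consumer.resume(allAssignedPartitions);
    //   consumer.pause(pickPartitionsToPause(...));   // e.g. 9 of 10
    //   consumer.poll(...);                           // max.poll.records = 1
}
```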
>
> h4. _[#Updated June 24, 2019]_
> I followed up with [~mgharat] on the status of this work since the current
> [PR|https://github.com/apache/kafka/pull/5844] is stale. This work would
> also be beneficial to the Alpakka Kafka connector, which frequently pauses
> partitions as a means of back-pressure from upstream Akka Streams graph
> stages. I've reviewed the PR feedback from [~hachikuji] and reimplemented
> this solution to add completed fetches that belong to paused partitions
> back to the queue. I also rebased against the latest trunk, which caused
> more changes as a result of subscription event handlers being removed from
> the fetcher class.
> I created a [sample
> project|https://github.com/seglo/kafka-consumer-tests/tree/seglo/KAFKA-7548]
> that simulates the pause-partition scenario [~mgharat] described above.
> It uses only the Kafka client, rather than a stream processor such as Samza
> or Alpakka Kafka. Even without setting max.poll.records to 1, there are
> significant gains in the number of records consumed and in the amount of
> traffic between the consumer and the brokers. I created two versions of the
> sample project: one based on the latest available Kafka Client Consumer
> (2.2.1) and one based on the new patch (2.4.0-SNAPSHOT). Each app has its
> own topic with its own producers and is constrained with cgroups. For full
> details of the experiment, see the K8s resources in this
> [branch|https://github.com/seglo/kafka-consumer-tests/tree/seglo/KAFKA-7548/KAFKA-7548].
> [I exported a Grafana snapshot for public
> viewing|https://snapshot.raintank.io/dashboard/snapshot/RDFTsgNvzP5bTmuc8X6hq7vLixp9tUtL?orgId=2].
> I included a screenshot in the attachments.
>
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)