[
https://issues.apache.org/jira/browse/KAFKA-6088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ismael Juma resolved KAFKA-6088.
--------------------------------
Resolution: Fixed
Since it was fixed in 0.11.0.0, marking it as fixed.
> Kafka Consumer slows down when reading from highly compacted topics
> -------------------------------------------------------------------
>
> Key: KAFKA-6088
> URL: https://issues.apache.org/jira/browse/KAFKA-6088
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.10.2.1
> Reporter: James Cheng
> Fix For: 0.11.0.0
>
>
> Summary of the issue
> -----
> We found a performance issue in the Kafka Consumer: it becomes less
> efficient when there are frequent gaps in offsets (which happens when a
> topic has undergone heavy compaction).
> The issue is present in 0.10.2.1 and possibly earlier versions.
> It is fixed in 0.11.0.0.
> Summary of cause
> -----
> The fetcher code assumes that there will be no gaps in message offsets. If
> there are, it does an additional round trip to the broker. For topics with
> frequent gaps in offsets, it is possible that most calls to {{poll()}} will
> generate a round trip to the broker.
> Background and details
> -----
> We have a topic with roughly 8 million records. The topic is log compacted.
> It turns out that most of the initial records in the topic were never
> overwritten, whereas in the 2nd half of the topic we had lots of overwritten
> records. That means that for the first part of the topic, there are no gaps
> in offsets. But in the 2nd part of the topic, there are frequent gaps in the
> offsets (due to records being compacted away).
> We have a consumer that starts up and reads the entire topic from beginning
> to end. We noticed that the consumer would read through the first part of the
> topic very quickly. When it got to the part of the topic with frequent gaps
> in offsets, consumption rate slowed down dramatically. This slowdown was
> consistent across multiple runs.
> What is happening is this:
> 1) A call to {{poll()}} happens. The consumer fetches 1MB of data from the
> broker (the default of {{max.partition.fetch.bytes}}). It returns just 500
> records to the caller (the default of {{max.poll.records}}) and keeps the
> rest of the data in memory for future calls to {{poll()}}.
> 2) Before returning the 500 records, the consumer library records the *next*
> offset it should return. It does so by taking the offset of the last record
> it returned and adding 1 to it (the offset of the 500th message in the set,
> plus 1). It calls this the {{nextOffset}}.
> 3) The application finishes processing the 500 messages and makes another
> call to {{poll()}}. During this call, the consumer library does a sanity
> check: it verifies that the first message of the set *it is about to
> return* has an offset that matches the value of {{nextOffset}}. That is,
> it checks whether the 501st record has an offset that is 1 greater than
> that of the 500th record.
> a. If it matches, it returns an additional 500 records and updates
> {{nextOffset}} to the offset of the 1000th record, plus 1.
> b. If it doesn't match, it throws away the remainder of the 1MB of data
> that it stored in memory in step 1 and goes back to the broker to fetch
> an additional 1MB of data, starting at the offset {{nextOffset}}.
> If a topic has no gaps (a non-compacted topic), the code will always hit
> code path 3a.
> If the topic has gaps in offsets and a call to {{poll()}} happens to fall
> on a gap, the code will hit code path 3b.
> If the gaps are frequent, it will frequently hit code path 3b.
> The worst case is a large number of gaps combined with
> {{max.poll.records=1}}: every gap results in a new fetch to the broker, so
> you may end up processing only one message per fetch. Said another way, you
> end up doing a single fetch for every single message in the partition. Both
> code paths are sketched below.
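> To make the check concrete, here is a simplified sketch of the step 3
> logic (illustrative Java based on the description above, not the actual
> {{Fetcher}} source; the names are assumptions):
> {code}
> // position: the nextOffset recorded in step 2 (last returned offset + 1)
> long position = subscriptions.position(partition);
> if (bufferedRecords.firstOffset() == position) {
>     // Path 3a: offsets line up; serve up to max.poll.records more
>     // records out of the data already buffered in memory
>     return bufferedRecords.take(maxPollRecords);
> } else {
>     // Path 3b: a compacted-away offset makes the check fail; the rest
>     // of the buffered ~1MB is discarded and a new fetch is sent to the
>     // broker starting at `position`
>     bufferedRecords.discard();
>     sendFetchRequest(partition, position);
> }
> {code}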
> Repro
> -----
> We created a repro. It appears that the bug is in 0.10.2.1, but was fixed in
> 0.11. I've attached the tarball with all the code and instructions.
> The repro is:
> 1) Create a single partition topic with log compaction turned on
> 2) Write messages with the following keys: 1 1 2 2 3 3 4 4 5 5 ... (each
> message key written twice in a row)
> 3) Let compaction happen. This means that offsets 0 2 4 6 8 10 ...
> would be compacted away
> 4) Consume from this topic with {{max.poll.records=1}}
> More concretely, here is the producer code:
> {code}
> // Broker address and serializers shown for completeness; the attached
> // repro tarball contains the actual configuration
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> Producer<String, String> producer = new KafkaProducer<String, String>(props);
> // Write every key twice so that compaction later removes the first copy,
> // leaving a gap at every other offset
> for (int i = 0; i < 1000000; i++) {
>     producer.send(new ProducerRecord<String, String>("compacted",
>         Integer.toString(i), Integer.toString(i)));
>     producer.send(new ProducerRecord<String, String>("compacted",
>         Integer.toString(i), Integer.toString(i)));
> }
> producer.flush();
> producer.close();
> {code}
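> The attached tarball contains the actual consumer; a minimal consumer
> along these lines exercises the bug (a sketch, assuming a local broker;
> the group id is arbitrary):
> {code}
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "compaction-repro");
> props.put("auto.offset.reset", "earliest");
> props.put("max.poll.records", "1"); // one record per poll() maximizes the offset checks
> props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
> consumer.subscribe(Collections.singletonList("compacted"));
> while (true) {
>     // Each poll() returns at most one record; with 0.10.2.1, most of
>     // these calls trigger a fresh fetch to the broker
>     for (ConsumerRecord<String, String> record : consumer.poll(100))
>         System.out.printf("offset = %d, key = %s, value = %s%n",
>             record.offset(), record.key(), record.value());
> }
> {code}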
> When consuming with a 0.10.2.1 consumer, you can see this pattern (with
> Fetcher logs at DEBUG, see file consumer_0.10.2/debug.log):
> {code}
> offset = 1, key = 0, value = 0
> 22:58:51.262 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring
> fetched records for compacted-0 at offset 3 since the current position is 2
> 22:58:51.263 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 3, key = 1, value = 1
> 22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring
> fetched records for compacted-0 at offset 5 since the current position is 4
> 22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 5, key = 2, value = 2
> 22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring
> fetched records for compacted-0 at offset 7 since the current position is 6
> 22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 7, key = 3, value = 3
> 22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring
> fetched records for compacted-0 at offset 9 since the current position is 8
> 22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 9, key = 4, value = 4
> 22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring
> fetched records for compacted-0 at offset 11 since the current position is 10
> 22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 11, key = 5, value = 5
> 22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring
> fetched records for compacted-0 at offset 13 since the current position is 12
> 22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> offset = 13, key = 6, value = 6
> 22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring
> fetched records for compacted-0 at offset 15 since the current position is 14
> 22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> {code}
> When consuming with a 0.11.0.1 consumer, you can see the following pattern
> (see file consumer_0.11/debug.log):
> {code}
> offset = 1, key = 0, value = 0
> offset = 3, key = 1, value = 1
> offset = 5, key = 2, value = 2
> offset = 7, key = 3, value = 3
> offset = 9, key = 4, value = 4
> offset = 11, key = 5, value = 5
> offset = 13, key = 6, value = 6
> offset = 15, key = 7, value = 7
> offset = 17, key = 8, value = 8
> offset = 19, key = 9, value = 9
> offset = 21, key = 10, value = 10
> {code}
> From looking at the GitHub history, it appears the issue was fixed in
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0
> Specifically, this line:
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0L930
> was replaced by this line:
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0R933
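> In outline (a paraphrase of the change based on the behavior described
> above, not the literal diff; names are assumptions), the fix advances the
> consumer's position from the records actually left in the buffer instead
> of assuming offsets are dense:
> {code}
> // 0.10.2.x: assumes dense offsets; a gap left by compaction breaks this
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> subscriptions.position(partition, nextOffset);
>
> // 0.11.0.x: the buffer tracks the offset of the next record it actually
> // holds, so a gap no longer invalidates the buffered data
> subscriptions.position(partition, bufferedRecords.nextFetchOffset());
> {code}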
> Mitigation
> -----
> This problem is fixed in 0.11.0.0. If you can upgrade to 0.11.0.0, you
> will not be affected by the problem.
> If you cannot upgrade, you can reduce the impact by increasing the value
> of {{max.poll.records}}. This works because the check happens on each call
> to {{poll()}}, and increasing {{max.poll.records}} reduces the number of
> calls to {{poll()}}.
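> For example (the value here is illustrative; the 0.10.x default is 500):
> {code}
> // Fewer poll() calls means fewer position checks that can trip on a gap
> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
> {code}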
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)