[
https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13508898#comment-13508898
]
Jay Kreps commented on KAFKA-598:
---------------------------------
I have some minor stylistic feedback, but first I think it would be good to
discuss the model this implements and get consensus on that.
My understanding of this patch is that it does the following:
1. multifetch all partitions using a per-partition fetch size taken from the
consumer config (fetch.size)
2. check whether any fetches came back incomplete and re-fetch those
partitions using the larger size from the consumer config (upper.fetch.size),
as sketched below
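As a rough sketch of that model (illustration only: FetchResult, fetch(), and
the partition list are made up here; only the fetch.size and upper.fetch.size
names come from the patch):
{code:java}
import java.util.ArrayList;
import java.util.List;

// Illustration only: FetchResult and fetch() are made-up stand-ins for the
// real fetcher code; only the two config names come from the patch.
public class PatchModelSketch {
    static final int FETCH_SIZE = 1024 * 1024;             // fetch.size (e.g. 1 MB)
    static final int UPPER_FETCH_SIZE = 10 * 1024 * 1024;  // upper.fetch.size (e.g. 10 MB)

    record FetchResult(int partition, boolean incomplete) {}

    // Pretend to fetch at most maxBytes from one partition; a fetch is
    // "incomplete" when the next message does not fit in maxBytes.
    static FetchResult fetch(int partition, int maxBytes) {
        boolean oversizeMessage = (partition % 4 == 0);  // fake condition for the example
        return new FetchResult(partition, oversizeMessage && maxBytes < UPPER_FETCH_SIZE);
    }

    public static void main(String[] args) {
        List<Integer> partitions = List.of(0, 1, 2, 3, 4, 5, 6, 7);

        // step 1: multifetch every partition at fetch.size
        List<FetchResult> incomplete = new ArrayList<>();
        for (int p : partitions) {
            FetchResult r = fetch(p, FETCH_SIZE);
            if (r.incomplete()) incomplete.add(r);
        }

        // step 2: re-fetch the incomplete partitions at upper.fetch.size
        for (FetchResult r : incomplete) {
            fetch(r.partition(), UPPER_FETCH_SIZE);
        }
    }
}
{code}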
I think this may not be the best approach, but I am not sure. Here is my
argument for why this configuration isn't really better than the current setup.
Let's say you want to configure your consumer to be reliable and not crash;
then you need to align your JVM heap settings with your Kafka memory usage.
How much memory will this configuration use? Well, in the worst case all
partitions will come back incomplete, so you need enough memory for
upper.fetch.size * num_partitions. Actually, since we have a queue of chunks,
it is a multiple of this, but I think we need to fix that as a separate issue,
so ignore that for now. Two conclusions from this: (1) the only parameter that
matters is upper.fetch.size, and if I have sufficient memory for that, why not
just fetch that much in the first place? (2) the memory I need depends on the
number of partitions I am assigned, but this is out of my control (if a
consumer dies it will increase), so it is almost impossible to set this right.
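To put numbers on that sizing argument (the figures below are made up purely
to show how the worst case scales with partition count):
{code:java}
public class WorstCaseMemory {
    public static void main(String[] args) {
        // Made-up figures purely to illustrate the sizing argument.
        long upperFetchSize = 10L * 1024 * 1024;  // upper.fetch.size = 10 MB
        int  numPartitions  = 50;                 // partitions currently assigned

        // Worst case: every partition comes back incomplete and is re-fetched
        // at upper.fetch.size, so the heap has to hold all of them at once.
        long worstCase = upperFetchSize * numPartitions;  // 500 MB
        System.out.println("worst-case fetch memory = " + (worstCase >> 20) + " MB");

        // If a consumer in the group dies, a rebalance raises numPartitions,
        // and the figure you sized your heap for is no longer the worst case.
    }
}
{code}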
Here is an alternative. Have only one configuration: max.fetch.size, which
bounds the per-request memory allocation. Instead of using this for every
partition, use max.fetch.size/num_partitions so that increasing the number of
partitions decreases the fetch size but does not increase memory usage. For
incomplete fetches, follow up by doing a sequential fetch for each incomplete
partition using the full max.fetch.size for just that partition. The reason I
think this is better is that you get a hard bound on memory usage (which in
practice you MUST have to run reliably), and this same bound also acts as the
limit on the largest message you can handle. The two counter-arguments against
this approach are: (1) rather than crashing when you add partitions, this
approach will get slower (due to smaller fetches and eventually sequential
fetches), and you could argue that crashing is better than slow; (2) there
could potentially be memory allocation downsides to doing large allocations in
the common case (though there are definitely I/O benefits).
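A sketch of the alternative, with the same made-up fetch() stand-in as above
(max.fetch.size is the single knob I am proposing, not an existing config):
{code:java}
import java.util.ArrayList;
import java.util.List;

// Illustration of the alternative: one hard per-request bound, max.fetch.size.
public class BoundedFetchSketch {
    static final int MAX_FETCH_SIZE = 10 * 1024 * 1024;  // max.fetch.size: hard bound

    record FetchResult(int partition, boolean incomplete) {}

    static FetchResult fetch(int partition, int maxBytes) {
        boolean oversizeMessage = (partition % 4 == 0);  // fake condition for the example
        return new FetchResult(partition, oversizeMessage && maxBytes < MAX_FETCH_SIZE);
    }

    public static void main(String[] args) {
        List<Integer> partitions = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

        // Split the one bound across partitions: adding partitions shrinks the
        // per-partition fetch size instead of growing memory usage.
        int perPartitionSize = Math.max(1, MAX_FETCH_SIZE / partitions.size());

        List<FetchResult> incomplete = new ArrayList<>();
        for (int p : partitions) {
            FetchResult r = fetch(p, perPartitionSize);
            if (r.incomplete()) incomplete.add(r);
        }

        // Follow up sequentially, one partition at a time at the full bound,
        // so in-flight fetch memory never exceeds max.fetch.size.
        for (FetchResult r : incomplete) {
            fetch(r.partition(), MAX_FETCH_SIZE);
        }
    }
}
{code}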
Let's figure this out and then I will do a more detailed review of the patch.
> decouple fetch size from max message size
> -----------------------------------------
>
> Key: KAFKA-598
> URL: https://issues.apache.org/jira/browse/KAFKA-598
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8
> Reporter: Jun Rao
> Assignee: Joel Koshy
> Attachments: KAFKA-598-v1.patch
>
>
> Currently, a consumer has to set its fetch size larger than the max message size.
> This increases the memory footprint on the consumer, especially when a large
> number of topics/partitions are subscribed. By decoupling the fetch size from
> the max message size, we can use a smaller fetch size for normal consumption,
> and when hitting a large message (hopefully rare), automatically increase the
> fetch size to the max message size temporarily.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira