[ 
https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13508898#comment-13508898
 ] 

Jay Kreps commented on KAFKA-598:
---------------------------------

I have some minor stylistic feedback, but first I think it would be good to 
discuss the model this implements and get consensus on that.

My understanding of this patch is that it does the following (a toy sketch of 
this flow follows the list):
1. multifetch all partitions using a per-partition fetch size based on the 
configuration the user provides in the consumer config (fetch.size)
2. check if there are any incomplete fetches and re-fetch those partitions 
using the consumer config (upper.fetch.size)
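
To make sure we are talking about the same model, here is a toy sketch of that 
flow. The names fetchSize/upperFetchSize map to the fetch.size and 
upper.fetch.size configs; everything else (TopicPartition, Fetched, fetch) is a 
placeholder I made up, not code from the patch:

    import java.util.ArrayList;
    import java.util.List;

    // Toy sketch of the two-phase model described above; not the patch code.
    public class TwoPhaseFetchSketch {
        static class TopicPartition {
            final String topic; final int part;
            TopicPartition(String t, int p) { topic = t; part = p; }
        }
        static class Fetched {
            final TopicPartition tp; final boolean incomplete;
            Fetched(TopicPartition tp, boolean i) { this.tp = tp; this.incomplete = i; }
        }

        // stand-in for a broker fetch; pretend a 5 MB message sits in every partition
        static Fetched fetch(TopicPartition tp, int maxBytes) {
            return new Fetched(tp, maxBytes < 5 * 1024 * 1024);
        }

        public static void main(String[] args) {
            int fetchSize = 64 * 1024;              // fetch.size
            int upperFetchSize = 10 * 1024 * 1024;  // upper.fetch.size
            List<TopicPartition> owned = new ArrayList<>();
            for (int p = 0; p < 4; p++) owned.add(new TopicPartition("t", p));

            // 1. multifetch every owned partition at fetch.size
            List<Fetched> results = new ArrayList<>();
            for (TopicPartition tp : owned) results.add(fetch(tp, fetchSize));

            // 2. re-fetch any partition whose next message did not fit
            for (int i = 0; i < results.size(); i++)
                if (results.get(i).incomplete)
                    results.set(i, fetch(results.get(i).tp, upperFetchSize));
        }
    }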

I think this may not be the best approach, but I am not sure. Here is my 
argument for why this configuration isn't really better than the current setup. 
Let's say you want to configure your consumer to be reliable and not crash; 
then you need to align your JVM heap settings with your Kafka memory usage. How 
much memory will this configuration use? In the worst case every partition 
comes back incomplete, so you need enough memory for upper.fetch.size * 
num_partitions. (Actually, since we have a queue of chunks it is a multiple of 
this, but I think we need to fix that as a separate issue, so ignore it for 
now.) Two conclusions from this: (1) the only parameter that really matters is 
upper.fetch.size, and if I have sufficient memory for that, why not fetch that 
much every time? (2) the memory I need depends on the number of partitions I am 
assigned, but this is out of my control (if another consumer in the group dies, 
my partition count increases), so it is almost impossible to set this right.
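
To make the sizing problem concrete, here is the back-of-the-envelope 
arithmetic with made-up numbers:

    // Illustrative numbers only, not from any real deployment.
    public class WorstCaseMemory {
        public static void main(String[] args) {
            long upperFetchSize = 10L * 1024 * 1024; // upper.fetch.size = 10 MB
            int numPartitions = 200;                 // grows when another consumer dies
            // worst case: every partition is incomplete and re-fetched at the upper bound
            long worstCase = upperFetchSize * numPartitions;
            System.out.println(worstCase / (1024 * 1024) + " MB"); // 2000 MB, before
            // the chunk queue multiplies it further
        }
    }

One rebalance that hands this consumer 100 more partitions adds another 1000 MB 
of worst-case usage with no configuration change, which is why I don't think 
the user can set this bound correctly up front.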

Here is an alternative. Have only one configuration, max.fetch.size, which 
bounds the per-request memory allocation. Rather than using this for every 
partition, use max.fetch.size/num_partitions, so that increasing the number of 
partitions decreases the per-partition fetch size but does not increase memory 
usage. For incomplete fetches, follow up by doing a sequential fetch for each 
incomplete partition using the full max.fetch.size for just that partition. The 
reason I think this is better is that you get a hard bound on memory usage 
(which in practice you MUST have to run reliably), and this same bound also 
acts as the limit on the largest message you can handle. The two 
counter-arguments against this approach are: (1) rather than crashing when you 
add partitions, this approach will get slower (due to smaller fetches and 
eventually sequential fetches), and you could argue that crashing is better 
than slow; (2) there could potentially be memory allocation downsides to doing 
large allocations in the common case (though there are definitely I/O 
benefits).
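
In the same toy form as before (placeholder names again, and max.fetch.size is 
the hypothetical single config being proposed, not something that exists 
today), the alternative would look roughly like:

    import java.util.ArrayList;
    import java.util.List;

    // Toy sketch of the proposed alternative; not a patch.
    public class BoundedFetchSketch {
        static class TopicPartition {
            final String topic; final int part;
            TopicPartition(String t, int p) { topic = t; part = p; }
        }
        static class Fetched {
            final TopicPartition tp; final boolean incomplete;
            Fetched(TopicPartition tp, boolean i) { this.tp = tp; this.incomplete = i; }
        }

        // stand-in for a broker fetch; pretend a 5 MB message sits in every partition
        static Fetched fetch(TopicPartition tp, int maxBytes) {
            return new Fetched(tp, maxBytes < 5 * 1024 * 1024);
        }

        public static void main(String[] args) {
            int maxFetchSize = 10 * 1024 * 1024;    // the only memory knob
            List<TopicPartition> owned = new ArrayList<>();
            for (int p = 0; p < 4; p++) owned.add(new TopicPartition("t", p));

            // per-partition share shrinks as partitions are added; the multifetch as a
            // whole never asks for more than max.fetch.size
            int perPartition = Math.max(1, maxFetchSize / owned.size());
            List<Fetched> results = new ArrayList<>();
            for (TopicPartition tp : owned) results.add(fetch(tp, perPartition));

            // sequential follow-up: one incomplete partition at a time with the full
            // budget, so in-flight memory stays bounded by max.fetch.size
            for (int i = 0; i < results.size(); i++)
                if (results.get(i).incomplete)
                    results.set(i, fetch(results.get(i).tp, maxFetchSize));
        }
    }

The difference from the sketch above is only which sizes are used at each step: 
the per-partition share on the multifetch and the single full budget on the 
sequential follow-up, instead of two independently configured sizes.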

Let's figure this out and then I will do a more detailed review of the patch.
                
> decouple fetch size from max message size
> -----------------------------------------
>
>                 Key: KAFKA-598
>                 URL: https://issues.apache.org/jira/browse/KAFKA-598
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>         Attachments: KAFKA-598-v1.patch
>
>
> Currently, a consumer has to set its fetch size larger than the max message 
> size. This increases the memory footprint on the consumer, especially when a 
> large number of topics/partitions are subscribed. By decoupling the fetch size 
> from the max message size, we can use a smaller fetch size for normal 
> consumption and, when hitting a large message (hopefully rare), automatically 
> increase the fetch size to the max message size temporarily.
