[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13508898#comment-13508898 ]
Jay Kreps commented on KAFKA-598:
---------------------------------

I have some minor stylistic feedback, but first I think it would be good to discuss the model this implements and get consensus on that. My understanding of this patch is that it does the following:

1. multifetch all partitions using a per-partition fetch size based on the configuration the user provides in the consumer config (fetch.size)
2. check if there are any incomplete fetches and re-fetch those partitions using the consumer config (upper.fetch.size)

I think this may not be the best approach, but I am not sure. Here is my argument for why this configuration isn't really better than the current setup. If you want to configure your consumer to be reliable and not crash, you need to align your JVM heap settings with your Kafka memory usage. How much memory will this configuration use? In the worst case all partitions come back incomplete, so you need enough memory for upper.fetch.size * num_partitions. Actually, since we have a queue of chunks, it is a multiple of this, but I think we need to fix that as a separate issue, so ignore it for now. Two conclusions from this: (1) the only parameter that matters is upper.fetch.size, and if I have sufficient memory for that, why not fetch more? (2) the memory I need depends on the number of partitions I am assigned, but this is out of my control (if a consumer dies it will increase), so it is almost impossible to set this right.

Here is an alternative. Have only one configuration, max.fetch.size, which bounds the per-request memory allocation. Instead of using this for every partition, use max.fetch.size/num_partitions so that increasing the number of partitions decreases the per-partition fetch size but does not increase memory usage. For incomplete fetches, follow up by doing a sequential fetch for each incomplete partition using the full max.fetch.size for just that partition (a rough sketch follows the issue summary at the end of this comment). The reason I think this is better is that you get a hard bound on memory usage (which in practice you MUST have to run reliably), and this same bound also acts as the limit on the largest message you can handle. The two counter-arguments against this approach are: (1) rather than crashing when you add partitions, this approach will get slower (due to smaller fetches and eventually sequential fetches), and you could argue that crashing is better than slow; (2) there could potentially be memory-allocation downsides to doing large allocations in the common case (though there are definitely I/O benefits).

Let's figure this out and then I will do a more detailed review of the patch.

> decouple fetch size from max message size
> -----------------------------------------
>
>                 Key: KAFKA-598
>                 URL: https://issues.apache.org/jira/browse/KAFKA-598
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>         Attachments: KAFKA-598-v1.patch
>
>
> Currently, a consumer has to set fetch size larger than the max message size.
> This increases the memory footprint on the consumer, especially when a large
> number of topic/partition is subscribed. By decoupling the fetch size from
> max message size, we can use a smaller fetch size for normal consumption and
> when hitting a large message (hopefully rare), we automatically increase
> fetch size to max message size temporarily.
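To make the memory argument concrete with made-up numbers: with, say, upper.fetch.size = 10 MB and 200 owned partitions, the worst case for the model in the patch is roughly 200 * 10 MB = ~2 GB of fetch buffers, whereas the alternative stays around 10 MB for the multifetch plus one 10 MB re-fetch at a time. Here is a rough sketch of the alternative policy, purely to anchor the discussion -- TopicPartition, FetchResult, multiFetch and singleFetch are hypothetical stand-ins, not the actual consumer internals:

{code}
// Rough sketch only -- the types and the multiFetch/singleFetch calls below are
// placeholders, not the real Kafka consumer API.

case class TopicPartition(topic: String, partition: Int)

// A partition's fetch is "incomplete" when the first message at the requested
// offset is larger than the fetch size used for that partition.
case class FetchResult(bytes: Array[Byte], incomplete: Boolean)

object BoundedFetch {

  // Placeholders for the real network calls.
  def multiFetch(parts: Seq[TopicPartition], sizePerPartition: Int): Map[TopicPartition, FetchResult] =
    parts.map(_ -> FetchResult(Array.empty[Byte], incomplete = false)).toMap

  def singleFetch(part: TopicPartition, size: Int): FetchResult =
    FetchResult(Array.empty[Byte], incomplete = false)

  // One multifetch bounded by maxFetchSize in total, then a sequential re-fetch of
  // each incomplete partition using the full maxFetchSize for just that partition.
  // Per-request memory stays near maxFetchSize no matter how many partitions the
  // consumer owns; adding partitions only shrinks the per-partition fetch size.
  def fetchAll(parts: Seq[TopicPartition], maxFetchSize: Int): Map[TopicPartition, FetchResult] = {
    require(parts.nonEmpty, "no partitions assigned")
    val perPartition = math.max(1, maxFetchSize / parts.size)
    val firstPass = multiFetch(parts, perPartition)
    val retried = firstPass.collect {
      case (part, result) if result.incomplete => part -> singleFetch(part, maxFetchSize)
    }
    firstPass ++ retried
  }
}
{code}

The point of the sketch is that max.fetch.size is both the hard memory bound per fetch round and the largest message the consumer can handle, since incomplete partitions are retried one at a time with the full budget.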