[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13509064#comment-13509064 ]
Joel Koshy commented on KAFKA-598:
----------------------------------

Thanks for bringing this up. I had considered a slight variant of your suggestion (although I was thinking of issuing requests with the upper fetch size for incomplete partitions one partition at a time, instead of another multi-fetch on all incomplete partitions as done in this patch). I didn't go with it mainly due to concern (1) that you raise - i.e., to avoid doing sequential fetches - although I don't think that is too much of an issue.

Whether having an upper-fetch-size config is better than the current setup depends on the use case: e.g., if only a few partitions have large messages, then the upper-fetch-size approach would work well. In the worst case of all partitions being incomplete, it would lead to a large allocation - which is also why I felt the "pipelined fetches of incomplete partitions" approach added no real value (since it is equivalent in terms of the net memory the client is expected to handle).

Anyway, I think the above leads naturally to your suggestion of using a single fetch size and dividing it across all partitions. I like that approach - especially since there are no new configs to deal with. The memory-allocation concerns are valid but tolerable from the client's perspective - i.e., the heavy memory allocations only kick in when there are incomplete partitions, in which case I think most clients would want to consume anyway (along with a log warning indicating a large message).

One minor drawback is that there is no longer a clear default value for fetch size - right now, it is reasonable to say that a fetch size of 1MB is also the (approximate) max size of a message. With the above redesign we can no longer map the config to message sizes, since there is no prior knowledge of the number of partitions consumed by each consumer thread, but I don't think that's a big deal.
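To make the "single net fetch size divided across partitions" idea concrete, here is a minimal sketch. This is not Kafka's actual consumer code; the class and method names (`FetchSizePlanner`, `perPartitionFetchSize`, `planRefetch`) and the escalate-to-the-full-budget policy for incomplete partitions are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch (not Kafka's real consumer code): divide one net
 * fetch size evenly across subscribed partitions, then plan follow-up
 * fetches for any "incomplete" partition - one whose next message did
 * not fit in its per-partition share.
 */
public class FetchSizePlanner {

    /** Per-partition fetch size under the proposed "net fetch size" semantics. */
    static int perPartitionFetchSize(int netFetchSize, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("need at least one partition");
        }
        return Math.max(1, netFetchSize / partitionCount);
    }

    /**
     * Plan follow-up fetches: each incomplete partition gets a dedicated
     * request sized at the full net fetch budget, issued one partition at
     * a time so only one large buffer is outstanding at once.
     */
    static Map<String, Integer> planRefetch(int netFetchSize, List<String> incompletePartitions) {
        Map<String, Integer> plan = new LinkedHashMap<>();
        for (String topicPartition : incompletePartitions) {
            plan.put(topicPartition, netFetchSize); // whole budget for this one fetch
        }
        return plan;
    }
}
```

With a 1MB net fetch size and four partitions, each partition's share is 256KB; a partition whose next message is, say, 600KB would be flagged incomplete and re-fetched alone with the full 1MB budget.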
So as I see it, the choices are:

1) Change the semantics of fetch size to be net, across all partitions. If/when incomplete partitions are encountered, issue a fetch request of size fetch.size for each incomplete partition. I think if we do this we should also account for the total memory that would be used - i.e., including the queued chunks.

2) Introduce a new config called upper fetch size that kicks in whenever there are incomplete partitions, for which we either:
   a) issue a multi-fetch request of upper fetch size for all incomplete partitions, OR
   b) issue sequential fetch requests of upper fetch size, one incomplete partition at a time.

3) If we had byte-addressability for fetches (which I really think we should allow, at least for these kinds of internal APIs), we could consider a third option: keep fetch size as is, and issue pipelined fetch requests to build up and complete each incomplete partition, one at a time.

What do people think?

> decouple fetch size from max message size
> -----------------------------------------
>
>                 Key: KAFKA-598
>                 URL: https://issues.apache.org/jira/browse/KAFKA-598
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>         Attachments: KAFKA-598-v1.patch
>
>
> Currently, a consumer has to set the fetch size larger than the max message size. This increases the memory footprint on the consumer, especially when a large number of topic/partitions are subscribed. By decoupling the fetch size from the max message size, we can use a smaller fetch size for normal consumption and, when hitting a large message (hopefully rare), automatically increase the fetch size to the max message size temporarily.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira