[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13509064#comment-13509064 ]
Joel Koshy commented on KAFKA-598:
----------------------------------
Thanks for bringing this up. I had considered a slight variant of your
suggestion (although I was thinking of issuing requests with upper fetch
size for incomplete partitions one partition at a time, instead of another
multi-fetch on all incomplete partitions as done in this patch). I didn't go
with it mainly due to concern (1) that you raise - i.e., the desire to avoid
sequential fetches - although I don't think that is too much of an issue.
Whether having an upper fetch size config is better than the current setup
depends on the use case: e.g., if there are only a few partitions that have
large messages, then the upper fetch size approach would work well. In the
worst case of all partitions being incomplete, that would lead to a large
allocation - which is also why I felt the "pipelined fetches of incomplete
partitions" approach added no real value (since it is equivalent in terms of
the net memory the client is expected to handle).
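To make that worst case concrete, here is a back-of-the-envelope
illustration; all of the numbers below are made up:

{code:java}
// Hypothetical numbers: if every partition in a multi-fetch comes back
// incomplete and all of them are retried at the upper fetch size, the
// allocation grows linearly with the partition count.
public class WorstCaseAllocation {
    public static void main(String[] args) {
        int partitions = 200;              // partitions owned by one consumer thread
        long fetchSize = 1L << 20;         // 1MB steady-state fetch size
        long upperFetchSize = 50L << 20;   // 50MB upper fetch size (~max message size)

        long steadyState = partitions * fetchSize;    // 200MB per multi-fetch
        long worstCase = partitions * upperFetchSize; // 10,000MB if all are incomplete

        System.out.printf("steady state: %dMB, worst case: %dMB%n",
                steadyState >> 20, worstCase >> 20);
    }
}
{code}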
Anyway I think the above then leads naturally to your suggestion of using a
single fetch size and dividing that across all partitions. I like that
approach - especially since there are no new configs to deal with. I would
think the memory allocation concerns are valid but tolerable from the
client's perspective - i.e., the heavy memory allocations only kick in when
there are incomplete partitions, in which case I think most clients would
want to consume anyway (along with a log warning indicating a large
message). One minor drawback is that there isn't really a clear default
value for fetch size - right now, with a fetch size of 1MB it is reasonable
to say that 1MB is also the (approximate) max size of a message. With the
above re-design we can no longer map the config to a message size, since
there is no prior knowledge of the number of partitions consumed by each
consumer thread, but I don't think that's a big deal.
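To illustrate why the config no longer maps to a message size, here is a
minimal sketch of the even split; the names and numbers are made up, this is
not the patch:

{code:java}
// With net semantics, the per-partition share depends on how many partitions
// a consumer thread happens to own, so a 1MB fetch.size no longer implies a
// ~1MB max message size.
public class NetFetchSplit {
    public static void main(String[] args) {
        long netFetchSize = 1L << 20; // fetch.size = 1MB, now a net budget
        for (int partitions : new int[] {1, 10, 100}) {
            long perPartition = netFetchSize / partitions;
            System.out.printf("%3d partitions -> %7d bytes each%n",
                    partitions, perPartition);
        }
    }
}
{code}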
So as I see it, the choices are:
1) Change the semantics of fetch size to be net, across all partitions.
If/when incomplete partitions are encountered, issue a fetch request of
size fetch.size for each incomplete partition (a rough sketch follows
after this list). I think if we do this, the fetch size should also
account for the total memory that would be used - i.e., including the
queued chunks.
2) Introduce a new config called upper fetch size that kicks in whenever
there are incomplete partitions - for which:
a) issue a multi-fetch request with size upper fetch size for all
incomplete partitions.
OR
b) issue sequential fetch requests of upper fetch size, one incomplete
partition at a time.
3) If we had byte-addressability for fetches (which I really think we should
allow at least for these kinds of internal APIs) we could consider a
third option: keep fetch size as is, and issue pipelined fetch requests
to build up and complete each incomplete partition, one at a time.
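Here is the rough sketch of option (1) referenced above. This is
illustrative Java against made-up interfaces, not the actual 0.8 consumer
code; in particular, being able to detect an "incomplete" partition from a
fetch response is an assumption:

{code:java}
import java.util.List;

// Sketch of option (1): fetch.size is a net budget divided across all owned
// partitions; any partition whose buffer ended mid-message gets a follow-up
// fetch of the full fetch.size. Both interfaces below are hypothetical.
public class NetFetchLoop {
    interface FetchResponse {
        List<String> incompletePartitions(); // partitions whose last message was truncated
    }

    interface Fetcher {
        FetchResponse multiFetch(List<String> partitions, long perPartitionSize);
        void fetchOne(String partition, long size); // single-partition follow-up
    }

    static void doFetch(Fetcher fetcher, List<String> owned, long fetchSize) {
        // Normal pass: split the net fetch.size budget evenly.
        long perPartition = Math.max(1, fetchSize / owned.size());
        FetchResponse response = fetcher.multiFetch(owned, perPartition);

        // Escalation pass: one request of size fetch.size per incomplete
        // partition, so peak memory is bounded by
        // fetch.size * (1 + number of incomplete partitions).
        for (String partition : response.incompletePartitions()) {
            fetcher.fetchOne(partition, fetchSize);
        }
    }
}
{code}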
What do people think?
> decouple fetch size from max message size
> -----------------------------------------
>
> Key: KAFKA-598
> URL: https://issues.apache.org/jira/browse/KAFKA-598
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8
> Reporter: Jun Rao
> Assignee: Joel Koshy
> Attachments: KAFKA-598-v1.patch
>
>
> Currently, a consumer has to set fetch size larger than the max message size.
> This increases the memory footprint on the consumer, especially when a large
> number of topics/partitions are subscribed. By decoupling the fetch size from
> max message size, we can use a smaller fetch size for normal consumption and
> when hitting a large message (hopefully rare), we automatically increase
> fetch size to max message size temporarily.
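For illustration, the retry behavior described in the issue could look
roughly like this; the class and parameter names are hypothetical, and this
is a sketch of the intent rather than the attached patch:

{code:java}
// Fetch with a small size normally; on hitting a message larger than the
// fetch size, retry that partition once at max message size, then drop back.
public class AdaptiveFetchSize {
    private final long normalFetchSize; // e.g. 100KB for steady-state consumption
    private final long maxMessageSize;  // broker-side max message size, e.g. 10MB

    public AdaptiveFetchSize(long normalFetchSize, long maxMessageSize) {
        this.normalFetchSize = normalFetchSize;
        this.maxMessageSize = maxMessageSize;
    }

    /** Pick the size of the next fetch for one partition. */
    public long nextFetchSize(boolean lastFetchWasIncomplete) {
        // Escalate only for the fetch immediately after an incomplete one;
        // afterwards the size automatically falls back to normal.
        return lastFetchWasIncomplete ? maxMessageSize : normalFetchSize;
    }
}
{code}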