[ 
https://issues.apache.org/jira/browse/KAFKA-3810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354011#comment-15354011
 ] 

Jun Rao commented on KAFKA-3810:
--------------------------------

[~onurkaraman], [~becket_qin], thanks for the patch. I have a couple of follow 
up comments.

1. I actually like [~ewencp]'s suggestion in the PR. Being able to give a full 
message back to the consumer when the fetch size is too small seems like a 
useful general feature, not just for internal topics. It's true that the 
consumer will get more bytes than requested. However, that's what's needed for 
the consumer to make progress.

2. The patch always adjusts the fetch size based on Log.MaxMessageSize, 
regardless of wether they are large messages in the log or not. This increases 
the memory footprint in the consumer unnecessarily. A potential better 
implementation is to only return more than bytes than the fetch size when the 
first message to be fetched is larger than the fetch size. Since large messages 
should be rare, this improves the memory footprint in the consumer since in the 
common case, the fetch data is still bounded by the fetch size. We know the 
size of the first message to be fetched in FileMessageSet.searchFor(). We just 
need to pass that info back to the caller.

So, I was thinking we can make the fix more general. If we find a message in 
the log larger than the fetch size, we just give the full message back. The 
fetch size in the consumer is than just for performance tuning. We allow the 
user to set the MaxMessageSize per topic. So, it's expected that the 
producer/consumer need at least that amount of memory. Do you think this makes 
sense? Do you want to do a follow up KIP to make the solution more general?

> replication of internal topics should not be limited by 
> replica.fetch.max.bytes
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-3810
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3810
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Onur Karaman
>            Assignee: Onur Karaman
>             Fix For: 0.10.1.0
>
>
> From the kafka-dev mailing list discussion:
> [\[DISCUSS\] scalability limits in the 
> coordinator|http://mail-archives.apache.org/mod_mbox/kafka-dev/201605.mbox/%3ccamquqbzddtadhcgl6h4smtgo83uqt4s72gc03b3vfghnme3...@mail.gmail.com%3E]
> There's a scalability limit on the new consumer / coordinator regarding the 
> amount of group metadata we can fit into one message. This restricts a 
> combination of consumer group size, topic subscription sizes, topic 
> assignment sizes, and any remaining member metadata.
> Under more strenuous use cases like mirroring clusters with thousands of 
> topics, this limitation can be reached even after applying gzip to the 
> __consumer_offsets topic.
> Various options were proposed in the discussion:
> # Config change: reduce the number of consumers in the group. This isn't 
> always a realistic answer in more strenuous use cases like MirrorMaker 
> clusters or for auditing.
> # Config change: split the group into smaller groups which together will get 
> full coverage of the topics. This gives each group member a smaller 
> subscription.(ex: g1 has topics starting with a-m while g2 has topics 
> starting with n-z). This would be operationally painful to manage.
> # Config change: split the topics among members of the group. Again this 
> gives each group member a smaller subscription. This would also be 
> operationally painful to manage.
> # Config change: bump up KafkaConfig.messageMaxBytes (a topic-level config) 
> and KafkaConfig.replicaFetchMaxBytes (a broker-level config). Applying 
> messageMaxBytes to just the __consumer_offsets topic seems relatively 
> harmless, but bumping up the broker-level replicaFetchMaxBytes would probably 
> need more attention.
> # Config change: try different compression codecs. Based on 2 minutes of 
> googling, it seems like lz4 and snappy are faster than gzip but have worse 
> compression, so this probably won't help.
> # Implementation change: support sending the regex over the wire instead of 
> the fully expanded topic subscriptions. I think people said in the past that 
> different languages have subtle differences in regex, so this doesn't play 
> nicely with cross-language groups.
> # Implementation change: maybe we can reverse the mapping? Instead of mapping 
> from member to subscriptions, we can map a subscription to a list of members.
> # Implementation change: maybe we can try to break apart the subscription and 
> assignments from the same SyncGroupRequest into multiple records? They can 
> still go to the same message set and get appended together. This way the 
> limit become the segment size, which shouldn't be a problem. This can be 
> tricky to get right because we're currently keying these messages on the 
> group, so I think records from the same rebalance might accidentally compact 
> one another, but my understanding of compaction isn't that great.
> # Implementation change: try to apply some tricks on the assignment 
> serialization to make it smaller.
> # Config and Implementation change: bump up the __consumer_offsets topic 
> messageMaxBytes and (from [~junrao]) fix how we deal with the case when a 
> message is larger than the fetch size. Today, if the fetch size is smaller 
> than the fetch size, the consumer will get stuck. Instead, we can simply 
> return the full message if it's larger than the fetch size w/o requiring the 
> consumer to manually adjust the fetch size.
> # Config and Implementation change: same as above but only apply the special 
> fetch logic when fetching from internal topics



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to