Hi! Thanks for the feedback - I agree that the proper way to fix this issue is to provide a per-request data limit. I will try to do it.
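
Roughly, what I have in mind is something like the sketch below (just an illustration - the names are made up and are not the actual fetch request API): the broker walks all partitions, caps the total bytes returned per request, always lets the first partition make progress even if its data exceeds the cap, and rotates the partition order between requests so a single busy partition cannot starve the rest.

    // Illustrative only: PartitionData and BoundedFetchAssembler are made-up names.
    case class PartitionData(partition: Int, bytes: Array[Byte])

    class BoundedFetchAssembler(maxResponseBytes: Int) {
      // rotated between requests so every partition eventually goes first
      private var startIndex = 0

      def assemble(available: IndexedSeq[PartitionData]): Seq[PartitionData] =
        if (available.isEmpty) Seq.empty
        else {
          val shift = startIndex % available.size
          val ordered = available.drop(shift) ++ available.take(shift)
          startIndex += 1

          var budget = maxResponseBytes
          val selected = Seq.newBuilder[PartitionData]
          for ((pd, i) <- ordered.zipWithIndex) {
            // the first partition is always included, so one oversized message
            // cannot stall the fetch forever; the rest are included only if they fit
            if (i == 0 || pd.bytes.length <= budget) {
              selected += pd
              budget = math.max(0, budget - pd.bytes.length)
            }
          }
          selected.result()
        }
    }

With a bound like this, the memory needed per fetcher is roughly maxResponseBytes instead of per-partition-limit * numOfPartitions.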
Thanks,
Andrey.

> On 21 Jul 2016, at 18:57, Jay Kreps <j...@confluent.io> wrote:
>
> I think the memory usage for consumers can be improved a lot, but I think
> there may be a better way than what you are proposing.
>
> The problem is exactly what you describe: the bound the user sets is
> per-partition, but the number of partitions may be quite high. The consumer
> could provide a bound on the response size by only requesting a subset of
> the partitions, but this would mean that if there was no data available on
> those partitions the consumer wouldn't be checking other partitions, which
> would add latency.
>
> I think the solution is to add a new "max response size" parameter to the
> fetch request so the server checks all partitions but doesn't send back
> more than this amount in total. This has to be done carefully to ensure
> fairness (i.e. if one partition has unbounded amounts of data it shouldn't
> indefinitely starve other partitions).
>
> This will fix memory management both in the replicas and for consumers.
>
> There is a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-2063
>
> I think it isn't too hard to do and would be a huge aid to the memory
> profile of both the clients and server.
>
> I also don't think there is much use in setting a max size that expands
> dynamically, since in any case you have to be able to support the maximum,
> so you might as well always use that rather than expanding and contracting
> dynamically. That is, if your max fetch response size is 64MB you need to
> budget 64MB of free memory, so making it smaller some of the time doesn't
> really help you.
>
> -Jay
>
> On Thu, Jul 21, 2016 at 2:49 AM, Andrey L. Neporada <
> anepor...@yandex-team.ru> wrote:
>
>> Hi all!
>>
>> We noticed that our Kafka cluster uses a lot of memory for replication.
>> Our Kafka usage pattern is the following:
>>
>> 1. Most messages are small (tens or hundreds of kilobytes at most), but
>> some (rare) messages can be several megabytes. So we have to set
>> replica.fetch.max.bytes = max.message.bytes = 8MB.
>> 2. Each Kafka broker handles several thousand partitions from multiple
>> topics.
>>
>> In this scenario the total memory required for replication (i.e.
>> replica.fetch.max.bytes * numOfPartitions) is unreasonably big.
>>
>> So we would like to propose the following approach to fix this problem:
>>
>> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is
>> the initial size of a replication data chunk. By default this parameter
>> is equal to replica.fetch.max.bytes, so the replication process works as
>> before.
>>
>> 2. If the ReplicaFetcherThread fails when trying to replicate a message
>> bigger than the current replication chunk, we double the chunk size (up
>> to replica.fetch.max.bytes at most) and retry.
>>
>> 3. If the chunk is replicated successfully, we decrease the size of the
>> replication chunk back to replica.fetch.base.bytes.
>>
>> By choosing replica.fetch.base.bytes in an optimal way (in our case
>> ~200K), we were able to significantly decrease memory usage without any
>> noticeable impact on replication efficiency.
>>
>> Here is the JIRA ticket (with PR):
>> https://issues.apache.org/jira/browse/KAFKA-3979
>>
>> Your comments and feedback are highly appreciated!
>>
>> Thanks,
>> Andrey.
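
P.S. For reference, the adaptive chunk sizing from my original proposal boils down to something like this (again just a sketch with illustrative names, not the actual ReplicaFetcherThread code):

    // Illustrative only: class and method names are made up.
    class AdaptiveFetchSize(baseBytes: Int, maxBytes: Int) {
      private var current = baseBytes

      // size to use for the next fetch request
      def currentFetchSize: Int = current

      // fetch failed because the next message does not fit into the chunk:
      // double the chunk, but never beyond replica.fetch.max.bytes
      def onMessageTooLarge(): Unit =
        current = math.min(current.toLong * 2, maxBytes.toLong).toInt

      // fetch succeeded: shrink back to the base size
      def onSuccess(): Unit =
        current = baseBytes
    }

The fetcher asks for currentFetchSize when building a request, calls onMessageTooLarge and retries when a message does not fit, and calls onSuccess otherwise. With base ~200K and max 8MB, steady-state memory is roughly base * numOfPartitions instead of max * numOfPartitions.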