Hi!

Thanks for the feedback. I agree that the proper way to fix this issue is to
provide a per-request data limit.
I will try to implement it.

Thanks,
Andrey.



> On 21 Jul 2016, at 18:57, Jay Kreps <j...@confluent.io> wrote:
> 
> I think the memory usage for consumers can be improved a lot, but I think
> there may be a better way than what you are proposing.
> 
> The problem is exactly what you describe: the bound the user sets is
> per-partition, but the number of partitions may be quite high. The consumer
> could provide a bound on the response size by only requesting a subset of
> the partitions, but this would mean that if there was no data available on
> those partitions the consumer wouldn't be checking other partitions, which
> would add latency.
> 
> I think the solution is to add a new "max response size" parameter to the
> fetch request so the server checks all partitions but doesn't send back
> more than this amount in total. This has to be done carefully to ensure
> fairness (i.e. if one partition has unbounded amounts of data it shouldn't
> indefinitely starve other partitions).
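
The "max response size" idea above can be sketched roughly as follows. This is only an illustration under stated assumptions, not Kafka's actual implementation: the function name build_response, its parameters, and the rotating start index used for fairness are all hypothetical, and the sketch counts bytes only, ignoring the fact that a real broker must return whole messages.

```python
def build_response(available, per_partition_max, response_max, start):
    """Pick how many bytes to return per partition under a total cap.

    available         -- list of (partition, bytes_available) pairs
    per_partition_max -- existing per-partition bound
    response_max      -- hypothetical new bound on the whole response
    start             -- index of the partition to serve first; the caller
                         rotates it between requests so that one busy
                         partition cannot starve the others forever
    """
    remaining = response_max
    response = {}
    n = len(available)
    for i in range(n):
        partition, avail = available[(start + i) % n]
        take = min(avail, per_partition_max, remaining)
        if take > 0:
            response[partition] = take
            remaining -= take
    return response


# "p0" always has far more data than fits in one response.
available = [("p0", 100), ("p1", 10), ("p2", 10)]

first = build_response(available, per_partition_max=50, response_max=60, start=0)
second = build_response(available, per_partition_max=50, response_max=60, start=1)
print(first)   # p0 fills most of the response, p2 gets nothing this time
print(second)  # after rotating the start, p1 and p2 are served before p0
```

All partitions are still checked on every request, so an empty partition at the front of the list does not add latency for the others; only the order in which the byte budget is handed out changes.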
> 
> This will fix memory management both in the replicas and for consumers.
> 
> There is a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-2063
> 
> I think it isn't too hard to do and would be a huge aid to the memory
> profile of both the clients and server.
> 
> I also don't think there is much use in setting a max size that expands
> dynamically since in any case you have to be able to support the maximum,
> so you might as well always use that rather than expanding and contracting
> dynamically. That is, if your max fetch response size is 64MB you need to
> budget 64MB of free memory, so making it smaller some of the time doesn't
> really help you.
> 
> -Jay
> 
> On Thu, Jul 21, 2016 at 2:49 AM, Andrey L. Neporada <
> anepor...@yandex-team.ru> wrote:
> 
>> Hi all!
>> 
>> We noticed that our Kafka cluster uses a lot of memory for replication.
>> Our Kafka usage pattern is as follows:
>> 
>> 1. Most messages are small (tens or hundreds of kilobytes at most), but some
>> (rare) messages can be several megabytes. So, we have to set
>> replica.fetch.max.bytes = max.message.bytes = 8MB
>> 2. Each Kafka broker handles several thousands of partitions from multiple
>> topics.
>> 
>> In this scenario the total memory required for replication (i.e.
>> replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
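
To make the formula above concrete, here is a small worked calculation. The 8MB limit and the ~200K base size come from the message itself; the figure of 3000 partitions is an assumption standing in for "several thousands", and the helper name is hypothetical.

```python
KIB = 1024
MIB = 1024 * KIB
GIB = 1024 * MIB


def worst_case_replication_bytes(fetch_bytes, num_partitions):
    """Worst-case buffer size: one full fetch chunk per partition."""
    return fetch_bytes * num_partitions


num_partitions = 3000  # assumed value for "several thousands"

at_max = worst_case_replication_bytes(8 * MIB, num_partitions)
at_base = worst_case_replication_bytes(200 * KIB, num_partitions)

print(f"8MB per partition:  {at_max / GIB:.1f} GiB")   # 23.4 GiB
print(f"200K per partition: {at_base / GIB:.2f} GiB")  # 0.57 GiB
```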
>> 
>> So we would like to propose the following approach to fix this problem:
>> 
>> 1. Introduce a new config parameter, replica.fetch.base.bytes, which is the
>> initial size of the replication data chunk. By default this parameter should
>> be equal to replica.fetch.max.bytes so the replication process will work as
>> before.
>> 
>> 2. If the ReplicaFetcherThread fails when trying to replicate a message
>> bigger than the current replication chunk, we double the chunk size (capped
>> at replica.fetch.max.bytes) and retry.
>> 
>> 3. If the chunk is replicated successfully, we try to decrease the size of
>> the replication chunk back to replica.fetch.base.bytes.
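
Steps 1-3 above can be sketched as a small state update. This is a simplified model, not the code from the PR: the function names are hypothetical, a "failed fetch" is modelled simply as the message being larger than the current chunk, and the message is assumed never to exceed replica.fetch.max.bytes.

```python
KIB = 1024
MIB = 1024 * KIB


def next_fetch_size(current, base, maximum, fetch_failed):
    """Step 2: double on failure, capped at the maximum.
    Step 3: fall back to the base size after a success."""
    if fetch_failed:
        return min(current * 2, maximum)
    return base


def sizes_tried(message_size, base, maximum):
    """Chunk sizes attempted until a message of message_size fits."""
    if message_size > maximum:
        raise ValueError("message is larger than replica.fetch.max.bytes")
    current = base
    tried = [current]
    while current < message_size:
        current = next_fetch_size(current, base, maximum, fetch_failed=True)
        tried.append(current)
    return tried


base = 200 * KIB    # replica.fetch.base.bytes (~200K, as in the message)
maximum = 8 * MIB   # replica.fetch.max.bytes

attempts = sizes_tried(3 * MIB, base, maximum)
print([size // KIB for size in attempts])  # [200, 400, 800, 1600, 3200]

# After the successful fetch the chunk size drops back to the base.
print(next_fetch_size(attempts[-1], base, maximum, fetch_failed=False) // KIB)  # 200
```

In this model a rare 3 MB message costs four extra fetch attempts, while every other fetch keeps using the small base chunk.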
>> 
>> 
>> By choosing replica.fetch.base.bytes in an optimal way (in our case ~200K),
>> we were able to significantly decrease memory usage without any noticeable
>> impact on replication efficiency.
>> 
>> Here is the JIRA ticket (with PR):
>> https://issues.apache.org/jira/browse/KAFKA-3979
>> 
>> Your comments and feedback are highly appreciated!
>> 
>> 
>> Thanks,
>> Andrey.
