Andrey - I’m not sure we quite have consensus on the Randomisation vs Round Robin issue, but it’s probably worth you just raising a KIP and putting one of the options in as a rejected alternative.
B

> On 29 Jul 2016, at 11:59, Ben Stopford <b...@confluent.io> wrote:
>
> Thanks for kicking this one off, Andrey. Generally it looks great!
>
> I left a comment on the Jira regarding whether we should remove the
> existing limitBytes, along with a potential alternative to doing
> randomisation.
>
> B
>
>> On 29 Jul 2016, at 09:17, Andrey L. Neporada <anepor...@yandex-team.ru>
>> wrote:
>>
>> Hi all!
>>
>> I would like to get your feedback on the PR for bug KAFKA-2063.
>> It looks like a KIP is needed there, but it would be nice to get
>> feedback first.
>>
>> Thanks,
>> Andrey.
>>
>>
>>> On 22 Jul 2016, at 12:26, Andrey L. Neporada <anepor...@yandex-team.ru>
>>> wrote:
>>>
>>> Hi!
>>>
>>> Thanks for the feedback - I agree that the proper way to fix this issue
>>> is to provide a per-request data limit.
>>> Will try to do it.
>>>
>>> Thanks,
>>> Andrey.
>>>
>>>
>>>
>>>> On 21 Jul 2016, at 18:57, Jay Kreps <j...@confluent.io> wrote:
>>>>
>>>> I think the memory usage for consumers can be improved a lot, but I
>>>> think there may be a better way than what you are proposing.
>>>>
>>>> The problem is exactly what you describe: the bound the user sets is
>>>> per-partition, but the number of partitions may be quite high. The
>>>> consumer could provide a bound on the response size by only requesting
>>>> a subset of the partitions, but this would mean that if there was no
>>>> data available on those partitions the consumer wouldn't be checking
>>>> other partitions, which would add latency.
>>>>
>>>> I think the solution is to add a new "max response size" parameter to
>>>> the fetch request so the server checks all partitions but doesn't send
>>>> back more than this amount in total. This has to be done carefully to
>>>> ensure fairness (i.e. if one partition has unbounded amounts of data it
>>>> shouldn't indefinitely starve other partitions).
>>>>
>>>> This will fix memory management both in the replicas and for consumers.
>>>>
>>>> There is a JIRA for this:
>>>> https://issues.apache.org/jira/browse/KAFKA-2063
>>>>
>>>> I think it isn't too hard to do and would be a huge aid to the memory
>>>> profile of both the clients and the server.
>>>>
>>>> I also don't think there is much use in setting a max size that expands
>>>> dynamically since in any case you have to be able to support the
>>>> maximum, so you might as well always use that rather than expanding and
>>>> contracting dynamically. That is, if your max fetch response size is
>>>> 64MB you need to budget 64MB of free memory, so making it smaller some
>>>> of the time doesn't really help you.
>>>>
>>>> -Jay
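As an illustration of the "max response size" fetch Jay describes above, here is a minimal, hypothetical sketch: the class, record and interface names (BoundedFetchSketch, PartitionData, Log) are made up and this is not Kafka's actual fetch path. The server visits partitions in a randomised order - randomisation being one of the two fairness options mentioned at the top of this thread, round robin being the other - and stops adding partition data once the global byte budget is spent, so no single partition with an unbounded backlog can indefinitely starve the rest.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Minimal sketch (hypothetical names, not Kafka's real code) of a fetch
// handler bounded by a total "max response size" across all partitions.
public class BoundedFetchSketch {

    // Hypothetical per-partition slice of the response.
    public record PartitionData(int partition, byte[] bytes) {}

    // Hypothetical log accessor: read at most maxBytes from one partition.
    public interface Log {
        byte[] read(int partition, int maxBytes);
    }

    public static List<PartitionData> fetch(List<Integer> partitions,
                                            Log log,
                                            int maxResponseBytes) {
        List<Integer> order = new ArrayList<>(partitions);
        // Fairness: randomise which partitions get served first on each fetch
        // (a round-robin rotation would be the alternative).
        Collections.shuffle(order);

        List<PartitionData> response = new ArrayList<>();
        int remaining = maxResponseBytes;
        for (int partition : order) {
            if (remaining <= 0) {
                break;  // global budget spent; the rest wait for the next fetch
            }
            byte[] data = log.read(partition, remaining);
            remaining -= data.length;
            response.add(new PartitionData(partition, data));
        }
        return response;
    }
}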
>>>>
>>>> On Thu, Jul 21, 2016 at 2:49 AM, Andrey L. Neporada <
>>>> anepor...@yandex-team.ru> wrote:
>>>>
>>>>> Hi all!
>>>>>
>>>>> We noticed that our Kafka cluster uses a lot of memory for
>>>>> replication. Our Kafka usage pattern is as follows:
>>>>>
>>>>> 1. Most messages are small (tens or hundreds of kilobytes at most),
>>>>> but some (rare) messages can be several megabytes. So we have to set
>>>>> replica.fetch.max.bytes = max.message.bytes = 8MB.
>>>>> 2. Each Kafka broker handles several thousand partitions from
>>>>> multiple topics.
>>>>>
>>>>> In this scenario the total memory required for replication (i.e.
>>>>> replica.fetch.max.bytes * numOfPartitions) is unreasonably big.
>>>>>
>>>>> So we would like to propose the following approach to fix this
>>>>> problem:
>>>>>
>>>>> 1. Introduce a new config parameter replica.fetch.base.bytes - the
>>>>> initial size of a replication data chunk. By default this parameter
>>>>> is equal to replica.fetch.max.bytes, so the replication process works
>>>>> as before.
>>>>>
>>>>> 2. If the ReplicaFetcherThread fails when trying to replicate a
>>>>> message bigger than the current replication chunk, we increase the
>>>>> chunk twofold (or up to replica.fetch.max.bytes, whichever is
>>>>> smaller) and retry.
>>>>>
>>>>> 3. If the chunk is replicated successfully, we decrease the size of
>>>>> the replication chunk back to replica.fetch.base.bytes.
>>>>>
>>>>> By choosing replica.fetch.base.bytes optimally (in our case ~200K),
>>>>> we were able to significantly decrease memory usage without any
>>>>> noticeable impact on replication efficiency.
>>>>>
>>>>> Here is the JIRA ticket (with PR):
>>>>> https://issues.apache.org/jira/browse/KAFKA-3979
>>>>>
>>>>> Your comments and feedback are highly appreciated!
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Andrey.
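To put illustrative numbers on the memory problem described above: with replica.fetch.max.bytes = 8MB and, say, 5000 partitions per broker (the thread only says "several thousand", so 5000 is an assumption), the worst-case replication buffer budget is 8MB * 5000 = 40GB, whereas a ~200K base chunk needs only about 1GB in the common case. Below is a minimal, hypothetical sketch of the three-step adaptive policy in the proposal; the class and method names are made up and this is not the actual ReplicaFetcherThread code.

// Minimal sketch (hypothetical names) of the adaptive chunk-size policy:
// start at replica.fetch.base.bytes, double on a "message too large"
// failure up to replica.fetch.max.bytes, shrink back after success.
public class AdaptiveFetchSizeSketch {

    // Hypothetical stand-in for one fetch attempt; returns false when the
    // next message does not fit into fetchSize bytes.
    public interface Fetcher {
        boolean tryFetch(int fetchSize);
    }

    private final int baseBytes;   // replica.fetch.base.bytes
    private final int maxBytes;    // replica.fetch.max.bytes
    private int currentBytes;

    public AdaptiveFetchSizeSketch(int baseBytes, int maxBytes) {
        this.baseBytes = baseBytes;
        this.maxBytes = maxBytes;
        this.currentBytes = baseBytes;
    }

    // One replication round for a partition.
    public void fetchOnce(Fetcher fetcher) {
        // Step 2: on failure, double the chunk (capped at maxBytes) and retry.
        while (!fetcher.tryFetch(currentBytes)) {
            if (currentBytes >= maxBytes) {
                return;  // even the maximum chunk is not enough; give up this round
            }
            currentBytes = Math.min(currentBytes * 2, maxBytes);
        }
        // Step 3: after a successful fetch, shrink back to the base size.
        currentBytes = baseBytes;
    }
}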