Hi all!

We noticed that our Kafka cluster uses a lot of memory for replication. Our 
Kafka usage pattern is as follows:

1. Most messages are small (tens or hundreds of kilobytes at most), but some 
(rare) messages can be several megabytes. So, we have to set 
replica.fetch.max.bytes = max.message.bytes = 8MB
2. Each Kafka broker handles several thousands of partitions from multiple 
topics.

In this scenario the total memory required for replication (i.e. 
replica.fetch.max.bytes * numOfPartitions) is unreasonably large.
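To make the scale of the problem concrete, here is the worst-case arithmetic with hypothetical (but representative) numbers: an 8 MB fetch size and 5000 partitions per broker. The class and variable names are illustrative only:

```java
public class ReplicationMemory {
    public static void main(String[] args) {
        // Hypothetical values for illustration: 8 MB max fetch size,
        // 5000 partitions handled by a single broker
        long replicaFetchMaxBytes = 8L * 1024 * 1024;
        long numPartitions = 5000;

        // Worst case: every partition's fetcher allocates a full-size buffer
        long worstCaseBytes = replicaFetchMaxBytes * numPartitions;
        System.out.printf("Worst-case fetch buffer memory: %.2f GB%n",
                worstCaseBytes / (1024.0 * 1024 * 1024));
        // prints: Worst-case fetch buffer memory: 39.06 GB
    }
}
```

Tens of gigabytes reserved for fetch buffers, even though the vast majority of messages would fit in a buffer a fraction of that size.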

So we would like to propose the following approach to fix this problem:

1. Introduce a new config parameter, replica.fetch.base.bytes, which is the 
initial size of a replication data chunk. By default this parameter is 
equal to replica.fetch.max.bytes, so replication works exactly as before.

2. If the ReplicaFetcherThread fails when trying to replicate a message bigger 
than the current replication chunk, we double the chunk size (capped at 
replica.fetch.max.bytes) and retry.

3. If the chunk is replicated successfully, we decrease the replication 
chunk size back to replica.fetch.base.bytes.
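The three steps above can be sketched as a small grow/shrink policy. This is a simplified illustration, not the actual PR code; the constants and the simulated fetch loop are assumptions for the example:

```java
public class AdaptiveFetchSize {
    // Hypothetical values; names mirror the proposed and existing settings
    static final int REPLICA_FETCH_BASE_BYTES = 200 * 1024;      // proposed, ~200K
    static final int REPLICA_FETCH_MAX_BYTES  = 8 * 1024 * 1024; // existing cap

    /** Step 2: after a failed fetch (message larger than the current chunk),
     *  double the chunk size, capped at replica.fetch.max.bytes. */
    static int grow(int current) {
        return Math.min(current * 2, REPLICA_FETCH_MAX_BYTES);
    }

    /** Step 3: after a successful fetch, fall back to the base size. */
    static int shrink() {
        return REPLICA_FETCH_BASE_BYTES;
    }

    public static void main(String[] args) {
        // Step 1: start from the base chunk size
        int chunk = REPLICA_FETCH_BASE_BYTES;

        // Simulate replicating a rare 1.5 MB message: double until it fits
        int messageSize = 1_500_000;
        while (chunk < messageSize && chunk < REPLICA_FETCH_MAX_BYTES) {
            chunk = grow(chunk);
        }
        System.out.println("Chunk grew to " + chunk + " bytes");

        // Once the oversized message is through, drop back to the base size
        chunk = shrink();
        System.out.println("Chunk reset to " + chunk + " bytes");
    }
}
```

The key property is that the full replica.fetch.max.bytes buffer is allocated only transiently, for the rare oversized message, instead of permanently for every partition.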
   

By choosing replica.fetch.base.bytes appropriately (~200K in our case), we were 
able to significantly decrease memory usage without any noticeable impact on 
replication efficiency.

Here is the JIRA ticket (with PR): https://issues.apache.org/jira/browse/KAFKA-3979

Your comments and feedback are highly appreciated!


Thanks,
Andrey.
