Maysam Yabandeh created KAFKA-3493:
--------------------------------------
Summary: Replica fetcher load is not balanced over fetcher threads
Key: KAFKA-3493
URL: https://issues.apache.org/jira/browse/KAFKA-3493
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.9.0.1
Reporter: Maysam Yabandeh
Fix For: 0.10.0.1
The replicas are not evenly distributed among the fetcher threads. As a
result, some fetcher threads become overloaded and their requests time out
frequently. This is especially problematic when a new node is added to the
cluster and the fetch traffic is large.
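Here is an example run in a test cluster with 10 brokers and 6 fetcher threads
(per source broker). A single topic consisting of 500+ partitions was assigned
to have a replica for each partition on the newly added broker. The first
command counts partitions per fetcher id across all source brokers; the second
counts only the partitions fetched from source broker 22.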
{code}
[kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i- /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done
85
83
85
83
85
85
[kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i-22 /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done
15
1
13
1
14
1
{code}
The problem is that the AbstractFetcherManager.getFetcherId method does not
take the source broker id into account:
{code}
private def getFetcherId(topic: String, partitionId: Int) : Int = {
  Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
}
{code}
Hence, although the replicas are evenly distributed among the fetcher ids when
counted across all source brokers, this is not necessarily the case for each
source broker separately.
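
To illustrate how this can happen, here is a minimal sketch that replays the
quoted formula for the partitions led by one source broker. It assumes
(this is not stated in the report) that leaders are laid out round-robin,
so partition p is led by broker p % numBrokers. The object name, the helper
names, and the topic name "t" are hypothetical, and the local abs is only a
stand-in for Utils.abs.

```scala
object FetcherSkewSketch {
  // Stand-in for Utils.abs (assumption: Int.MinValue maps to 0).
  private def abs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Same formula as the getFetcherId quoted in this report.
  def getFetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
    abs(31 * topic.hashCode() + partitionId) % numFetchers

  // ASSUMPTION: round-robin leadership, i.e. partition p is led by broker p % numBrokers.
  // Returns, for one source broker, how many of its partitions land on each fetcher id.
  def countsForSourceBroker(topic: String, numPartitions: Int, numBrokers: Int,
                            sourceBroker: Int, numFetchers: Int): List[Int] = {
    val counts = Array.fill(numFetchers)(0)
    for (p <- 0 until numPartitions if p % numBrokers == sourceBroker)
      counts(getFetcherId(topic, p, numFetchers)) += 1
    counts.toList
  }

  def main(args: Array[String]): Unit = {
    // 500 partitions, 10 brokers, 6 fetchers, hypothetical topic "t", source broker 2.
    println(countsForSourceBroker("t", 500, 10, 2, 6))
  }
}
```

Under that assumption, the partition ids from one source broker are spaced 10
apart, and stepping by 10 modulo 6 only ever reaches 3 of the 6 fetcher ids
(gcd(10, 6) = 2). That would be consistent with the 15/1/13/1/14/1 pattern
above, although the real leader layout in the test cluster may differ.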
I think a random function would do a much better job of distributing the load
over the fetcher threads from each source broker.
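
One possible reading of "a random function" is a deterministic but well-mixed
hash, so that a given partition still maps to the same fetcher id every time.
The sketch below is only an illustration of that idea, not a proposed patch:
mixedFetcherId and the object name are hypothetical, and MurmurHash3 from the
Scala standard library is simply one convenient mixing function.

```scala
import scala.util.hashing.MurmurHash3

object MixedFetcherIdSketch {
  // Hypothetical alternative to getFetcherId; not Kafka's actual code.
  def mixedFetcherId(topic: String, partitionId: Int, numFetchers: Int): Int = {
    // Mix topic and partition id together instead of adding them linearly.
    val h = MurmurHash3.productHash((topic, partitionId))
    // floorMod keeps the result in [0, numFetchers) even when h is negative.
    java.lang.Math.floorMod(h, numFetchers)
  }

  def main(args: Array[String]): Unit = {
    // ASSUMPTION: one source broker leads every 10th partition of a 500-partition topic.
    val ids = (2 until 500 by 10).map(p => mixedFetcherId("t", p, 6))
    // Print how many of those partitions each fetcher id receives.
    println(ids.groupBy(identity).map { case (id, ps) => id -> ps.size }.toSeq.sortBy(_._1))
  }
}
```

Because the mixing breaks the linear relation between partition id and fetcher
id, evenly spaced partition ids should no longer collapse onto a subset of the
fetcher threads.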
Thoughts?