Hi All,

We are running a 10-node Kafka cluster with a theoretical peak handling
rate of more than 2 million 1 KB messages per second, but we are hitting a
bottleneck in the Kafka producer that keeps us from scaling beyond 150K
requests per second.

*Our Configuration*
We have a 10-node Kafka cluster on 0.8.2.1 with no compression, acks = -1,
and a replication factor of 3. Each node has 20+ CPU cores, 40+ GB of RAM,
six 3.7 TB hard disks, and a 150 MB/s network link.
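
For concreteness, here is a minimal sketch of the producer setup in Java
(the bootstrap host is hypothetical; the rest mirrors the settings above):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class ProducerSetupSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // hypothetical host
            props.put("acks", "-1");               // wait for all in-sync replicas
            props.put("compression.type", "none"); // no compression
            props.put("batch.size", "16384");      // 16 KB batches (the default)
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<byte[], byte[]>(props);
            producer.close();
        }
    }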

*The Issue:*
The issue occurs when no partition is specified and the producer
round-robins across all partitions. It leads to a situation where the main
thread blocks waiting for memory while the batches actually sent are very
small, causing a drastic drop in throughput and an increase in latencies.
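
To illustrate, we send records with no key and no explicit partition,
continuing the sketch above (the topic name and the ~1 KB payload byte[]
are hypothetical):

    // No partition and no key in the record, so the producer assigns
    // partitions itself, round-robin across all partitions of the topic.
    // (import org.apache.kafka.clients.producer.ProducerRecord)
    producer.send(new ProducerRecord<byte[], byte[]>("our-topic", payload));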

Memory for a batch is allocated when the batch is created, and the batch is
put in a queue maintained per TopicPartition.
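
In sketch form, the bookkeeping looks roughly like this (a simplification
of the new producer's RecordAccumulator, with a batch reduced to its
message count so it matches the dump further below):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    public class AccumulatorShapeSketch {
        static final int MESSAGES_PER_BATCH = 15; // 16 KB batch / 1 KB messages

        // one FIFO of batches per partition; a batch is just its message count
        final Map<Integer, Deque<Integer>> batches =
            new HashMap<Integer, Deque<Integer>>();

        void append(int partition) {
            Deque<Integer> queue = batches.get(partition);
            if (queue == null) {
                queue = new ArrayDeque<Integer>();
                batches.put(partition, queue);
            }
            if (queue.isEmpty() || queue.peekLast() >= MESSAGES_PER_BATCH)
                queue.addLast(1);                    // new batch: memory allocated here
            else
                queue.addLast(queue.pollLast() + 1); // room left: fill the tail batch
        }
    }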

This leads to the following situation. Assume that there is one topic with
60 partitions, distributed across 60 brokers. Assume that 40 of the brokers
are faster than the others and have drained their TopicPartition queues
completely, while the other 20 brokers still have many batches in their
queues.

Now the main producer thread starts allocating new batches. Due to the
round-robin assignment, it will try to allocate 60 batches for the next 60
messages. Suppose there is only memory available for 50 batches. The 40
fast partitions each get a new batch with 1 message in it, the next 10
batches are created at the tails of the slow partitions' queues, and then
the main thread goes to sleep waiting for memory.

At this point, the TopicPartition map looks like this. The key is the
partition number (there are 60 partitions) and the value is the queue of
batches, where each element of the list is the number of messages in that
batch (the batch size is 16 KB and the message size is 1 KB, so 15 messages
fit per batch). So the zeroth partition has 8 pending batches; its last
batch has 10 messages, and the other batches have 15.

{0=[15, 15, 15, 15, 15, 15, 15, 10], 1=[], 2=[1], 3=[15, 15, 15, 15, 15,
15, 15, 15, 15, 5], 4=[1], 5=[15, 15, 15, 15, 15, 15, 15, 14], 6=[1],
7=[15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 9], 8=[15, 15, 15, 15, 15, 15,
15, 15, 3], 9=[1], 10=[15, 15, 15, 15, 15, 15, 15, 10], 11=[1], 12=[1],
13=[15, 15, 15, 15, 15, 15, 15, 15, 15, 5], 14=[1], 15=[15, 15, 15, 15, 15,
15, 15, 14], 16=[1], 17=[15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 9],
18=[15, 15, 15, 15, 15, 15, 15, 15, 2], 19=[], 20=[15, 15, 15, 15, 15, 15,
15, 10], 21=[1], 22=[1], 23=[15, 15, 15, 15, 15, 15, 15, 15, 15, 5],
24=[1], 25=[15, 15, 15, 15, 15, 15, 15, 14], 26=[1], 27=[15, 15, 15, 15,
15, 15, 15, 15, 15, 15, 8], 28=[15, 15, 15, 15, 15, 15, 15, 15, 2], 29=[1],
30=[15, 15, 15, 15, 15, 15, 15, 10], 31=[1], 32=[1], 33=[15, 15, 15, 15,
15, 15, 15, 15, 15, 5], 34=[], 35=[15, 15, 15, 15, 15, 15, 15, 14], 36=[],
37=[15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 8], 38=[15, 15, 15, 15, 15, 15,
15, 15, 3], 39=[1], 40=[15, 15, 15, 15, 15, 15, 15, 10], 41=[1], 42=[1],
43=[15, 15, 15, 15, 15, 15, 15, 15, 15, 5], 44=[1], 45=[15, 15, 15, 15, 15,
15, 15, 14], 46=[1], 47=[15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 9],
48=[15, 15, 15, 15, 15, 15, 15, 15, 2], 49=[1], 50=[15, 15, 15, 15, 15, 15,
15, 10], 51=[1], 52=[], 53=[15, 15, 15, 15, 15, 15, 15, 15, 15, 5], 54=[],
55=[15, 15, 15, 15, 15, 15, 15, 14], 56=[1], 57=[15, 15, 15, 15, 15, 15,
15, 15, 15, 15, 8], 58=[15, 15, 15, 15, 15, 15, 15, 15, 3], 59=[1]}

The sender thread wakes up, detects that there are threads waiting for
memory, and immediately schedules a send for all the batches, including the
ones with only 1 message in them. In the example above, partitions 4, 9,
12, 39, etc. would be sent, even though there is only 1 message per batch.
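
For reference, the readiness test in RecordAccumulator.ready() boils down
to roughly this (paraphrased from the 0.8.2 sources; names are
approximate):

    // "exhausted" is true whenever any thread is blocked on the memory
    // pool, and it alone makes every batch sendable, including the
    // 1-message batches above.
    static boolean sendable(boolean batchFull, long waitedMs, long lingerMs,
                            boolean exhausted, boolean closed) {
        boolean expired = waitedMs >= lingerMs;
        return batchFull || expired || exhausted || closed;
    }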

This causes a vicious cycle: threads waiting for memory cause half-filled
batches to be sent, which reduces the throughput even more.

Furthermore, ironically, although there are other TopicPartitions with many
full batches, that memory cannot help the waiting threads, because each
batch has already been preallocated to a specific TopicPartition.

The main problems are:
1. Memory is preallocated for each batch.
2. Each batch is preallocated to a specific partition.
3. Half-filled batches are sent whenever threads are waiting for memory. In
a high-throughput system there will always be threads waiting for memory;
this should not cause the sender thread to send half-filled batches (see
the sketch after this list).
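
As a strawman for problem 3 (a hypothetical sketch, not the actual patch):
drop the memory-pressure condition, so that a batch is sent only when it is
full or its linger time has expired, and freed memory comes from full
batches instead of half-filled ones:

    // Hypothetical variant of the readiness test above: threads waiting
    // for memory no longer force half-filled batches out.
    static boolean sendable(boolean batchFull, long waitedMs, long lingerMs,
                            boolean closed) {
        boolean expired = waitedMs >= lingerMs;
        return batchFull || expired || closed;
    }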

I am working on a patch for this. Please let me know if the community is
interested in merging this upstream. I can create a KIP with more details
and start a discussion around an acceptable implementation.

Thanks
Sharath
