Hi All,

We are running a 10-node Kafka cluster with a theoretical peak handling rate of more than 2 million 1 KB messages per second, but we are hitting a bottleneck in the Kafka producer that prevents us from scaling beyond roughly 150K requests per second.
*Our Configuration*

We have a 10-node Kafka cluster on 0.8.2.1 with no compression, acks = -1, and a replication factor of 3. Each node has 20+ CPU cores, 40+ GB of RAM, six 3.7 TB hard disks, and a 150 MB/sec network link.

*The Issue:*

The issue occurs when no partition is specified and the producer round-robins among all partitions. The main thread ends up blocked waiting for memory while the batches being sent are very small, which causes a drastic drop in throughput and an increase in latencies.

Memory for a batch is allocated when a new batch is created, and the batch is put in a queue on a per-TopicPartition basis. This leads to the following situation. Assume there is one topic with 60 partitions, distributed across 60 brokers. Assume 40 brokers are faster than the others and have drained their TopicPartition queues completely, while the remaining 20 brokers still have many batches in their queues. The main producer thread now starts allocating new batches; due to the round-robin assignment, it will allocate 60 batches for the next 60 messages. Suppose there is only free memory for 50 batches. The 40 fast brokers each get 1 batch with 1 message in it, the next 10 batches are created at the tails of the slow brokers' queues, and then the main thread goes to sleep waiting for memory.

At this point, the TopicPartition map looks like the dump below. The key is the partition number (there are 60 partitions) and the value is the queue of batches, where each element of the list is the number of messages in that batch (batch size is 16 KB and message size is 1 KB, so 15 messages fit per batch). For example, partition 0 has 8 pending batches: the last batch has 10 messages and the others have 15 each.

{0=[15, 15, 15, 15, 15, 15, 15, 10], 1=[], 2=[1], 3=[15, 15, 15, 15, 15, 15, 15, 15, 15, 5], 4=[1], 5=[15, 15, 15, 15, 15, 15, 15, 14], 6=[1], 7=[15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 9], 8=[15, 15, 15, 15, 15, 15, 15, 15, 3], 9=[1],
10=[15, 15, 15, 15, 15, 15, 15, 10], 11=[1], 12=[1], 13=[15, 15, 15, 15, 15, 15, 15, 15, 15, 5], 14=[1], 15=[15, 15, 15, 15, 15, 15, 15, 14], 16=[1], 17=[15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 9], 18=[15, 15, 15, 15, 15, 15, 15, 15, 2], 19=[],
20=[15, 15, 15, 15, 15, 15, 15, 10], 21=[1], 22=[1], 23=[15, 15, 15, 15, 15, 15, 15, 15, 15, 5], 24=[1], 25=[15, 15, 15, 15, 15, 15, 15, 14], 26=[1], 27=[15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 8], 28=[15, 15, 15, 15, 15, 15, 15, 15, 2], 29=[1],
30=[15, 15, 15, 15, 15, 15, 15, 10], 31=[1], 32=[1], 33=[15, 15, 15, 15, 15, 15, 15, 15, 15, 5], 34=[], 35=[15, 15, 15, 15, 15, 15, 15, 14], 36=[], 37=[15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 8], 38=[15, 15, 15, 15, 15, 15, 15, 15, 3], 39=[1],
40=[15, 15, 15, 15, 15, 15, 15, 10], 41=[1], 42=[1], 43=[15, 15, 15, 15, 15, 15, 15, 15, 15, 5], 44=[1], 45=[15, 15, 15, 15, 15, 15, 15, 14], 46=[1], 47=[15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 9], 48=[15, 15, 15, 15, 15, 15, 15, 15, 2], 49=[1],
50=[15, 15, 15, 15, 15, 15, 15, 10], 51=[1], 52=[], 53=[15, 15, 15, 15, 15, 15, 15, 15, 15, 5], 54=[], 55=[15, 15, 15, 15, 15, 15, 15, 14], 56=[1], 57=[15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 8], 58=[15, 15, 15, 15, 15, 15, 15, 15, 3], 59=[1]}

The sender thread wakes up, detects that there are threads waiting for memory, and immediately schedules a send for all the batches, including the ones holding only 1 message. In the example above, partitions 4, 9, 12, 39, etc. would be sent even though each has just 1 message in its batch.
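To make the mechanism concrete, here is a small, self-contained toy model of the per-partition queues. This is NOT Kafka's RecordAccumulator, just a sketch under the same assumptions as the example above (60 partitions, 15 messages per batch, 50 batches of free memory; class and variable names are mine). It reproduces a state with the same shape as the dump: every drained partition pins a whole 16 KB batch for a single 1 KB message before the main thread blocks.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

public class HalfFilledBatchSketch {
    static final int PARTITIONS = 60;
    static final int MSGS_PER_BATCH = 15; // 16 KB batch / ~1 KB messages

    public static void main(String[] args) {
        // Starting state: every third partition sits on a slow broker and
        // still has a backlog of full batches; the rest have been drained.
        Map<Integer, Deque<Integer>> queues = new TreeMap<>();
        for (int p = 0; p < PARTITIONS; p++) {
            Deque<Integer> q = new ArrayDeque<>();
            if (p % 3 == 0)
                for (int b = 0; b < 8; b++) q.addLast(MSGS_PER_BATCH);
            queues.put(p, q);
        }

        int freeBatches = 50; // what is left of the buffer pool

        // The main thread round-robins one message per partition. Every
        // partition whose tail batch is full or missing costs a fresh
        // allocation, so a drained partition pins a whole batch for a
        // single message.
        for (int p = 0; p < PARTITIONS; p++) {
            Deque<Integer> q = queues.get(p);
            if (q.isEmpty() || q.peekLast() == MSGS_PER_BATCH) {
                if (freeBatches == 0) {
                    System.out.println("main thread blocked waiting for memory at partition " + p);
                    break; // in the real producer it would sleep waiting for the buffer pool
                }
                q.addLast(0);
                freeBatches--;
            }
            q.addLast(q.pollLast() + 1); // append one message to the tail batch
        }

        // The sender now notices a thread waiting on the pool and drains
        // every batch it finds, including the ones holding a single message.
        System.out.println(queues);
    }
}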
This causes a vicious cycle: threads waiting for memory lead to half-filled batches being sent, which reduces throughput even further. Ironically, even though other TopicPartitions have many full batches queued, the memory tied up in them cannot be put to use because each batch has already been preallocated to a specific TopicPartition.

The main problems are:
1. Memory is preallocated for each batch.
2. Each batch is preallocated to a fixed partition.
3. Half-filled batches are sent as soon as threads are waiting for memory. In a high-throughput system there will always be threads waiting for memory; that by itself should not cause the sender thread to flush half-filled batches.

I am working on a patch for this. Please let me know if the community is interested in merging it upstream. I can create a KIP with more details and start a discussion around an acceptable implementation.

Thanks,
Sharath
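P.S. In case it helps anyone reproduce this, here is a minimal producer sketch. The broker address and topic name are placeholders (the topic is assumed to have 60 partitions); everything else mirrors our setup. With buffer.memory sized to roughly 50 batches and keyless sends, the producer round-robins across all partitions, and the throughput collapse shows up as soon as any broker lags.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TinyBatchRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("acks", "-1");                        // wait for all replicas, as in our setup
        props.put("batch.size", "16384");               // 16 KB => ~15 one-KB messages per batch
        props.put("buffer.memory", String.valueOf(50 * 16384)); // pool of roughly 50 batches
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");

        byte[] payload = new byte[1024]; // 1 KB messages, as in our workload
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (long i = 0; i < 10_000_000L; i++) {
                // No key and no explicit partition: the producer round-robins
                // across all partitions, which is what triggers the issue.
                producer.send(new ProducerRecord<>("test-topic-60p", payload)); // placeholder topic
            }
        }
    }
}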