[ https://issues.apache.org/jira/browse/KAFKA-687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14077480#comment-14077480 ]
Joel Koshy commented on KAFKA-687:
----------------------------------

Short update on this: After the initial review comments, I was trying to make the allocation module more generic so we can reuse it in the new consumer. Furthermore, I was trying to get rid of the "symmetric" mode (which is for wildcards only and with identical subscriptions across all consumers) and make "roundrobin" more general.

The basic approach was to sort the consumer IDs based on a hash of the consumerID with the topic appended to it - effectively scrambling (in a consistent order) the list of consumer streams available for a given topic - and then doing a round-robin assignment across available partitions of the topic.

This did not actually work as well as expected. Here is the output of some simulations:

{code}
[2014-07-25 17:00:35,559] INFO Owned count summary for 6284 partitions across 63 consumer ids (9 consumers with 7 streams): min: 8.000000; max: 200.000000; avg: 99.746032; stddev: 58.871914; ideal: 99.746033 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:00:36,791] INFO Owned count summary for 6118 partitions across 42 consumer ids (7 consumers with 6 streams): min: 57.000000; max: 254.000000; avg: 145.666667; stddev: 60.954468; ideal: 145.666672 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:00:38,065] INFO Owned count summary for 10652 partitions across 88 consumer ids (11 consumers with 8 streams): min: 4.000000; max: 335.000000; avg: 169.079365; stddev: 101.093266; ideal: 121.045456 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:02:07,198] INFO Owned count summary for 10839 partitions across 200 consumer ids (20 consumers with 10 streams): min: 3.000000; max: 330.000000; avg: 172.047619; stddev: 99.267223; ideal: 54.195000 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:24:35,676] INFO Owned count summary for 6439 partitions across 12 consumer ids (2 consumers with 6 streams): min: 445.000000; max: 626.000000; avg: 536.583333; stddev: 58.445714; ideal: 536.583313 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:24:36,787] INFO Owned count summary for 11777 partitions across 63 consumer ids (7 consumers with 9 streams): min: 5.000000; max: 369.000000; avg: 186.936508; stddev: 113.972531; ideal: 186.936508 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:25:20,108] INFO Owned count summary for 10488 partitions across 144 consumer ids (18 consumers with 8 streams): min: 8.000000; max: 335.000000; avg: 166.476190; stddev: 101.988433; ideal: 72.833336 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:33:52,532] INFO Owned count summary for 5783 partitions across 25 consumer ids (5 consumers with 5 streams): min: 141.000000; max: 336.000000; avg: 231.320000; stddev: 69.337171; ideal: 231.320007 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:33:53,268] INFO Owned count summary for 6181 partitions across 7 consumer ids (7 consumers with 1 streams): min: 801.000000; max: 980.000000; avg: 883.000000; stddev: 59.654561; ideal: 883.000000 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:33:56,124] INFO Owned count summary for 6475 partitions across 32 consumer ids (4 consumers with 8 streams): min: 105.000000; max: 299.000000; avg: 202.343750; stddev: 62.999544; ideal: 202.343750 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:35:10,370] INFO Owned count summary for 7739 partitions across 162 consumer ids (18 consumers with 9 streams): min: 6.000000; max: 239.000000; avg: 122.841270; stddev: 69.379788; ideal: 47.771606 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:35:11,834] INFO Owned count summary for 9070 partitions across 14 consumer ids (2 consumers with 7 streams): min: 520.000000; max: 774.000000; avg: 647.857143; stddev: 84.860843; ideal: 647.857117 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:36:37,935] INFO Owned count summary for 10933 partitions across 85 consumer ids (17 consumers with 5 streams): min: 5.000000; max: 350.000000; avg: 173.539683; stddev: 105.619192; ideal: 128.623535 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:36:40,641] INFO Owned count summary for 8665 partitions across 64 consumer ids (8 consumers with 8 streams): min: 4.000000; max: 267.000000; avg: 137.539683; stddev: 82.121434; ideal: 135.390625 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:36:42,612] INFO Owned count summary for 8432 partitions across 48 consumer ids (6 consumers with 8 streams): min: 68.000000; max: 328.000000; avg: 175.666667; stddev: 78.829828; ideal: 175.666672 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:36:44,010] INFO Owned count summary for 6899 partitions across 110 consumer ids (11 consumers with 10 streams): min: 2.000000; max: 204.000000; avg: 109.507937; stddev: 61.068146; ideal: 62.718182 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:37:57,128] INFO Owned count summary for 8622 partitions across 48 consumer ids (6 consumers with 8 streams): min: 61.000000; max: 324.000000; avg: 179.625000; stddev: 76.374114; ideal: 179.625000 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:37:58,873] INFO Owned count summary for 6288 partitions across 84 consumer ids (12 consumers with 7 streams): min: 1.000000; max: 200.000000; avg: 99.809524; stddev: 62.132236; ideal: 74.857140 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:37:59,913] INFO Owned count summary for 6467 partitions across 91 consumer ids (13 consumers with 7 streams): min: 5.000000; max: 200.000000; avg: 102.650794; stddev: 59.990096; ideal: 71.065933 (unit.kafka.consumer.PartitionAllocatorTest:68)
[2014-07-25 17:38:01,621] INFO Owned count summary for 6311 partitions across 8 consumer ids (2 consumers with 4 streams): min: 716.000000; max: 869.000000; avg: 788.875000; stddev: 53.799728; ideal: 788.875000 (unit.kafka.consumer.PartitionAllocatorTest:68)
{code}
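
For readers who want to experiment with the approach described before the simulation output (order the consumer streams by a hash of the stream ID with the topic appended, then assign that topic's partitions round-robin), here is a rough Python sketch. It is not the code from the attached patches. The MD5 hash, the function names, the shape of the inputs, and the choice to restart the round-robin at the first stream for every topic are all assumptions made for illustration.

```python
import hashlib


def scrambled_streams(stream_ids, topic):
    """Order stream IDs by a hash of (stream ID + topic).

    The order is consistent for a given topic but differs between topics.
    MD5 is an arbitrary choice here (assumption); any stable hash would do.
    """
    def sort_key(stream_id):
        digest = hashlib.md5((stream_id + topic).encode("utf-8")).hexdigest()
        return (digest, stream_id)  # stream ID breaks ties deterministically

    return sorted(stream_ids, key=sort_key)


def assign_round_robin(subscriptions, partitions_per_topic):
    """Assign every (topic, partition) to exactly one subscribed stream.

    subscriptions: dict of stream ID -> set of subscribed topics (hypothetical shape)
    partitions_per_topic: dict of topic -> number of partitions (hypothetical shape)
    Returns a dict of stream ID -> list of (topic, partition) tuples.
    """
    owned = {}
    for topic in sorted(partitions_per_topic):
        candidates = [s for s, topics in subscriptions.items() if topic in topics]
        if not candidates:
            continue  # nobody subscribed to this topic
        streams = scrambled_streams(candidates, topic)
        # Assumption: the round-robin restarts at index 0 for each topic.
        for partition in range(partitions_per_topic[topic]):
            owner = streams[partition % len(streams)]
            owned.setdefault(owner, []).append((topic, partition))
    return owned


def owned_count_summary(owned, stream_ids):
    """Return (min, max, avg) of owned-partition counts over all streams."""
    counts = [len(owned.get(s, [])) for s in stream_ids]
    return min(counts), max(counts), sum(counts) / len(counts)


if __name__ == "__main__":
    # Small made-up example: 2 consumers with 2 streams each, identical subscriptions.
    topics = {"topic-a": 3, "topic-b": 5, "topic-c": 1}
    subs = {f"consumer{c}-{s}": set(topics) for c in range(2) for s in range(2)}
    result = assign_round_robin(subs, topics)
    print(owned_count_summary(result, list(subs)))
```

Because the stream order is decided independently per topic, nothing in this sketch balances the totals across topics, which is one plausible reading of why the owned counts in the simulations spread so widely.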
Will think more about it and reconsider keeping the specialized case (which is actually the common case - i.e., consume with wildcards and identical subscriptions).

> Rebalance algorithm should consider partitions from all topics
> --------------------------------------------------------------
>
>                 Key: KAFKA-687
>                 URL: https://issues.apache.org/jira/browse/KAFKA-687
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.9.0
>            Reporter: Pablo Barrera
>            Assignee: Joel Koshy
>         Attachments: KAFKA-687.patch, KAFKA-687_2014-07-18_15:55:15.patch
>
>
> The current rebalance step, as stated in the original Kafka paper [1], splits
> the partitions per topic between all the consumers. So if you have 100 topics
> with 2 partitions each and 10 consumers only two consumers will be used. That
> is, for each topic all partitions will be listed and shared between the
> consumers in the consumer group in order (not randomly).
> If the consumer group is reading from several topics at the same time it
> makes sense to split all the partitions from all topics between all the
> consumers. Following the example, we will have 200 partitions in total, 20 per
> consumer, using the 10 consumers.
> The load per topic could be different and the division should consider this.
> However even a random division should be better than the current algorithm
> while reading from several topics and should not harm reading from a few topics
> with several partitions.

--
This message was sent by Atlassian JIRA
(v6.2#6252)