[ 
https://issues.apache.org/jira/browse/KAFKA-687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14077480#comment-14077480
 ] 

Joel Koshy commented on KAFKA-687:
----------------------------------

Short update on this:

After the initial review comments, I tried to make the allocation module more
generic so that we can reuse it in the new consumer. I also tried to get rid of
the "symmetric" mode (which applies only to wildcard subscriptions that are
identical across all consumers) and make "roundrobin" more general. The basic
approach was to sort the consumer IDs by a hash of the consumer ID with the
topic appended to it, effectively scrambling (in a consistent order) the list
of consumer streams available for a given topic, and then do a round-robin
assignment across the available partitions of the topic. This did not work as
well as expected: the owned partition counts are heavily skewed (in the first
run below, min 8 and max 200 against an ideal of about 99.7). Here is the
output of some simulations:
{code}
    [2014-07-25 17:00:35,559] INFO Owned count summary for 6284 partitions 
across 63 consumer ids (9 consumers with 7 streams): min: 8.000000; max: 
200.000000; avg: 99.746032; stddev: 58.871914; ideal: 99.746033 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:00:36,791] INFO Owned count summary for 6118 partitions 
across 42 consumer ids (7 consumers with 6 streams): min: 57.000000; max: 
254.000000; avg: 145.666667; stddev: 60.954468; ideal: 145.666672 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:00:38,065] INFO Owned count summary for 10652 partitions 
across 88 consumer ids (11 consumers with 8 streams): min: 4.000000; max: 
335.000000; avg: 169.079365; stddev: 101.093266; ideal: 121.045456 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:02:07,198] INFO Owned count summary for 10839 partitions 
across 200 consumer ids (20 consumers with 10 streams): min: 3.000000; max: 
330.000000; avg: 172.047619; stddev: 99.267223; ideal: 54.195000 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:24:35,676] INFO Owned count summary for 6439 partitions 
across 12 consumer ids (2 consumers with 6 streams): min: 445.000000; max: 
626.000000; avg: 536.583333; stddev: 58.445714; ideal: 536.583313 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:24:36,787] INFO Owned count summary for 11777 partitions 
across 63 consumer ids (7 consumers with 9 streams): min: 5.000000; max: 
369.000000; avg: 186.936508; stddev: 113.972531; ideal: 186.936508 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:25:20,108] INFO Owned count summary for 10488 partitions 
across 144 consumer ids (18 consumers with 8 streams): min: 8.000000; max: 
335.000000; avg: 166.476190; stddev: 101.988433; ideal: 72.833336 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:33:52,532] INFO Owned count summary for 5783 partitions 
across 25 consumer ids (5 consumers with 5 streams): min: 141.000000; max: 
336.000000; avg: 231.320000; stddev: 69.337171; ideal: 231.320007 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:33:53,268] INFO Owned count summary for 6181 partitions 
across 7 consumer ids (7 consumers with 1 streams): min: 801.000000; max: 
980.000000; avg: 883.000000; stddev: 59.654561; ideal: 883.000000 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:33:56,124] INFO Owned count summary for 6475 partitions 
across 32 consumer ids (4 consumers with 8 streams): min: 105.000000; max: 
299.000000; avg: 202.343750; stddev: 62.999544; ideal: 202.343750 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:35:10,370] INFO Owned count summary for 7739 partitions 
across 162 consumer ids (18 consumers with 9 streams): min: 6.000000; max: 
239.000000; avg: 122.841270; stddev: 69.379788; ideal: 47.771606 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:35:11,834] INFO Owned count summary for 9070 partitions 
across 14 consumer ids (2 consumers with 7 streams): min: 520.000000; max: 
774.000000; avg: 647.857143; stddev: 84.860843; ideal: 647.857117 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:36:37,935] INFO Owned count summary for 10933 partitions 
across 85 consumer ids (17 consumers with 5 streams): min: 5.000000; max: 
350.000000; avg: 173.539683; stddev: 105.619192; ideal: 128.623535 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:36:40,641] INFO Owned count summary for 8665 partitions 
across 64 consumer ids (8 consumers with 8 streams): min: 4.000000; max: 
267.000000; avg: 137.539683; stddev: 82.121434; ideal: 135.390625 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:36:42,612] INFO Owned count summary for 8432 partitions 
across 48 consumer ids (6 consumers with 8 streams): min: 68.000000; max: 
328.000000; avg: 175.666667; stddev: 78.829828; ideal: 175.666672 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:36:44,010] INFO Owned count summary for 6899 partitions 
across 110 consumer ids (11 consumers with 10 streams): min: 2.000000; max: 
204.000000; avg: 109.507937; stddev: 61.068146; ideal: 62.718182 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:37:57,128] INFO Owned count summary for 8622 partitions 
across 48 consumer ids (6 consumers with 8 streams): min: 61.000000; max: 
324.000000; avg: 179.625000; stddev: 76.374114; ideal: 179.625000 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:37:58,873] INFO Owned count summary for 6288 partitions 
across 84 consumer ids (12 consumers with 7 streams): min: 1.000000; max: 
200.000000; avg: 99.809524; stddev: 62.132236; ideal: 74.857140 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:37:59,913] INFO Owned count summary for 6467 partitions 
across 91 consumer ids (13 consumers with 7 streams): min: 5.000000; max: 
200.000000; avg: 102.650794; stddev: 59.990096; ideal: 71.065933 
(unit.kafka.consumer.PartitionAllocatorTest:68)
    [2014-07-25 17:38:01,621] INFO Owned count summary for 6311 partitions 
across 8 consumer ids (2 consumers with 4 streams): min: 716.000000; max: 
869.000000; avg: 788.875000; stddev: 53.799728; ideal: 788.875000 
(unit.kafka.consumer.PartitionAllocatorTest:68)
{code}
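
For reference, the scrambling approach described above can be sketched roughly
as follows. This is only an illustrative Python sketch, not the code in the
patch: the function name, the stream ID format, and the use of MD5 as the hash
are all assumptions made for the example.

```python
import hashlib
from collections import defaultdict


def scrambled_round_robin(partition_counts, subscriptions):
    """Assign partitions topic by topic (hypothetical helper, not Kafka code).

    partition_counts: dict of topic -> number of partitions
    subscriptions:    dict of consumer stream ID -> set of subscribed topics
    Returns a dict of consumer stream ID -> list of (topic, partition).
    """
    owned = defaultdict(list)
    for topic in sorted(partition_counts):
        streams = [s for s, topics in subscriptions.items() if topic in topics]
        if not streams:
            continue  # nobody subscribes to this topic
        # Scramble the streams in a consistent, topic-specific order by
        # sorting on a hash of "<stream ID><topic>". MD5 is an assumption;
        # the stream ID is a tie-breaker so the order is fully deterministic.
        streams.sort(
            key=lambda s: (hashlib.md5((s + topic).encode("utf-8")).digest(), s)
        )
        # Round-robin over the partitions of this topic only.
        for partition in range(partition_counts[topic]):
            owned[streams[partition % len(streams)]].append((topic, partition))
    return owned


# Example: two consumers with two streams each, identical subscriptions.
partition_counts = {"topic-a": 5, "topic-b": 3}
subscriptions = {
    "c1-0": {"topic-a", "topic-b"},
    "c1-1": {"topic-a", "topic-b"},
    "c2-0": {"topic-a", "topic-b"},
    "c2-1": {"topic-a", "topic-b"},
}
result = scrambled_round_robin(partition_counts, subscriptions)
for stream, parts in sorted(result.items()):
    print(stream, len(parts))
```

Note that in this sketch the round-robin restarts for every topic, so nothing
balances the totals across topics. That is one plausible (unverified) source of
skew of the kind seen in the simulations.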

I will think more about this and reconsider keeping the specialized case (which
is actually the common case, i.e., consuming with wildcards and identical
subscriptions).

> Rebalance algorithm should consider partitions from all topics
> --------------------------------------------------------------
>
>                 Key: KAFKA-687
>                 URL: https://issues.apache.org/jira/browse/KAFKA-687
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.9.0
>            Reporter: Pablo Barrera
>            Assignee: Joel Koshy
>         Attachments: KAFKA-687.patch, KAFKA-687_2014-07-18_15:55:15.patch
>
>
> The current rebalance step, as stated in the original Kafka paper [1], splits 
> the partitions per topic between all the consumers. So if you have 100 topics 
> with 2 partitions each and 10 consumers, only two consumers will be used. That 
> is, for each topic all partitions will be listed and shared between the 
> consumers in the consumer group in order (not randomly).
> If the consumer group is reading from several topics at the same time, it 
> makes sense to split all the partitions from all topics between all the 
> consumers. Following the example, we will have 200 partitions in total, 20 per 
> consumer, using the 10 consumers.
> The load per topic could be different and the division should consider this. 
> However, even a random division should be better than the current algorithm 
> while reading from several topics and should not harm reading from a few 
> topics with several partitions.
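
The arithmetic in the description above can be checked with a small sketch.
This is a simplified model in Python, not Kafka's rebalance code: it only
counts how many partitions each consumer would own, and the function and
consumer names are made up for illustration.

```python
def per_topic_counts(num_topics, partitions_per_topic, consumers):
    """Count owned partitions when each topic is split independently, in order."""
    owned = {c: 0 for c in consumers}
    for _ in range(num_topics):
        # Every topic starts again from the first consumer in the list.
        for partition in range(partitions_per_topic):
            owned[consumers[partition % len(consumers)]] += 1
    return owned


def global_counts(num_topics, partitions_per_topic, consumers):
    """Count owned partitions when all partitions form one combined list."""
    owned = {c: 0 for c in consumers}
    total = num_topics * partitions_per_topic
    for i in range(total):
        owned[consumers[i % len(consumers)]] += 1
    return owned


consumers = ["consumer-%d" % i for i in range(10)]
# 100 topics with 2 partitions each, 10 consumers.
print(per_topic_counts(100, 2, consumers))  # consumer-0 and consumer-1 own 100 each, the rest 0
print(global_counts(100, 2, consumers))     # every consumer owns 20
```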



--
This message was sent by Atlassian JIRA
(v6.2#6252)
