[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15313828#comment-15313828
 ] 

Yuto Kawamura commented on KAFKA-3775:
--------------------------------------

Thanks for the feedback [~BigAndy].

> With the proposed design some partitions would remain without a consumer. 
> This seems like a fundamental switch away from Kafka's current model, and a 
> risky one IMHO.

Some partitions would remain without a consumer *only if the number of living 
instances becomes lower than {{num of partitions / max.tasks.assigned}}*.
Let's say you have 100 partitions and launch 50 KafkaStreams instances with 
{{max.tasks.assigned=5}}. When all 50 instances are started, each instance gets 
2 partitions assigned, which is the desired distribution.
Then what happens when an instance fails? The 2 partitions held by the dead 
instance will be reassigned to the remaining instances without any problem, as 
the other instances still have plenty of headroom under {{max.tasks.assigned}}.
If 31 or more instances are dead at that moment (leaving fewer than the 20 
instances needed to cover 100 partitions at 5 tasks each), then yes, some 
partitions will remain unassigned. But that case is out of consideration, 
because the value of {{max.tasks.assigned}} was determined based on the 
available system resources (CPU, memory, network bandwidth), which means those 
unassigned partitions could never be processed normally even if they were 
reassigned to the living instances, since the hardware resources are limited.
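
To make the arithmetic concrete, here is a minimal sketch (plain Java, not the 
actual StreamPartitionAssignor logic) of when partitions would go unassigned 
under the proposed cap; the numbers match the example above:

{code:java}
public class AssignmentCapacitySketch {
    public static void main(String[] args) {
        final int numPartitions = 100;
        final int maxTasksAssigned = 5; // proposed per-instance cap from this ticket

        // Total capacity is liveInstances * maxTasksAssigned; partitions beyond
        // that capacity stay unassigned until more instances join the group.
        for (int liveInstances : new int[] {50, 20, 19}) {
            int capacity = liveInstances * maxTasksAssigned;
            int unassigned = Math.max(0, numPartitions - capacity);
            System.out.printf("live=%d capacity=%d unassigned=%d%n",
                    liveInstances, capacity, unassigned);
        }
        // live=50 capacity=250 unassigned=0  -> each instance holds 2 partitions
        // live=20 capacity=100 unassigned=0  -> every instance is at the cap
        // live=19 capacity=95  unassigned=5  -> 31 of the 50 instances have died
    }
}
{code}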

> This seems like a fundamental switch away from Kafka's current model, and a 
> risky one IMHO.

BTW, may I ask what you meant by "Kafka's current model", and more concretely 
what risk you expect? (That users won't notice the existence of unassigned 
partitions?)

> Could you also elaborate on why settings such as 'max.poll.records' don't 
> help stop your initial instance going pop? Maybe there are other alternative 
> solutions here...

Because even if I set {{max.poll.records}} lower, it only reduces the number of 
records fetched by a single Fetch request; the number of Fetch requests 
increases instead. That means the total throughput wouldn't change, which still 
leads to traffic bursting.
At the same time, it doesn't make sense to me to tune {{max.poll.records}} on 
the assumption that a single instance gets all partitions assigned, as in 
practice I could set that value much higher once other instances join the group 
and the partitions are evenly distributed.
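
For reference, here is a minimal sketch of how such consumer-side properties 
are passed through a Kafka Streams config (the application id and broker 
address are placeholders). Note that {{max.poll.records}} only caps how many 
records a single poll returns; it does not cap how many partitions the instance 
owns, which is the actual problem here:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsPropsSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        // Forwarded to the embedded consumer: caps records returned per poll(),
        // but the instance still fetches from every partition assigned to it.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        return props;
    }
}
{code}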


> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of a single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions 
> and heavy message traffic, it is a problem that we don't have a way of 
> throttling the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app consuming a topic with more than 
> 10MB/sec of traffic on each partition, we saw all partitions get assigned to 
> the first instance, and soon the app died from OOM.
> I know there are some conceivable workarounds here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library, not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, 
> so some users might want the option to start a first single instance and 
> check whether it works as expected with a smaller number of partitions (I 
> want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works, but it still has two problems IMO:
>   - It still leads to a traffic explosion under high-throughput processing, 
> as the instance accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that 
> users don't have a way to control the maximum number of "partitions" assigned 
> to a single shard (an instance of KafkaStreams here). Users should be allowed 
> to provide the maximum number of partitions that a single instance (or host) 
> can feasibly process.
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of 
> partition) assigned to a processId (the notion of a single KafkaStreams 
> instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate incomplete assignment. That is, Kafka Streams should continue 
> working on the assigned subset of partitions even if some partitions are left 
> unassigned, in order to satisfy this: "users may want the option to start a 
> first single instance and check whether it works as expected with a smaller 
> number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will 
> continue polishing it.
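
(For illustration only: a sketch of how the proposed setting might be supplied 
if this patch lands. {{max.tasks.assigned}} is the name proposed in this 
ticket, not a released config, and the other values are placeholders.)

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class MaxTasksAssignedSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        // Proposed in this ticket (not a released config): cap how many tasks
        // the StreamPartitionAssignor may hand to this single process.
        props.put("max.tasks.assigned", 5);
        return props;
    }
}
{code}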



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
