[
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yuto Kawamura updated KAFKA-3775:
---------------------------------
Resolution: Won't Fix
Status: Resolved (was: Patch Available)
> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.0
> Reporter: Yuto Kawamura
> Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine with a
> single KafkaStreams instance, that instance gets all partitions of the
> target topic assigned.
> As we're using it to process topics that have a huge number of partitions
> and heavy message traffic, it is a problem that we have no way of throttling
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app consuming a topic with more
> than 10MB/sec of traffic per partition, we saw all partitions get assigned
> to the first instance, and the app soon died of OOM.
> I know there are some workarounds worth considering here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
> => Maybe works, but as Kafka Streams is a library and not an execution
> framework, there's no predefined procedure for starting Kafka Streams apps,
> so some users might want to start a single instance first and check whether
> it works as expected with a smaller number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}},
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap
> pressure.
> => Maybe works, but there are still two problems IMO:
> - It still leads to a traffic explosion under high-throughput processing, as
> the instance accepts all incoming messages from hundreds of partitions.
> - In the first place, by distributed system principles, it's weird that
> users don't have a way to control the maximum number of "partitions"
> assigned to a single shard (an instance of KafkaStreams here). Users should
> be allowed to specify the maximum number of partitions that a single
> instance (or host) can be expected to process.
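
For illustration, the config-based workaround above could look roughly like
the sketch below. The three keys are the ones named in the description; the
values, the class name ThrottledStreamsConfigSketch and the helper
worstCaseFetchBytes are assumptions made up for this example. The helper is
only a rough per-fetch upper bound that ignores every other source of heap
usage. It is there to show why the total still grows with the number of
assigned partitions.

```java
import java.util.Properties;

// Hypothetical sketch: not part of Kafka, values are illustrative only.
final class ThrottledStreamsConfigSketch {

    // Builds overrides for the three settings named in the issue.
    static Properties lowMemoryOverrides() {
        Properties props = new Properties();
        // Kafka Streams setting: records buffered per partition.
        props.setProperty("buffered.records.per.partition", "100");
        // Consumer setting: max bytes fetched per partition (256 KiB here).
        props.setProperty("max.partition.fetch.bytes", String.valueOf(256 * 1024));
        // Consumer setting: max records returned by a single poll().
        props.setProperty("max.poll.records", "100");
        return props;
    }

    // Rough estimate (an assumption of this sketch): one fetch can carry up
    // to maxPartitionFetchBytes for each assigned partition.
    static long worstCaseFetchBytes(int assignedPartitions, long maxPartitionFetchBytes) {
        return assignedPartitions * maxPartitionFetchBytes;
    }

    public static void main(String[] args) {
        Properties props = lowMemoryOverrides();
        props.forEach((k, v) -> System.out.println(k + "=" + v));
        long perPartition = Long.parseLong(props.getProperty("max.partition.fetch.bytes"));
        // The per-partition limit is fixed, but the total scales with partitions.
        System.out.println("300 partitions: " + worstCaseFetchBytes(300, perPartition) + " bytes");
    }
}
```

Even with small per-partition limits, the estimate is linear in the number of
partitions, which is the first of the two problems listed above.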
> Here, I'd like to introduce a new configuration parameter
> {{max.tasks.assigned}}, which limits the number of tasks (a notion
> corresponding to a partition) assigned to a processId (which represents a
> single KafkaStreams instance).
> At the same time, we need to change StreamPartitionAssignor (TaskAssignor) to
> tolerate an incomplete assignment. That is, Kafka Streams should continue
> working on a subset of the partitions even if some partitions are left
> unassigned, in order to satisfy this: "users may want to start a single
> instance first and check whether it works as expected with a smaller number
> of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will
> continue refining it.
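
To make the proposal concrete, here is a minimal sketch of a capped
assignment that tolerates leftovers. It is not the POC mentioned above and
not the real StreamPartitionAssignor. The class CappedTaskAssignmentSketch,
the Result type and the round-robin strategy are all assumptions chosen for
illustration. Tasks are plain integers, instances are plain strings standing
in for processIds, and maxTasksAssigned stands in for the proposed
{{max.tasks.assigned}}, which is a parameter suggested in this issue and not
an existing Kafka Streams setting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: illustrates the idea only, not Kafka's assignor.
final class CappedTaskAssignmentSketch {

    // Outcome of one assignment round: tasks per instance plus leftovers.
    static final class Result {
        final Map<String, List<Integer>> assigned = new LinkedHashMap<>();
        final List<Integer> unassigned = new ArrayList<>();
    }

    // Distributes tasks 0..taskCount-1 round-robin over the instances, never
    // giving any instance more than maxTasksAssigned tasks. Tasks that do not
    // fit anywhere are reported as unassigned instead of failing.
    static Result assign(List<String> instances, int taskCount, int maxTasksAssigned) {
        if (maxTasksAssigned < 1) {
            throw new IllegalArgumentException("maxTasksAssigned must be >= 1");
        }
        Result result = new Result();
        for (String instance : instances) {
            result.assigned.put(instance, new ArrayList<>());
        }
        int next = 0; // index of the instance to try next
        for (int task = 0; task < taskCount; task++) {
            boolean placed = false;
            // Try each instance at most once, starting where we left off.
            for (int tries = 0; tries < instances.size() && !placed; tries++) {
                List<Integer> tasks = result.assigned.get(instances.get(next));
                next = (next + 1) % instances.size();
                if (tasks.size() < maxTasksAssigned) {
                    tasks.add(task);
                    placed = true;
                }
            }
            if (!placed) {
                result.unassigned.add(task); // incomplete assignment is tolerated
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // One instance, five tasks, cap of two: three tasks stay unassigned.
        Result r = assign(Arrays.asList("instance-1"), 5, 2);
        System.out.println("assigned=" + r.assigned + " unassigned=" + r.unassigned);
    }
}
```

In this sketch, starting a second instance and re-running assign would pick
up some of the leftover tasks, which matches the "start one instance first,
then scale out" workflow described in the issue.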
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)