[
https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15547089#comment-15547089
]
Guozhang Wang commented on KAFKA-3559:
--------------------------------------
Here are some more thoughts on this issue and how we can improve the situation:
Currently with Kafka Streams each rebalance is expensive, even if it is only
"partial" (i.e. only a few of the non-leader members in the consumer group have
decided to re-join, which does not trigger a full rebalance but only causes
the coordinator to send back the assignment again), since
{{onPartitionRevoked}} and {{onPartitionAssigned}} are triggered anyway, closing
and (re-)constructing the tasks. For example, on my local (very small)
laptop, with a complex topology containing 10+ stores and 15+ internal topics
and 3 threads, one rebalance could take up to 20 seconds.
On the other hand, we want to close the tasks in {{onPartitionRevoked}} before
the synchronization barrier only because threads may hold some file locks
related to these tasks. And since tasks are all committed right before closing,
I think it is safe to delay the destruction of tasks so that we may be able to
save the time of closing / reconstructing such tasks. More specifically:
1. In {{onPartitionRevoked}}, instead of closing the tasks, we only need to
commit the tasks and "pause" them by calling their topology processor's newly
added {{flush}} calls and releasing the corresponding file locks of the tasks.
In fact, the pause itself is automatic, since we do not process any messages
during the rebalance anyway.
2. Then in {{onPartitionAssigned}}, we can check whether any tasks have
really been migrated out of the thread. Those tasks are closed (note that
since they were already committed in {{onPartitionRevoked}}, closing them
only involves calling the topology processor's {{close}} function and
closing the state stores); all other tasks simply "resume" processing.
We need to think through some minor issues such as the above-mentioned file
locks for persistent state stores, how clean-up will work without introducing
deadlocks, etc. But I think in general this solution should work.
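The two-step proposal above can be illustrated with a minimal sketch. It does not use the Kafka Streams classes: {{Task}}, {{TaskManager}}, and all of their fields and methods are hypothetical stand-ins, partitions are reduced to plain integers, and file-lock handling is left out entirely. The callback names follow the spelling used in this comment (the consumer's rebalance listener spells them {{onPartitionsRevoked}} and {{onPartitionsAssigned}}).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SuspendResumeSketch {

    /** Hypothetical stand-in for a stream task; not the real Kafka Streams class. */
    static class Task {
        final int partition;
        boolean suspended = false;
        boolean closed = false;
        int commits = 0;

        Task(int partition) { this.partition = partition; }

        void commit()  { commits++; }         // would flush state and commit offsets
        void suspend() { suspended = true; }  // would release file locks
        void resume()  { suspended = false; } // would re-acquire file locks
        void close()   { closed = true; }     // would close processors and stores
    }

    /** Hypothetical holder of the tasks owned by one stream thread. */
    static class TaskManager {
        final Map<Integer, Task> active = new HashMap<>();
        final Map<Integer, Task> suspended = new HashMap<>();
        int created = 0;

        /** Step 1: commit and pause every task instead of closing it. */
        void onPartitionRevoked() {
            for (Task task : active.values()) {
                task.commit();
                task.suspend();
                suspended.put(task.partition, task);
            }
            active.clear();
        }

        /** Step 2: resume tasks that stayed, close the ones that migrated away. */
        void onPartitionAssigned(Set<Integer> assigned) {
            for (Integer partition : assigned) {
                Task task = suspended.remove(partition);
                if (task != null) {
                    task.resume();              // cheap path: no reconstruction
                } else {
                    task = new Task(partition); // expensive path: only for new tasks
                    created++;
                }
                active.put(partition, task);
            }
            // Whatever is still suspended was assigned to another thread.
            for (Task task : suspended.values()) {
                task.close();
            }
            suspended.clear();
        }
    }

    public static void main(String[] args) {
        TaskManager manager = new TaskManager();
        manager.onPartitionAssigned(new HashSet<>(Arrays.asList(0, 1, 2)));
        manager.onPartitionRevoked();
        manager.onPartitionAssigned(new HashSet<>(Arrays.asList(0, 2)));
        System.out.println("tasks created in total: " + manager.created);
    }
}
```

In this sketch a rebalance that returns the same partitions creates no new tasks, which is where the time saving in the proposal would come from; the open questions about file locks and clean-up are not addressed by it.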
> Task creation time taking too long in rebalance callback
> --------------------------------------------------------
>
> Key: KAFKA-3559
> URL: https://issues.apache.org/jira/browse/KAFKA-3559
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Eno Thereska
> Labels: architecture
> Fix For: 0.10.2.0
>
>
> Currently in Kafka Streams, we create stream tasks upon getting newly
> assigned partitions in the rebalance callback function
> {{onPartitionAssigned}}, which involves initializing the processor state
> stores as well (including opening RocksDB, restoring the store from the
> changelog, etc., which takes time).
> With a large number of state stores, the initialization itself could
> take tens of seconds, which is usually longer than the consumer session
> timeout. As a result, by the time the callback completes, the consumer has
> already been treated as failed by the coordinator, which triggers another
> rebalance.
> We need to consider whether we can optimize the initialization process, or
> move it out of the callback function and, while initializing the stores one
> by one, use the poll call to send heartbeats to avoid being kicked out by the
> coordinator.
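The last idea in the description, interleaving store initialization with heartbeats, can be sketched roughly as follows. This is only an illustration under stated assumptions: {{Store}}, {{Heartbeat}}, and {{initializeAll}} are hypothetical names, and {{Heartbeat.poll()}} merely stands in for whatever consumer call would keep the session alive; no real Kafka API is used here.

```java
import java.util.ArrayList;
import java.util.List;

public class IncrementalInitSketch {

    /** Hypothetical state store whose init() may be slow (opening RocksDB, restoring). */
    interface Store {
        void init();
    }

    /** Hypothetical hook standing in for a consumer poll that sends a heartbeat. */
    interface Heartbeat {
        void poll();
    }

    /**
     * Initializes the stores one by one and triggers a heartbeat after each,
     * so that no single gap between heartbeats spans all stores at once.
     * Returns the number of stores initialized.
     */
    static int initializeAll(List<Store> stores, Heartbeat heartbeat) {
        int initialized = 0;
        for (Store store : stores) {
            store.init();
            initialized++;
            heartbeat.poll();
        }
        return initialized;
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        List<Store> stores = new ArrayList<>();
        stores.add(() -> events.add("init:store-a"));
        stores.add(() -> events.add("init:store-b"));
        int count = initializeAll(stores, () -> events.add("heartbeat"));
        System.out.println(count + " stores initialized, events: " + events);
    }
}
```

The sketch only bounds the gap between heartbeats by the slowest single store; a store whose own initialization exceeds the session timeout would still need a different approach.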