[ https://issues.apache.org/jira/browse/KAFKA-14382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17679346#comment-17679346 ]

Guozhang Wang commented on KAFKA-14382:
---------------------------------------

Thanks for catching this bug [~ableegoldman]! I'm late reviewing the PR, but I 
still agree with your description of the general case. And I think we already 
have the tools to address the fundamental issues:

1) Once the consumer threading refactoring is done (which is why I also added 
the corresponding label), the rebalance would be handled entirely by the 
background thread, without relying on Streams to call `poll` in time at all. 
To validate that the caller thread is still alive, we would still need to call 
`poll` within max.poll.interval.ms, but nothing else like the rebalance-related 
timeouts would matter (a rough sketch of this decoupling follows below).

2) Once restoration (a heavy IO operation) is moved to a separate thread, the 
likelihood of the stream thread getting stuck and failing to call `poll` in 
time should also drop significantly.
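
To sketch the decoupling idea in 1) (purely illustrative; not the actual 
refactored consumer design): a dedicated thread owns the consumer and keeps 
polling, so group membership stays alive no matter how long processing takes 
on the worker thread:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DecoupledPollSketch {
    // The poll thread owns the consumer (KafkaConsumer is not thread-safe) and
    // keeps the rebalance protocol alive, while the worker thread does the
    // heavy processing off a queue.
    public static void run(KafkaConsumer<byte[], byte[]> consumer) {
        BlockingQueue<ConsumerRecord<byte[], byte[]>> queue = new ArrayBlockingQueue<>(10_000);

        Thread pollThread = new Thread(() -> {
            consumer.subscribe(List.of("input-topic"));
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<byte[], byte[]> r : records) {
                    try {
                        // Note: a real implementation would consumer.pause() the
                        // partitions rather than block here when the worker lags.
                        queue.put(r);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, "poll-thread");

        Thread workerThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    process(queue.take()); // the slow part no longer delays poll()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "worker-thread");

        pollThread.start();
        workerThread.start();
    }

    private static void process(ConsumerRecord<byte[], byte[]> record) {
        // stand-in for heavy processing
    }
}
{code}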

> StreamThreads can miss rebalance events when processing records during a 
> rebalance
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-14382
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14382
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>              Labels: new-consumer-threading-should-fix, rebalancing
>             Fix For: 3.4.0, 3.3.2, 3.2.4, 3.1.3, 3.0.3
>
>
> One of the main improvements introduced by the cooperative protocol was the 
> ability to continue processing records during a rebalance. In Streams, we 
> take advantage of this by polling with a timeout of 0 while a rebalance is in 
> progress, so that poll can return immediately and the thread can continue 
> through the main loop to process new records. The main poll loop uses an 
> algorithm based on max.poll.interval.ms to ensure the StreamThread returns to 
> call #poll in time to stay in the consumer group.
>  
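> For a concrete picture, here is a minimal sketch of that loop shape (purely 
> illustrative: the class, the plain KafkaConsumer usage, and the process() 
> helper are stand-ins, not the actual StreamThread code):
> {code:java}
> import java.time.Duration;
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> 
> public class PollLoopSketch {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
>         long maxPollIntervalMs = 300_000L; // the max.poll.interval.ms default
> 
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(List.of("input-topic"));
>             boolean rebalanceInProgress = false; // Streams tracks this via its rebalance listener
> 
>             while (true) {
>                 // Poll with timeout 0 during a rebalance so the thread can keep processing.
>                 Duration timeout = rebalanceInProgress ? Duration.ZERO : Duration.ofMillis(100);
>                 ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
> 
>                 // Bound the processing loop so the thread returns to poll() well
>                 // within max.poll.interval.ms.
>                 long deadline = System.currentTimeMillis() + maxPollIntervalMs / 2;
>                 for (ConsumerRecord<byte[], byte[]> record : records) {
>                     process(record); // the user processing that can run long
>                     if (System.currentTimeMillis() >= deadline) {
>                         break; // abandon the rest of the batch and go back to poll()
>                     }
>                 }
>             }
>         }
>     }
> 
>     private static void process(ConsumerRecord<byte[], byte[]> record) {
>         // stand-in for actual record processing
>     }
> }
> {code}
> 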
> Generally speaking, it should exit the processing loop and invoke poll within 
> a few minutes at most based on the poll interval, though typically it will 
> break out much sooner once it's used up all the records from the last poll 
> (based on the max.poll.records config which Streams sets to 1,000 by 
> default). However, if doing heavy processing or setting a higher 
> max.poll.records, the thread may continue processing for more than a few 
> seconds. If it had sent out a JoinGroup request before going on to process 
> and was waiting for its JoinGroup response, then once it does return to 
> invoke #poll it will process this response and send out a SyncGroup – but if 
> the processing took too long, this SyncGroup may immediately fail with the 
> REBALANCE_IN_PROGRESS error.
>  
> Essentially, while the thread was processing the group leader will itself be 
> processing the JoinGroup subscriptions of all members and generating an 
> assignment, then sending this back in its SyncGroup. This may take only a few 
> seconds or less, and the group coordinator will not yet have noticed (or 
> cared) that one of the consumers hasn't sent a SyncGroup – it will just return 
> the assigned partitions in the SyncGroup responses of the members who have 
> responded in time, and "complete" the rebalance in their eyes. But if the 
> assignment involved moving any partitions from one consumer to another, then 
> it will need to trigger a followup rebalance right away to finish assigning 
> those partitions which were revoked in the previous rebalance. This is what 
> causes a new rebalance to be kicked off just seconds after the first one 
> began.
>  
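> On the client side, this two-phase hand-off can be observed with a plain 
> consumer; a minimal sketch, assuming the CooperativeStickyAssignor and an 
> illustrative listener (not the actual Streams code):
> {code:java}
> import java.time.Duration;
> import java.util.Collection;
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> 
> public class CooperativeListenerSketch {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
>         props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>                   CooperativeStickyAssignor.class.getName());
> 
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(List.of("input-topic"), new ConsumerRebalanceListener() {
>                 @Override
>                 public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>                     // Under the cooperative protocol this fires only for partitions
>                     // that actually move; a follow-up rebalance is then triggered
>                     // right away to give them to their new owner.
>                     System.out.println("revoked: " + partitions);
>                 }
> 
>                 @Override
>                 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>                     System.out.println("newly assigned: " + partitions);
>                 }
>             });
>             while (true) {
>                 consumer.poll(Duration.ofMillis(100)); // keep participating in rebalances
>             }
>         }
>     }
> }
> {code}
> 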
> If the consumer that was stuck processing was among those who needed to 
> revoke partitions, this can lead to repeating rebalances – since it fails the 
> SyncGroup of the 1st rebalance it never receives the assignment for it and 
> never knows to revoke those partitions, meaning it will rejoin for the new 
> rebalance still claiming them among its ownedPartitions. When the assignor 
> generates the same assignment for the 2nd rebalance, it will again see that 
> some partitions need to be revoked and will therefore trigger yet another new 
> rebalance after finishing the 2nd. This can go on for as long as the 
> StreamThreads are struggling to finish the JoinGroup phase in time due to 
> processing.
>  
> Note that the best workaround at the moment is probably to just set a lower 
> max.poll.records to reduce the duration of the processing loop; a sketch of 
> this follows below.
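> 
> A minimal sketch of that workaround, assuming a Streams app (the 100 here is 
> an arbitrary illustrative value, not a recommendation):
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.streams.StreamsConfig;
> 
> public class WorkaroundConfigSketch {
>     public static Properties workaroundProps() {
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         // Lower max.poll.records (Streams defaults it to 1,000) so each pass
>         // through the processing loop is shorter and the thread gets back to
>         // poll() sooner.
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
>         return props;
>     }
> }
> {code}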



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
