[ https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang reassigned KAFKA-14024:
-------------------------------------
Assignee: Shawn Wang (was: Guozhang Wang)
> Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
> ------------------------------------------------------------------------------
>
> Key: KAFKA-14024
> URL: https://issues.apache.org/jira/browse/KAFKA-14024
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.2.0
> Reporter: Shawn Wang
> Assignee: Shawn Wang
> Priority: Blocker
> Labels: new-consumer-threading-should-fix
> Fix For: 3.3.0, 3.2.1
>
>
> Hi
> In https://issues.apache.org/jira/browse/KAFKA-13310, we tried to fix an issue
> where Consumer#poll(duration) could return later than the provided duration.
> This happened because, when a rebalance is needed, the consumer first commits
> the current offsets synchronously before rebalancing, and if the offset commit
> takes too long, Consumer#poll takes longer than the provided duration. To fix
> that, we replaced the synchronous commit with an asynchronous commit before the
> rebalance (i.e. in onJoinPrepare).
>
> However, in this ticket we found that the async commit sends a new commit
> request on every Consumer#poll, because the offset commit never completes in
> time. As a result, the existing consumer is kicked out of the group after the
> rebalance timeout without rejoining the group. For example, suppose consumer A
> is in group G and consumer B joins the group: after the rebalance, only
> consumer B is in the group.
>
> The workaround for this issue is to switch back to an eager assignor, e.g.
> StickyAssignor or RoundRobinAssignor.
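A minimal sketch of that workaround, assuming the consumer is configured through a `java.util.Properties` object. `EagerAssignorConfig` and `eagerConsumerProps` are hypothetical names; `partition.assignment.strategy`, the assignor class names and the deserializer class are standard Kafka consumer settings, while the broker address and group id are placeholders.

```java
import java.util.Properties;

// Hypothetical helper for the eager-assignor workaround. The config keys and
// class names are real Kafka consumer settings; the helper itself is illustrative.
public class EagerAssignorConfig {
    static final String STRATEGY_KEY = "partition.assignment.strategy";
    static final String STICKY = "org.apache.kafka.clients.consumer.StickyAssignor";
    static final String ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor";

    static Properties eagerConsumerProps(String bootstrapServers, String groupId, String eagerAssignor) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // List only an eager assignor (no CooperativeStickyAssignor), so the
        // consumer uses the EAGER rebalance protocol.
        props.setProperty(STRATEGY_KEY, eagerAssignor);
        return props;
    }

    public static void main(String[] args) {
        Properties props = eagerConsumerProps("localhost:9092", "xxx", STICKY);
        // These properties would then be passed to new KafkaConsumer<>(props).
        System.out.println(STRATEGY_KEY + "=" + props.getProperty(STRATEGY_KEY));
    }
}
```

Since the group coordinator only selects an assignor that every member supports, the change would normally be rolled out to all consumers in the group.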
>
> To fix the issue, we came up with two solutions:
> # Explicitly wait for the async commit to complete in onJoinPrepare, but
> that would bring back the KAFKA-13310 issue.
> # Keep the in-flight async commit future, so that on each Consumer#poll we
> wait for that same future to complete instead of sending a new commit
> request.
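Solution 2 can be sketched as a small, single-threaded Java model. This is not the actual patch: `InflightCommitModel`, `sendAsyncCommit` and `deliverCommitResponse` are hypothetical names, `CompletableFuture` stands in for the client's internal request future, and retriable-error handling is left out.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of solution 2: keep the in-flight async commit future
// across onJoinPrepare calls instead of sending a new commit on every poll.
public class InflightCommitModel {
    private CompletableFuture<Void> inflightCommit; // null when no commit is pending
    private int commitRequestsSent = 0;

    // Stand-in for sending an asynchronous OffsetCommit request.
    private CompletableFuture<Void> sendAsyncCommit() {
        commitRequestsSent++;
        return new CompletableFuture<>();
    }

    // Returns true once the pre-rebalance commit has finished, false while waiting.
    boolean onJoinPrepare() {
        if (inflightCommit == null) {
            inflightCommit = sendAsyncCommit(); // send only when nothing is in flight
        }
        if (!inflightCommit.isDone()) {
            return false; // poll() returns; the next poll re-checks the SAME future
        }
        // Completed (normally or exceptionally); retriable-error handling is omitted.
        inflightCommit = null;
        return true;
    }

    // Simulates the commit response arriving during a later network poll.
    void deliverCommitResponse() {
        if (inflightCommit != null) {
            inflightCommit.complete(null);
        }
    }

    int commitRequestsSent() {
        return commitRequestsSent;
    }

    public static void main(String[] args) {
        InflightCommitModel model = new InflightCommitModel();
        for (int poll = 1; poll <= 3; poll++) {
            System.out.println("poll " + poll + ": onJoinPrepare -> " + model.onJoinPrepare());
        }
        model.deliverCommitResponse();
        System.out.println("after response: onJoinPrepare -> " + model.onJoinPrepare());
        System.out.println("commit requests sent: " + model.commitRequestsSent());
    }
}
```

In this model, the three polls before the response all return false, yet only one commit request is ever sent; the first call after the response returns true.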
>
> We also found another bug while fixing this one. Before KAFKA-13310, we
> committed offsets synchronously with rebalanceTimeout, retrying on retriable
> errors until the timeout expired. After KAFKA-13310, we assumed the commit
> would still be retried, but the retry only happens after the partitions have
> been revoked. So even if the retried offset commit succeeds, the offsets of
> some partitions are left uncommitted, and after the rebalance other consumers
> will consume overlapping records.
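The ordering described here (retry until success or timeout, and only then revoke) can be sketched as a time-bounded retry loop. All names below (`CommitBeforeRevoke`, `CommitResult`, `commitWithRetry`, `prepareRebalance`) are hypothetical, the clock is injected so the example runs deterministically, and the back-off a real client applies between retries is omitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch: retry the offset commit on retriable errors, bounded by a
// rebalance-timeout deadline, BEFORE revoking partitions.
public class CommitBeforeRevoke {
    enum CommitResult { SUCCESS, RETRIABLE_ERROR, FATAL_ERROR }

    // Returns true if the commit succeeded before the deadline.
    static boolean commitWithRetry(Supplier<CommitResult> attempt, LongSupplier nowMs, long timeoutMs) {
        long deadline = nowMs.getAsLong() + timeoutMs;
        while (true) {
            CommitResult result = attempt.get();
            if (result == CommitResult.SUCCESS) return true;
            if (result == CommitResult.FATAL_ERROR) return false;
            // Retriable error: try again unless the deadline has passed.
            // (A real client would also back off between attempts.)
            if (nowMs.getAsLong() >= deadline) return false;
        }
    }

    // Commit first, revoke second, so the revoked partitions' offsets are covered.
    static List<String> prepareRebalance(Supplier<CommitResult> attempt, LongSupplier nowMs, long timeoutMs) {
        List<String> events = new ArrayList<>();
        boolean committed = commitWithRetry(attempt, nowMs, timeoutMs);
        events.add(committed ? "offsets committed" : "commit failed or timed out");
        events.add("partitions revoked");
        return events;
    }

    public static void main(String[] args) {
        // Two retriable failures, then success; the fake clock advances 100 ms per read.
        Iterator<CommitResult> results = List.of(
            CommitResult.RETRIABLE_ERROR, CommitResult.RETRIABLE_ERROR, CommitResult.SUCCESS).iterator();
        long[] clock = {0};
        LongSupplier fakeClock = () -> clock[0] += 100;
        System.out.println(prepareRebalance(results::next, fakeClock, 60_000));
    }
}
```

Run as-is, `main` prints `[offsets committed, partitions revoked]`: the retried commit finishes before revocation, which is the ordering lost after KAFKA-13310.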
>
>
> ===
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]
>
> We don't wait for the client to receive the offset commit response here, so
> onJoinPrepareAsyncCommitCompleted stays false during a cooperative rebalance,
> and the client loops, invoking onJoinPrepare again and again.
> I think EAGER mode doesn't have this problem because it revokes the
> partitions even if onJoinPrepareAsyncCommitCompleted=false, so it does not try
> to commit again in the next round.
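To make that loop concrete, here is a heavily simplified Java model. It is not the real `ConsumerCoordinator` code: `JoinPrepareLoopModel` and its `Protocol` enum are hypothetical, and an empty commit is assumed to count as completed. It mirrors only the two behaviours described: every call sends a fresh async commit and checks it before any response can arrive, and eager mode revokes everything on the first call.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the looping behaviour described above (not real client code).
public class JoinPrepareLoopModel {
    enum Protocol { EAGER, COOPERATIVE }

    private final Protocol protocol;
    private final Set<String> ownedPartitions;
    private int commitRequestsSent = 0;

    JoinPrepareLoopModel(Protocol protocol, Set<String> ownedPartitions) {
        this.protocol = protocol;
        this.ownedPartitions = new HashSet<>(ownedPartitions);
    }

    boolean onJoinPrepare() {
        boolean commitCompleted;
        if (ownedPartitions.isEmpty()) {
            commitCompleted = true; // model assumption: nothing to commit counts as completed
        } else {
            commitRequestsSent++;
            CompletableFuture<Void> commit = new CompletableFuture<>(); // response not received yet
            commitCompleted = commit.isDone(); // always false at this point
        }
        if (protocol == Protocol.EAGER) {
            ownedPartitions.clear(); // eager revokes everything, even with the commit pending
        }
        // Cooperative: only unsubscribed partitions would be revoked (none in this model).
        return commitCompleted;
    }

    int commitRequestsSent() {
        return commitRequestsSent;
    }

    public static void main(String[] args) {
        Set<String> partitions = Set.of("topic1-0", "topic1-1", "topic1-2", "topic1-3", "topic1-4");
        for (Protocol p : Protocol.values()) {
            JoinPrepareLoopModel model = new JoinPrepareLoopModel(p, partitions);
            StringBuilder results = new StringBuilder();
            for (int poll = 1; poll <= 3; poll++) {
                results.append(model.onJoinPrepare()).append(' ');
            }
            System.out.println(p + ": " + results + "(commit requests: " + model.commitRequestsSent() + ")");
        }
    }
}
```

In this model the cooperative instance never returns true and sends one commit request per call, while the eager instance returns true from the second call on after a single commit request.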
> To reproduce:
> * single-node Kafka 3.2.0 broker and 3.2.0 client
> * topic1 has 5 partitions
> * start consumer1 (cooperative rebalance)
> * start consumer2 (same consumer group)
> * consumer1 hangs for a long time before rejoining
> * the server log shows consumer1 hitting the rebalance timeout before it sends
> JoinGroup, then rejoining with another memberId
> consumer1's log keeps printing:
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation
> 54 and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938
> (ConsumerCoordinator.java:739)
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of
> offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}}
> (ConsumerCoordinator.java:1143)
>
> and the coordinator's log:
> [2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance
> group xxx in state PreparingRebalance with old generation 56
> (__consumer_offsets-30) (reason: Adding new member
> consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id
> None; client reason: rebalance failed due to 'The group member needs to have
> a valid member id before actually entering a consumer group.'
> (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed
> dynamic members who haven't joined:
> Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a)
> (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx
> generation 57 (__consumer_offsets-30) with 3 members
> (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:44,048] INFO [GroupCoordinator 0]: Dynamic member with
> unknown member id joins group xxx in CompletingRebalance state. Created a new
> member id consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 and request the
> member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:44,053] INFO [GroupCoordinator 0]: Assignment received from
> leader consumer-xxx-1-e842a14c-eff7-4b55-9463-72b9c2534afd for group xxx for
> generation 57. The group has 3 members, 0 of which are static.
> (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:44,243] INFO [GroupCoordinator 0]: Preparing to rebalance
> group xxx in state PreparingRebalance with old generation 57
> (__consumer_offsets-30) (reason: Adding new member
> consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 with group instance id
> None; client reason: rebalance failed due to 'The group member needs to have
> a valid member id before actually entering a consumer group.'
> (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)