[
https://issues.apache.org/jira/browse/KAFKA-13435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Jacot resolved KAFKA-13435.
---------------------------------
Reviewer: Jason Gustafson
Resolution: Fixed
> Static membership protocol should let the leader skip assignment (KIP-814)
> --------------------------------------------------------------------------
>
> Key: KAFKA-13435
> URL: https://issues.apache.org/jira/browse/KAFKA-13435
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 3.0.0
> Reporter: Ryan Leslie
> Assignee: David Jacot
> Priority: Critical
> Labels: new-rebalance-should-fix
> Fix For: 3.2.0
>
>
> When using consumer groups with static membership, if the consumer marked as
> leader has restarted, then metadata changes such as a partition increase do
> not trigger the expected rebalances.
> To reproduce this issue, simply:
> # Create a static consumer subscribed to a single topic
> # Close the consumer and create a new one with the same group instance id
> # Increase partitions for the topic
> # Observe that no rebalance occurs and the new partitions are not assigned
> I have only tested this in 2.7, but it may apply to newer versions as well.
> h3. Analysis
> In {_}ConsumerCoordinator{_}, one responsibility of the leader consumer is to
> track metadata and trigger a rebalance if there are changes such as new
> partitions added:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L793]
> {code:java}
> if (assignmentSnapshot != null &&
> !assignmentSnapshot.matches(metadataSnapshot)) {
> ...
> requestRejoinIfNecessary(reason);
> return true;
> }
> {code}
> Note that _assignmentSnapshot_ is currently only set if the consumer is the
> leader:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L353]
> {code:java}
> // Only the leader is responsible for monitoring for metadata changes (i.e.
> // partition changes)
> if (!isLeader)
> assignmentSnapshot = null;
> {code}
> And _isLeader_ is only true after an assignment is performed during a
> rebalance:
> [https://github.com/apache/kafka/blob/43bcc5682da82a602a4c0a000dc7433d0507b450/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L634]
> That is, when a consumer group forms, exactly one consumer in the group
> should have _isLeader == true_ and be responsible for triggering rebalances
> on metadata changes.
> However, in the case of static membership, if the leader has been restarted
> and rejoined the group, the group essentially no longer has a current leader.
> Even though the metadata changes are fetched, no rebalance will be triggered.
> That is, _isLeader_ will be false for all members.
> This issue does not resolve until after an actual group change that causes a
> proper rebalance. To safely increase partitions when using static
> membership, consumers must be stopped and allowed to time out, or be forcibly
> removed with {_}AdminClient.removeMembersFromConsumerGroup(){_}.
> Correcting this in the client probably also requires help from the broker.
> Currently, when a static consumer that is leader is restarted, the
> coordinator does recognize the change:
> e.g. leader _bbfcb930-61a3-4d21-945c-85f4576490ff_ was restarted
> {noformat}
> [2021-11-04 13:53:13,487] INFO [GroupCoordinator 4]: Static member
> Some(1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0) of group ryan_test
> with unknown member id rejoins, assigning new member id
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74,
> while old member id
> 1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff
> will be removed. (
> kafka.coordinator.group.GroupCoordinator){noformat}
> However, it does not attempt to update the leader id since this isn't a new
> rebalance, and JOIN_GROUP will continue returning the now stale member id as
> leader:
> {noformat}
> 2021-11-04 13:53:13,490 DEBUG o.a.k.c.c.i.AbstractCoordinator [Consumer
> instanceId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0,
> clientId=1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0,
> groupId=ryan_test] Received successful JoinGroup response:
> JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=40,
> protocolType='consumer', protocolName='range',
> leader='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-bbfcb930-61a3-4d21-945c-85f4576490ff',
>
> memberId='1GK7DRJPHZ0LRV91Y4D3SYHS5928XHXJQ6263GT26V5P70QX0-af88ecf2-6ebf-47da-95ef-c54fef17ab74',
> members=[]){noformat}
> This means that it's not easy for any particular restarted member to identify
> that it should consider itself leader and handle metadata changes.
> There is a reference to the difficulty of leader restarts in KAFKA-7728, but the
> focus seemed mainly on avoiding needless rebalances for static members. That
> goal was accomplished, but this issue seems to be a side effect of both not
> rebalancing AND not having the rejoined member reclaim its leadership status.
> Also, I have not verified if it's strictly related or valid, but noticed this
> ticket has been opened too: KAFKA-12759.
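
The failure mode described in the Analysis can be illustrated with a small, self-contained model. This is only a sketch and not the actual ConsumerCoordinator code: the class StaticLeaderSketch, its Member type, and the method rejoinAfterPartitionIncrease are hypothetical names introduced here, and leadership is simplified to "the leader id in the JoinGroup response equals my own member id". Under that assumption, a restarted static member that is handed the stale leader id never takes an assignment snapshot, so a later partition increase does not request a rejoin.

```java
import java.util.HashMap;
import java.util.Map;

public class StaticLeaderSketch {

    /** Simplified, hypothetical member: keeps a snapshot only if it believes it is leader. */
    static final class Member {
        private boolean isLeader;
        // topic -> partition count at assignment time; null for non-leaders
        private Map<String, Integer> assignmentSnapshot;

        /** Assumption: a member is leader when the response's leader id equals its member id. */
        void onJoinComplete(String leaderId, String memberId, Map<String, Integer> metadata) {
            isLeader = leaderId.equals(memberId);
            // Only the leader monitors metadata changes, as in the quoted snippet.
            assignmentSnapshot = isLeader ? new HashMap<>(metadata) : null;
        }

        /** Mirrors the quoted check: rejoin only if a snapshot exists and no longer matches. */
        boolean rejoinNeeded(Map<String, Integer> metadata) {
            return assignmentSnapshot != null && !assignmentSnapshot.equals(metadata);
        }
    }

    /** Joins with one partition, then reports whether growing to two requests a rejoin. */
    public static boolean rejoinAfterPartitionIncrease(String leaderId, String memberId) {
        Map<String, Integer> before = new HashMap<>();
        before.put("topic", 1);
        Map<String, Integer> after = new HashMap<>();
        after.put("topic", 2);

        Member member = new Member();
        member.onJoinComplete(leaderId, memberId, before);
        return member.rejoinNeeded(after);
    }

    public static void main(String[] args) {
        // Normal rebalance: the response names this member as leader.
        System.out.println(rejoinAfterPartitionIncrease("instance-old", "instance-old")); // true
        // Restarted static leader: new member id, but the response still names the old one.
        System.out.println(rejoinAfterPartitionIncrease("instance-old", "instance-new")); // false
    }
}
```

In the second call no member of a single-consumer group holds a snapshot, which matches the observation that the new partitions are never assigned until some other event forces a rebalance.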
--
This message was sent by Atlassian Jira
(v8.20.1#820001)