[ 
https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17705734#comment-17705734
 ] 

Guozhang Wang commented on KAFKA-14639:
---------------------------------------

As for the fix: it's quite tough, since the ultimate and best solution is the 
new rebalance protocol KIP. Any near-term fix has to touch only the client 
side, not the server side, which means the suggestions on the other ticket 
cannot apply.

The trickiness is this: when a sync-group request gets a REBALANCE_IN_PROGRESS 
error for generation X (setting the COORDINATOR_LOAD_IN_PROGRESS case aside for 
the moment), the consumer did not actually miss a rebalance; the next 
generation X+1 has simply already started. However, the consumer's currently 
owned partitions are not from generation X but from an older generation, say 
X-1, since it never received the actual assignment for generation X. When it 
then sends out those owned partitions stamped with generation X-1, they are 
likely to be discarded by the coordinator. Without fixing the broker side, this 
trickiness is hard to resolve.
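To make the sequence concrete, here is a standalone illustration of my own (NOT 
the actual client/assignor code; the generation check is a simplified stand-in): 
a member that only synced up to generation X-1 rejoins for X+1, and its owned 
partitions are dropped because their generation looks stale.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StaleGenerationSketch {

    // Hypothetical per-member data: the partitions a member still owns plus the
    // generation in which it last received an assignment.
    record MemberSubscription(String memberId, int generation, Set<String> ownedPartitions) {}

    // Simplified stand-in for the assignor's validation: ownership claims from a
    // generation older than the previous one are ignored, so those partitions look
    // "free" and can be handed to another member in the same rebalance.
    static Map<String, String> recognizedOwners(int currentGeneration, List<MemberSubscription> members) {
        Map<String, String> ownerByPartition = new HashMap<>();
        for (MemberSubscription m : members) {
            if (m.generation() >= currentGeneration - 1) {
                m.ownedPartitions().forEach(tp -> ownerByPartition.put(tp, m.memberId()));
            }
            // else: the claim comes from an older generation and is discarded
        }
        return ownerByPartition;
    }

    public static void main(String[] args) {
        // Member A synced in generation 8 (= X-1), its SyncGroup for generation 9 (= X)
        // returned REBALANCE_IN_PROGRESS, and it now rejoins for generation 10 (= X+1).
        var a = new MemberSubscription("A", 8, Set.of("t-74"));
        var b = new MemberSubscription("B", 10, Set.of());
        // A's claim on t-74 is dropped, so t-74 has no recognized owner and can be
        // reassigned immediately instead of being treated as still owned by A.
        System.out.println(recognizedOwners(10, List.of(a, b)));  // prints {}
    }
}
{code}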

I'd suggest we coordinate with the folks working on the KIP (i.e. on the 
broker side) for the near-term fix. If everyone agrees this is a good issue to 
remedy before the KIP is out, I'd suggest we take the broker-side change, i.e. 
the description in KAFKA-14016 by Shawn.

If that broker-side change is considered too short-lived to be worth it given 
the upcoming KIP, another fix could be on the assignor side: do not discard the 
old generation's owned partitions if those partitions are NOT claimed by anyone 
else. Admittedly this is risky: think of a partition owned by host A in 
generation X, then assigned to host B in generation X+1 (which host A missed), 
and then given back to A in generation X+2 (which host B missed); we may then 
see that partition's consumed offsets "going backwards". Still, it may give us 
the runway until the new rebalance protocol is out. Personally I do not like it 
and would only consider it as a last resort.
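To make that alternative concrete, here is a rough sketch of my own (NOT a 
patch and NOT how the actual assignor is written): keep an old-generation 
ownership claim, but only when no member from a newer generation claims the 
same partition.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class KeepUnclaimedOldOwnersSketch {

    record Claim(String memberId, int generation, Set<String> ownedPartitions) {}

    static Map<String, String> resolveOwners(int currentGeneration, List<Claim> claims) {
        Map<String, String> owners = new HashMap<>();
        // First pass: honor fresh claims, as is done today.
        for (Claim c : claims) {
            if (c.generation() >= currentGeneration - 1) {
                c.ownedPartitions().forEach(tp -> owners.put(tp, c.memberId()));
            }
        }
        // Second pass: an older-generation claim is kept only if the partition is
        // otherwise unclaimed. This is exactly where the "offsets going backwards"
        // risk described above comes from when ownership has silently moved on.
        for (Claim c : claims) {
            if (c.generation() < currentGeneration - 1) {
                c.ownedPartitions().forEach(tp -> owners.putIfAbsent(tp, c.memberId()));
            }
        }
        return owners;
    }

    public static void main(String[] args) {
        var a = new Claim("A", 8, Set.of("t-74"));   // stale claim from an old generation
        var b = new Claim("B", 10, Set.of("t-57"));  // fresh claim from the current generation
        // t-74 stays with A because nobody from a newer generation claims it.
        System.out.println(resolveOwners(10, List.of(a, b)));  // contains t-74=A and t-57=B
    }
}
{code}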

> Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance 
> cycle
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-14639
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14639
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.2.1
>            Reporter: Bojan Blagojevic
>            Assignee: Philip Nee
>            Priority: Major
>         Attachments: consumers-jira.log
>
>
> I have an application that runs 6 consumers in parallel. I am getting some 
> unexpected results when I use {{{}CooperativeStickyAssignor{}}}. If I 
> understand the mechanism correctly, if a consumer loses a partition in one 
> rebalance cycle, that partition should be assigned in the next rebalance cycle.
> This assumption is based on the 
> [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html]
> documentation and a few blog posts that describe the protocol, like [this 
> one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/]
>  on the Confluent blog.
> {quote}The assignor should not reassign any owned partitions immediately, but 
> instead may indicate consumers the need for partition revocation so that the 
> revoked partitions can be reassigned to other consumers in the next rebalance 
> event. This is designed for sticky assignment logic which attempts to 
> minimize partition reassignment with cooperative adjustments.
> {quote}
> {quote}Any member that revoked partitions then rejoins the group, triggering 
> a second rebalance so that its revoked partitions can be assigned. Until 
> then, these partitions are unowned and unassigned.
> {quote}
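> For reference, here is a minimal sketch (not my actual application code; the 
> broker address, topic name, and deserializers are placeholders) of how a 
> consumer opts into the cooperative-sticky protocol with a rebalance listener:
> {code:java}
> import java.time.Duration;
> import java.util.Collection;
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> 
> public class CooperativeStickyExample {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         // Switches the consumer to incremental (cooperative) rebalancing.
>         props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>                 CooperativeStickyAssignor.class.getName());
> 
>         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(List.of("my-topic"), new ConsumerRebalanceListener() {
>                 @Override
>                 public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>                     // Under the cooperative protocol only the partitions actually
>                     // being migrated show up here, not the whole previous assignment.
>                 }
> 
>                 @Override
>                 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>                     // Only the newly added partitions show up here.
>                 }
>             });
>             consumer.poll(Duration.ofMillis(100));
>         }
>     }
> }
> {code}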
> These are the logs from the application that uses 
> {{{}protocol='cooperative-sticky'{}}}. In the same rebalance cycle 
> ({{{}generationId=640{}}}) {{partition 74}} moves from {{consumer-3}} to 
> {{{}consumer-4{}}}. I omitted the lines that are logged by the other 4 
> consumers.
> Note that the log is in reverse order (bottom to top).
> {code:java}
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-59, seek to min common offset: 85120524
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-59] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-59]
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-59
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> Assignment(partitions=[partition-59])
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Request joining group due to: need to revoke partitions 
> [partition-26, partition-74] as indicated by the current assignment and 
> re-join
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-26, partition-74] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Finished 
> removing partition data
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] (Re-)joining group
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-74, seek to min common offset: 107317730
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-74] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-74]
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-74
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> Assignment(partitions=[partition-74])
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Request joining group due to: need to revoke partitions 
> [partition-57] as indicated by the current assignment and re-join
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-57] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Finished 
> removing partition data
> 2022-12-14 11:18:22 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions 
> revoked: [partition-26, partition-74]
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Revoke previously assigned partitions partition-26, 
> partition-74
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Updating assignment with\n\tAssigned partitions: 
> [partition-59]\n\tCurrent owned partitions: [partition-26, 
> partition-74]\n\tAdded partitions (assigned - owned): 
> [partition-59]\n\tRevoked partitions (owned - assigned): [partition-26, 
> partition-74]
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Successfully synced group in generation 
> Generation{generationId=640, 
> memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af',
>  protocol='cooperative-sticky'}
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Successfully joined group with generation 
> Generation{generationId=640, 
> memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af',
>  protocol='cooperative-sticky'}
> 2022-12-14 11:18:22 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions 
> revoked: [partition-57]
> 2022-12-14 11:18:22 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Revoke previously assigned partitions partition-57
> 2022-12-14 11:18:22 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Updating assignment with\n\tAssigned partitions: 
> [partition-74]\n\tCurrent owned partitions: [partition-57]\n\tAdded 
> partitions (assigned - owned): [partition-74]\n\tRevoked partitions (owned - 
> assigned): [partition-57]
> 2022-12-14 11:18:21 1 — [id-1] o.a.k.c.c.internals.ConsumerCoordinator : 
> [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] 
> Successfully synced group in generation Generation{generationId=640, 
> memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477',
>  protocol='cooperative-sticky'}
> 2022-12-14 11:18:21 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Successfully joined group with generation 
> Generation{generationId=640, 
> memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477',
>  protocol='cooperative-sticky'} {code}
> Is this expected?
> Kafka client version is 3.2.1.



