[ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14220:
----------------------------------
    Fix Version/s:     (was: 3.0.1)

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14220
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14220
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.0.1
>            Reporter: Pritam Kumar
>            Priority: Major
>              Labels: pull-request-available
>
> Issue:
> 
In case of the revocation of partitions, the updation of "partition count" 
> metrics is being done before updating the new set of assignments. 
> "invokePartitionsRevoked" method of "onJoinComplete" function of 
> "ConsumerCoordinator" class is being called before the "

> subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As 
> a result of which the old assigned partition count is getting updated again 
> and again even after future rebalances.
> Demo:
> Suppose the current assignment is like:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> After that some other worker joined and part of that, as a result of which 
> “partition-2” has to be revoked.
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
> But as the "assignment" need to be updated with these new assignment via the 
> following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>  
> But before this only "{*}updatePartitionCount{*}()" is getting called via 
> "{*}invokePartitionsRevoked{*}":
>  
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null, 
> invokePartitionsRevoked(revokedPartitions));|
>  
> Due to this when it is going to call for the "{*}assignedPartitions{*}" of 
> "{*}consumer{*}" via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in 
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set<TopicPartition> assignment() {|
>  
> the "{*}assignedPartitions{*}" is not yet updated.
> Solution:
As part of the bug fix to KAFKA-12622 introducing code changes to 
> update the partition count metrics after the the newly assigned partition are 
> registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to