[ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar updated KAFKA-14220:
---------------------------------
    Description:     (was: Issue:

In case of the revocation of partitions, the updation of "partition count" 
metrics is being done before updating the new set of assignments. 
"invokePartitionsRevoked" method of "onJoinComplete" function of 
"ConsumerCoordinator" class is being called before the "

subscriptions.assignFromSubscribed(assignedPartitions)" of the same class. As a 
result of which the old assigned partition count is getting updated again and 
again even after future rebalances.

Demo:

Suppose the current assignment is like:

Assigned partitions: [partition-1, partition-2]
Current owned partitions: []
Added partitions (assigned - owned): [partition-1, partition-2]
Revoked partitions (owned - assigned): []

After that some other worker joined and part of that, as a result of which 
“partition-2” has to be revoked.

Assigned partitions: [partition-1]
Current owned partitions: [partition-1, partition-2]
Added partitions (assigned - owned): []
Revoked partitions (owned - assigned): [partition-2]

But as the "assignment" need to be updated with these new assignment via the 
following logic:
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]

Line 463 in 
[3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||subscriptions.assignFromSubscribed(assignedPartitions);|

 

But before this only "{*}updatePartitionCount{*}()" is getting called via 
"{*}invokePartitionsRevoked{*}":

 
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]

Line 443 in 
[3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||firstException.compareAndSet(null, 
invokePartitionsRevoked(revokedPartitions));|

 

Due to this when it is going to call for the "{*}assignedPartitions{*}" of 
"{*}consumer{*}" via the following:
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]

Line 892 in 
[3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||public Set<TopicPartition> assignment() {|

 

the "{*}assignedPartitions{*}" is not yet updated.

Solution:
As part of the bug fix to KAFKA-12622 introducing code changes to 
update the partition count metrics after the the newly assigned partition are 
registered.)

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14220
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14220
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.1
>            Reporter: Pritam Kumar
>            Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to