[ https://issues.apache.org/jira/browse/KAFKA-14220 ]
Pritam Kumar deleted comment on KAFKA-14220:
--------------------------------------
was (Author: JIRAUSER295638):
https://github.com/apache/kafka/pull/12622
> "partition-count" not getting updated after revocation of partitions in case
> of Incremental Co-operative rebalancing.
> ---------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 3.0.1
> Reporter: Pritam Kumar
> Priority: Major
> Labels: pull-request-available
>
> Issue:
>
> When partitions are revoked, the "partition-count" metric is updated
> before the new set of assignments is registered. In the "onJoinComplete"
> method of the "ConsumerCoordinator" class, "invokePartitionsRevoked" is
> called before "subscriptions.assignFromSubscribed(assignedPartitions)". As
> a result, the metric is recomputed from the old assignment and keeps
> reporting the stale partition count, even after subsequent rebalances.
> Demo:
> Suppose the current assignment is like:
> Assigned partitions: [partition-1, partition-2]
> Current owned partitions: []
> Added partitions (assigned - owned): [partition-1, partition-2]
> Revoked partitions (owned - assigned): []
> Then another worker joins, and as a result "partition-2" has to be
> revoked:
> Assigned partitions: [partition-1]
> Current owned partitions: [partition-1, partition-2]
> Added partitions (assigned - owned): []
> Revoked partitions (owned - assigned): [partition-2]
> The "assignment" has to be updated with this new assignment via the
> following logic:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]
> Line 463 in
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||subscriptions.assignFromSubscribed(assignedPartitions);|
>
> But "{*}updatePartitionCount{*}()" is called before this, via
> "{*}invokePartitionsRevoked{*}":
>
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]
> Line 443 in
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||firstException.compareAndSet(null,
> invokePartitionsRevoked(revokedPartitions));|
>
> Because of this, when the metric update reads the
> "{*}assignedPartitions{*}" of the "{*}consumer{*}" via the following:
> [kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]
> Line 892 in
> [3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
> ||public Set<TopicPartition> assignment() {|
>
> the "{*}assignedPartitions{*}" is not yet updated.
> Solution:
> As part of the bug fix to KAFKA-12622, introduce code changes to update
> the "partition-count" metric after the newly assigned partitions are
> registered.
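
The ordering problem described in the issue can be illustrated with a small, self-contained sketch. It is not the actual ConsumerCoordinator or Kafka Connect code: the class name, the method name, and the boolean flag are hypothetical, and the metric is modeled as a plain int that is recomputed from whatever the current assignment is when the revocation callback runs. It reuses the partitions from the demo above (owned: partition-1 and partition-2, assigned: partition-1).

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified model of the ordering issue. None of these names
// are real Kafka classes or methods; they only mirror the steps in the report.
final class PartitionCountOrderingSketch {

    // Returns the value the "partition-count" metric would hold after
    // partition-2 is revoked, for either ordering of the two steps.
    static int reportedCountAfterRevocation(boolean assignBeforeMetricUpdate) {
        Set<String> owned = new LinkedHashSet<>(Arrays.asList("partition-1", "partition-2"));
        Set<String> assigned = new LinkedHashSet<>(Arrays.asList("partition-1"));

        // Revoked partitions (owned - assigned): [partition-2]
        Set<String> revoked = new LinkedHashSet<>(owned);
        revoked.removeAll(assigned);

        // Stand-in for what the consumer's assignment() would return.
        Set<String> currentAssignment = owned;
        int partitionCountMetric = currentAssignment.size();

        if (assignBeforeMetricUpdate) {
            // Proposed order: register the new assignment, then update the metric.
            currentAssignment = assigned;
            if (!revoked.isEmpty()) {
                partitionCountMetric = currentAssignment.size();
            }
        } else {
            // Reported order: the revocation callback updates the metric while
            // the old assignment is still in place, then the assignment changes.
            if (!revoked.isEmpty()) {
                partitionCountMetric = currentAssignment.size();
            }
            currentAssignment = assigned;
        }
        return partitionCountMetric;
    }

    public static void main(String[] args) {
        System.out.println("metric updated before assignment: "
                + reportedCountAfterRevocation(false)); // prints 2 (stale)
        System.out.println("metric updated after assignment: "
                + reportedCountAfterRevocation(true));  // prints 1
    }
}
```

In this model the only difference between the two branches is the order of the two steps, which is enough to reproduce the stale count of 2 instead of 1.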
--
This message was sent by Atlassian Jira
(v8.20.10#820010)