This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 7ed24036538 KAFKA-19449: Don't consider partitions unreleased when 
pending revocation by the current member (#20781)
7ed24036538 is described below

commit 7ed24036538090e79d711fabef75e184531fa4f5
Author: Sean Quah <[email protected]>
AuthorDate: Sat Nov 15 10:50:02 2025 +0000

    KAFKA-19449: Don't consider partitions unreleased when pending revocation 
by the current member (#20781)
    
    During reconciliation, we transition members to UNRELEASED_PARTITIONS
    when the target assignment contains a partition that is owned by another
    member. Partitions are considered owned by another member if they are in
    a member's assignment or partitions pending revocation. Ownership is
    only updated after reconciliation has finished.
    
    Fix a bug where we consider a partition owned by another member when it
    is actually in the current member's list of partitions pending
    revocation. In some cases, this skips a transition to
    UNRELEASED_PARTITIONS and allows the current assignment to converge to
    the target assignment one heartbeat sooner.
    
    Reviewers: PoAn Yang <[email protected]>, David Jacot <[email protected]>
---
 .../modern/consumer/CurrentAssignmentBuilder.java  |  6 +-
 .../consumer/CurrentAssignmentBuilderTest.java     | 66 ++++++++++++++++++++++
 2 files changed, 71 insertions(+), 1 deletion(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
index 3a7ea6692b0..c265c509b6f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
@@ -254,7 +254,11 @@ public class CurrentAssignmentBuilder {
             Set<Integer> partitionsPendingAssignment = new HashSet<>(target);
             partitionsPendingAssignment.removeAll(assignedPartitions);
             hasUnreleasedPartitions = 
partitionsPendingAssignment.removeIf(partitionId ->
-                currentPartitionEpoch.apply(topicId, partitionId) != -1
+                currentPartitionEpoch.apply(topicId, partitionId) != -1 &&
+                // Don't consider a partition unreleased if it is owned by the 
current member
+                // because it is pending revocation. This is safe to do since 
only a single member
+                // can own a partition at a time.
+                !member.partitionsPendingRevocation().getOrDefault(topicId, 
Set.of()).contains(partitionId)
             ) || hasUnreleasedPartitions;
 
             if (!assignedPartitions.isEmpty()) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
index 8e4e36712ba..4565d5fd3eb 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
@@ -399,6 +399,72 @@ public class CurrentAssignmentBuilderTest {
         );
     }
 
+    @Test
+    public void 
testUnrevokedPartitionsToStableWithReturnedPartitionsPendingRevocation() {
+        String topic1 = "topic1";
+        String topic2 = "topic2";
+        Uuid topicId1 = Uuid.randomUuid();
+        Uuid topicId2 = Uuid.randomUuid();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+            .setState(MemberState.UNREVOKED_PARTITIONS)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(topicId1, 2, 3),
+                mkTopicAssignment(topicId2, 5, 6)))
+            .setPartitionsPendingRevocation(mkAssignment(
+                // Partition 4 is pending revocation by the member but is back 
in the latest target
+                // assignment.
+                mkTopicAssignment(topicId1, 4)))
+            .build();
+
+        ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+            .withTargetAssignment(12, new Assignment(mkAssignment(
+                mkTopicAssignment(topicId1, 2, 3, 4),
+                mkTopicAssignment(topicId2, 5, 6, 7))))
+            .withCurrentPartitionEpoch((topicId, partitionId) -> {
+                if (topicId.equals(topicId1)) {
+                    // Partitions 2 and 3 are in the member's current 
assignment.
+                    // Partition 4 is pending revocation by the member.
+                    switch (partitionId) {
+                        case 2:
+                        case 3:
+                        case 4:
+                            return 10;
+                    }
+                } else if (topicId.equals(topicId2)) {
+                    // Partitions 5 and 6 are in the member's current 
assignment.
+                    switch (partitionId) {
+                        case 5:
+                        case 6:
+                            return 10;
+                    }
+                }
+                return -1;
+            })
+            .withOwnedTopicPartitions(Arrays.asList(
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(topicId1)
+                    .setPartitions(Arrays.asList(2, 3)),
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(topicId2)
+                    .setPartitions(Arrays.asList(5, 6))))
+            .build();
+
+        assertEquals(
+            new ConsumerGroupMember.Builder("member")
+                .setState(MemberState.STABLE)
+                .setMemberEpoch(12)
+                .setPreviousMemberEpoch(10)
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(topicId1, 2, 3, 4),
+                    mkTopicAssignment(topicId2, 5, 6, 7)))
+                .build(),
+            updatedMember
+        );
+    }
+
     @Test
     public void testUnreleasedPartitionsToStable() {
         Uuid topicId1 = Uuid.randomUuid();

Reply via email to