This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 7ed24036538 KAFKA-19449: Don't consider partitions unreleased when
pending revocation by the current member (#20781)
7ed24036538 is described below
commit 7ed24036538090e79d711fabef75e184531fa4f5
Author: Sean Quah <[email protected]>
AuthorDate: Sat Nov 15 10:50:02 2025 +0000
KAFKA-19449: Don't consider partitions unreleased when pending revocation
by the current member (#20781)
During reconciliation, we transition members to UNRELEASED_PARTITIONS
when the target assignment contains a partition that is owned by another
member. Partitions are considered owned by another member if they are in
a member's assignment or partitions pending revocation. Ownership is
only updated after reconciliation has finished.
Fix a bug where we consider a partition owned by another member when it
is actually in the current member's list of partitions pending
revocation. In some cases, this skips a transition to
UNRELEASED_PARTITIONS and allows the current assignment to converge to
the target assignment one heartbeat sooner.
Reviewers: PoAn Yang <[email protected]>, David Jacot <[email protected]>
---
.../modern/consumer/CurrentAssignmentBuilder.java | 6 +-
.../consumer/CurrentAssignmentBuilderTest.java | 66 ++++++++++++++++++++++
2 files changed, 71 insertions(+), 1 deletion(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
index 3a7ea6692b0..c265c509b6f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
@@ -254,7 +254,11 @@ public class CurrentAssignmentBuilder {
Set<Integer> partitionsPendingAssignment = new HashSet<>(target);
partitionsPendingAssignment.removeAll(assignedPartitions);
hasUnreleasedPartitions =
partitionsPendingAssignment.removeIf(partitionId ->
- currentPartitionEpoch.apply(topicId, partitionId) != -1
+ currentPartitionEpoch.apply(topicId, partitionId) != -1 &&
+ // Don't consider a partition unreleased if it is owned by the
current member
+ // because it is pending revocation. This is safe to do since
only a single member
+ // can own a partition at a time.
+ !member.partitionsPendingRevocation().getOrDefault(topicId,
Set.of()).contains(partitionId)
) || hasUnreleasedPartitions;
if (!assignedPartitions.isEmpty()) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
index 8e4e36712ba..4565d5fd3eb 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
@@ -399,6 +399,72 @@ public class CurrentAssignmentBuilderTest {
);
}
+ @Test
+ public void
testUnrevokedPartitionsToStableWithReturnedPartitionsPendingRevocation() {
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 2, 3),
+ mkTopicAssignment(topicId2, 5, 6)))
+ .setPartitionsPendingRevocation(mkAssignment(
+ // Partition 4 is pending revocation by the member but is back
in the latest target
+ // assignment.
+ mkTopicAssignment(topicId1, 4)))
+ .build();
+
+ ConsumerGroupMember updatedMember = new
CurrentAssignmentBuilder(member)
+ .withTargetAssignment(12, new Assignment(mkAssignment(
+ mkTopicAssignment(topicId1, 2, 3, 4),
+ mkTopicAssignment(topicId2, 5, 6, 7))))
+ .withCurrentPartitionEpoch((topicId, partitionId) -> {
+ if (topicId.equals(topicId1)) {
+ // Partitions 2 and 3 are in the member's current
assignment.
+ // Partition 4 is pending revocation by the member.
+ switch (partitionId) {
+ case 2:
+ case 3:
+ case 4:
+ return 10;
+ }
+ } else if (topicId.equals(topicId2)) {
+ // Partitions 5 and 6 are in the member's current
assignment.
+ switch (partitionId) {
+ case 5:
+ case 6:
+ return 10;
+ }
+ }
+ return -1;
+ })
+ .withOwnedTopicPartitions(Arrays.asList(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId1)
+ .setPartitions(Arrays.asList(2, 3)),
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId2)
+ .setPartitions(Arrays.asList(5, 6))))
+ .build();
+
+ assertEquals(
+ new ConsumerGroupMember.Builder("member")
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(12)
+ .setPreviousMemberEpoch(10)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(topicId1, 2, 3, 4),
+ mkTopicAssignment(topicId2, 5, 6, 7)))
+ .build(),
+ updatedMember
+ );
+ }
+
@Test
public void testUnreleasedPartitionsToStable() {
Uuid topicId1 = Uuid.randomUuid();