This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a305da2a01302fb6d6bc078ca713f8aa0dec589a
Author: David Jacot <[email protected]>
AuthorDate: Thu Jan 15 11:58:59 2026 +0100

    KAFKA-19233; Allow fenced members to rejoin consumer group with epoch 0 (#21305)
    
    This fix allows members to rejoin a consumer group with memberEpoch=0
    after being fenced, as specified by KIP-848. Previously, the validation
    in throwIfConsumerGroupMemberEpochIsInvalid rejected epoch=0 when the
    member had a higher epoch on the server.
    
    Changes:
    - Add early return for receivedMemberEpoch=0 in validation method
    - Add unit tests for rejoin in STABLE, UNREVOKED_PARTITIONS, and
    UNRELEASED_PARTITIONS states
    - Add integration test for fenced member rejoin flow
    
    Reviewers: Dongnuo Lyu <[email protected]>, Sean Quah <[email protected]>, Lianet Magrans <[email protected]>
---
 .../server/ConsumerGroupHeartbeatRequestTest.scala |  88 +++++++++
 .../coordinator/group/GroupMetadataManager.java    |   4 +
 .../group/GroupMetadataManagerTest.java            | 215 +++++++++++++++++++++
 3 files changed, 307 insertions(+)

diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 506d0007924..ae2bb4186ec 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -763,4 +763,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC
     assertFalse(memberId.isEmpty)
     admin.close()
   }
+
+  @ClusterTest
+  def testFencedMemberCanRejoinWithEpochZero(): Unit = {
+    // Creates the __consumer_offsets topic because it won't be created automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    val memberId = Uuid.randomUuid().toString
+    val groupId = "test-fenced-rejoin-grp"
+
+    // Heartbeat request to join the group.
+    var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    // Wait for successful join.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
+
+    // Verify initial join success.
+    assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+    assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+
+    // Create the topic to trigger partition assignment.
+    val topicId = createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Heartbeat to get partitions assigned.
+    consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId)
+        .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+    ).build()
+
+    // Expected assignment.
+    val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+    // Wait until partitions are assigned and member epoch advances.
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+        consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+    }, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
+
+    // The member should now be at epoch 2 and own all three partitions of foo.
+    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
+
+    // Simulate a fenced member rejoining: same member id, memberEpoch=0, no owned partitions.
+    val rejoinRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    val rejoinResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](rejoinRequest)
+
+    // Verify the full response.
+    // Since the subscription/metadata hasn't changed, the member should get
+    // their current state back with the same epoch (2) and assignment.
+    val expectedRejoinResponse = new ConsumerGroupHeartbeatResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setMemberId(memberId)
+      .setMemberEpoch(2)
+      .setHeartbeatIntervalMs(rejoinResponse.data.heartbeatIntervalMs)
+      .setAssignment(expectedAssignment)
+
+    assertEquals(expectedRejoinResponse, rejoinResponse.data)
+  }
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 8afdb44ca7d..f2de7d29ca2 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1574,6 +1574,10 @@ public class GroupMetadataManager {
         int receivedMemberEpoch,
         List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
     ) {
+        // Epoch 0 is a special value indicating the member wants to (re)join the group.
+        // This is valid per KIP-848 fenced member recovery protocol, so skip the validation below.
+        if (receivedMemberEpoch == 0) return;
+
         if (receivedMemberEpoch > member.memberEpoch()) {
             throw new FencedMemberEpochException("The consumer group member has a greater member "
                 + "epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator ("
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 9da92fcfc0a..d3a14fba3ed 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -479,6 +479,221 @@ public class GroupMetadataManagerTest {
         assertEquals(100, result.response().memberEpoch());
     }
 
+    @Test
+    public void testMemberCanRejoinWithEpochZeroInStableState() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member is in STABLE state with epoch 100 and already owns its target assignment [0, 1, 2].
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2)))
+            .build();
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2)
+        )));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 100));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, member));
+
+        assertEquals(MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId));
+
+        // Member rejoins with epoch=0 - should succeed per KIP-848.
+        // Since the member is STABLE with the same subscription and assignment,
+        // the group epoch should not bump and the member gets its current epoch and assignment back.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(100)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2))))),
+            result.response()
+        );
+    }
+
+    @Test
+    public void testMemberCanRejoinWithEpochZeroInUnrevokedPartitionsState() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member is in UNREVOKED_PARTITIONS state with epoch 100.
+        // The group has advanced to epoch 101 with a new target assignment [0, 1].
+        // The member still has partition 2 pending revocation.
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.UNREVOKED_PARTITIONS)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1)))
+            .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
+            .build();
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 101, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1)
+        )));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 101));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, member));
+
+        assertEquals(MemberState.UNREVOKED_PARTITIONS, context.consumerGroupMemberState(groupId, memberId));
+
+        // Member rejoins with epoch=0 - should succeed per KIP-848.
+        // The member advances to epoch 101 and gets its target assignment [0, 1].
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(101)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1))))),
+            result.response()
+        );
+    }
+
+    @Test
+    public void testMemberCanRejoinWithEpochZeroInUnreleasedPartitionsState() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member is in UNRELEASED_PARTITIONS state with epoch 100, owning only partition 0 of its target [0, 1, 2].
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.UNRELEASED_PARTITIONS)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0)))
+            .build();
+
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2)
+        )));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 100));
+        context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, member));
+
+        assertEquals(MemberState.UNRELEASED_PARTITIONS, context.consumerGroupMemberState(groupId, memberId));
+
+        // Member rejoins with epoch=0 - should succeed per KIP-848.
+        // Since the subscription/metadata hasn't changed, group epoch stays at 100.
+        // The member gets the target assignment [0, 1, 2].
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(List.of("foo"))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(100)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2))))),
+            result.response()
+        );
+    }
+
     @Test
     public void testMemberJoinsEmptyConsumerGroup() {
         String groupId = "fooup";

Reply via email to