This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8dc39b570e1 KAFKA-19233; Allow fenced members to rejoin share group with epoch 0 (#21309)
8dc39b570e1 is described below

commit 8dc39b570e15ea555bf2b791f1aba5a3ea9f446d
Author: David Jacot <[email protected]>
AuthorDate: Thu Jan 15 14:51:39 2026 +0100

    KAFKA-19233; Allow fenced members to rejoin share group with epoch 0 (#21309)
    
    Apply the same fix for Share groups. The
    throwIfShareGroupMemberEpochIsInvalid method was rejecting epoch=0 for
    existing members, preventing fenced members from rejoining.
    
    Changes:
    - Add early return for receivedMemberEpoch=0 in Share group validation
    - Add unit test testShareGroupMemberCanRejoinWithEpochZero
    - Add integration test in ShareGroupHeartbeatRequestTest
    
    Reviewers: Lianet Magrans <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../server/ShareGroupHeartbeatRequestTest.scala    | 94 ++++++++++++++++++++++
 .../coordinator/group/GroupMetadataManager.java    |  3 +
 .../group/GroupMetadataManagerTest.java            | 64 +++++++++++++++
 3 files changed, 161 insertions(+)

diff --git 
a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index b05a97fe119..f2993ecac99 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -932,6 +932,100 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
     }
   }
 
+  @ClusterTest(
+    serverProperties = Array(
+      new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+      new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1")
+    ))
+  def testFencedMemberCanRejoinWithEpochZero(): Unit = {
+    val admin = cluster.admin()
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      val memberId = Uuid.randomUuid().toString
+      val groupId = "test-fenced-rejoin-grp"
+
+      // Heartbeat request to join the group.
+      var shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
+        new ShareGroupHeartbeatRequestData()
+          .setGroupId(groupId)
+          .setMemberId(memberId)
+          .setMemberEpoch(0)
+          .setSubscribedTopicNames(List("foo").asJava)
+      ).build()
+
+      // Wait for successful join.
+      var shareGroupHeartbeatResponse: ShareGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        shareGroupHeartbeatResponse = 
connectAndReceive(shareGroupHeartbeatRequest)
+        shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+      }, msg = s"Could not join the group successfully. Last response 
$shareGroupHeartbeatResponse.")
+
+      // Verify initial join success.
+      assertNotNull(shareGroupHeartbeatResponse.data.memberId)
+      assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+
+      // Create the topic to trigger partition assignment.
+      val topicId = TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Heartbeat to get partitions assigned.
+      shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
+        new ShareGroupHeartbeatRequestData()
+          .setGroupId(groupId)
+          .setMemberId(memberId)
+          .setMemberEpoch(shareGroupHeartbeatResponse.data.memberEpoch)
+      ).build()
+
+      // Expected assignment.
+      val expectedAssignment = new ShareGroupHeartbeatResponseData.Assignment()
+        .setTopicPartitions(List(new 
ShareGroupHeartbeatResponseData.TopicPartitions()
+          .setTopicId(topicId)
+          .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+      // Wait until partitions are assigned.
+      TestUtils.waitUntilTrue(() => {
+        shareGroupHeartbeatResponse = 
connectAndReceive(shareGroupHeartbeatRequest)
+        shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+          shareGroupHeartbeatResponse.data.assignment == expectedAssignment
+      }, msg = s"Could not get partitions assigned. Last response 
$shareGroupHeartbeatResponse.")
+
+      val epochBeforeRejoin = shareGroupHeartbeatResponse.data.memberEpoch
+      assertTrue(epochBeforeRejoin > 0, s"Expected epoch > 0 but got 
$epochBeforeRejoin")
+
+      // Simulate a fenced member attempting to rejoin with epoch=0.
+      val rejoinRequest = new ShareGroupHeartbeatRequest.Builder(
+        new ShareGroupHeartbeatRequestData()
+          .setGroupId(groupId)
+          .setMemberId(memberId)
+          .setMemberEpoch(0)
+          .setSubscribedTopicNames(List("foo").asJava)
+      ).build()
+
+      val rejoinResponse = connectAndReceive(rejoinRequest)
+
+      // Verify the rejoin succeeds.
+      val expectedRejoinResponse = new ShareGroupHeartbeatResponseData()
+        .setErrorCode(Errors.NONE.code)
+        .setMemberId(memberId)
+        .setMemberEpoch(epochBeforeRejoin)
+        .setHeartbeatIntervalMs(rejoinResponse.data.heartbeatIntervalMs)
+        .setAssignment(expectedAssignment)
+
+      assertEquals(expectedRejoinResponse, rejoinResponse.data)
+    } finally {
+      admin.close()
+    }
+  }
+
   private def connectAndReceive(request: ShareGroupHeartbeatRequest): 
ShareGroupHeartbeatResponse = {
     
IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](request, 
cluster.brokerBoundPorts().get(0))
   }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index acec5e22550..38bd662bdc2 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1598,6 +1598,9 @@ public class GroupMetadataManager {
         ShareGroupMember member,
         int receivedMemberEpoch
     ) {
+        // Epoch 0 is a special value indicating the member wants to (re)join 
the group.
+        if (receivedMemberEpoch == 0) return;
+
         if (receivedMemberEpoch > member.memberEpoch()) {
             throw new FencedMemberEpochException("The share group member has a 
greater member "
                 + "epoch (" + receivedMemberEpoch + ") than the one known by 
the group coordinator ("
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 549016c3167..18dd26f9cdc 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -694,6 +694,70 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void testShareGroupMemberCanRejoinWithEpochZero() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("share");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Set up a Share group member with epoch 100.
+        ShareGroupMember member = new ShareGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0, 1, 2)))
+            .build();
+
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId,
 member));
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 
100, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2)
+        )));
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId,
 member));
+
+        // Member rejoins with epoch=0 - should succeed.
+        // Since the subscription/metadata hasn't changed, group epoch stays 
at 100.
+        CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, 
Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result = 
context.shareGroupHeartbeat(
+            new ShareGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setSubscribedTopicNames(List.of(fooTopicName)));
+
+        assertEquals(
+            new ShareGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(100)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ShareGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new ShareGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2))))),
+            result.response().getKey()
+        );
+    }
+
     @Test
     public void testMemberJoinsEmptyConsumerGroup() {
         String groupId = "fooup";

Reply via email to