This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 55ab4178e696599624be615cfd653a4795bb4484
Author: David Jacot <[email protected]>
AuthorDate: Fri Jan 16 13:56:31 2026 +0100

    KAFKA-19233; Allow fenced members to rejoin streams group with epoch 0 (#21312)
    
    This fix allows members to rejoin a streams group with memberEpoch=0
    after being fenced, as specified by KIP-848. Previously, the validation
    in throwIfStreamsGroupMemberEpochIsInvalid rejected epoch=0 when the
    member had a higher epoch on the server.
    
    Reviewers: Lianet Magrans <[email protected]>, Lucas Brutschy
     <[email protected]>
---
 .../server/StreamsGroupHeartbeatRequestTest.scala  | 86 ++++++++++++++++++++++
 .../coordinator/group/GroupMetadataManager.java    |  3 +
 .../group/GroupMetadataManagerTest.java            | 75 +++++++++++++++++++
 3 files changed, 164 insertions(+)

diff --git 
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index 252ee82fb14..ced299205ab 100644
--- 
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -1008,6 +1008,92 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
     }
   }
 
+  @ClusterTest
+  def testFencedMemberCanRejoinWithEpochZero(): Unit = {
+    val admin = cluster.admin()
+    val memberId = "test-fenced-rejoin-member"
+    val groupId = "test-fenced-rejoin-group"
+    val topicName = "test-fenced-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create topic first.
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = topicName,
+        numPartitions = 3
+      )
+
+      val topology = createMockTopology(topicName)
+
+      // Join the group.
+      var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData = 
null
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId,
+          rebalanceTimeoutMs = 1000,
+          activeTasks = Option(streamsGroupHeartbeatResponse)
+            .map(r => convertTaskIds(r.activeTasks()))
+            .getOrElse(List.empty),
+          standbyTasks = Option(streamsGroupHeartbeatResponse)
+            .map(r => convertTaskIds(r.standbyTasks()))
+            .getOrElse(List.empty),
+          warmupTasks = Option(streamsGroupHeartbeatResponse)
+            .map(r => convertTaskIds(r.warmupTasks()))
+            .getOrElse(List.empty),
+          topology = topology
+        )
+        streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse.activeTasks() != null &&
+          !streamsGroupHeartbeatResponse.activeTasks().isEmpty
+      }, "Could not join the group successfully.")
+
+      // Verify initial join success with assignment.
+      assertEquals(2, streamsGroupHeartbeatResponse.memberEpoch())
+
+      // Expected assignment.
+      val expectedActiveTasks = List(new 
StreamsGroupHeartbeatResponseData.TaskIds()
+        
.setSubtopologyId(streamsGroupHeartbeatResponse.activeTasks().get(0).subtopologyId())
+        .setPartitions(List[Integer](0, 1, 2).asJava)).asJava
+
+      // Simulate a fenced member attempting to rejoin with epoch=0.
+      val rejoinResponse = streamsGroupHeartbeat(
+        groupId = groupId,
+        memberId = memberId,
+        memberEpoch = 0,
+        rebalanceTimeoutMs = 1000,
+        activeTasks = List.empty,
+        standbyTasks = List.empty,
+        warmupTasks = List.empty,
+        topology = topology
+      )
+
+      // Verify the full response.
+      // Since the topology hasn't changed, the member should get their current
+      // state back with the same epoch (2) and assignment.
+      val expectedRejoinResponse = new StreamsGroupHeartbeatResponseData()
+        .setErrorCode(Errors.NONE.code())
+        .setMemberId(memberId)
+        .setMemberEpoch(2)
+        .setHeartbeatIntervalMs(rejoinResponse.heartbeatIntervalMs())
+        .setActiveTasks(expectedActiveTasks)
+        .setStandbyTasks(List.empty.asJava)
+        .setWarmupTasks(List.empty.asJava)
+
+      assertEquals(expectedRejoinResponse, rejoinResponse)
+    } finally {
+      admin.close()
+    }
+  }
+
   private def convertTaskIds(responseTasks: 
java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): 
List[StreamsGroupHeartbeatRequestData.TaskIds] = {
     if (responseTasks == null) {
       List()
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 868e8af149a..022c1991360 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1715,6 +1715,9 @@ public class GroupMetadataManager {
         List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks,
         List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks
     ) {
+        // Epoch 0 is a special value indicating the member wants to (re)join 
the group.
+        if (receivedMemberEpoch == 0) return;
+
         if (receivedMemberEpoch > member.memberEpoch()) {
             throw new FencedMemberEpochException("The streams group member has 
a greater member "
                 + "epoch (" + receivedMemberEpoch + ") than the one known by 
the group coordinator ("
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3ce3c266183..274f0946ca6 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -758,6 +758,81 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void testStreamsGroupMemberCanRejoinWithEpochZero() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Topology topology = new Topology()
+            .setEpoch(1)
+            .setSubtopologies(List.of(
+                new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+            ));
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Set up a Streams group member with epoch 100.
+        StreamsGroupMember member = 
streamsGroupMemberBuilderWithDefaults(memberId)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setTopologyEpoch(1)
+            .setAssignedTasks(mkTasksTupleWithCommonEpoch(TaskRole.ACTIVE, 
100, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+            .build();
+
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 member));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
 topology));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
 100, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        )), 1, Map.of("num.standby.replicas", "0")));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
 memberId,
+            TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)
+            )));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 member));
+
+        // Member rejoins with epoch=0 - should succeed per KIP-848.
+        // Since the topology/metadata hasn't changed, group epoch stays at 
100.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(100)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2))))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
+            result.response().data()
+        );
+    }
+
     @Test
     public void testMemberJoinsEmptyConsumerGroup() {
         String groupId = "fooup";

Reply via email to