This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch 4.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 55ab4178e696599624be615cfd653a4795bb4484 Author: David Jacot <[email protected]> AuthorDate: Fri Jan 16 13:56:31 2026 +0100 KAFKA-19233; Allow fenced members to rejoin streams group with epoch 0 (#21312) This fix allows members to rejoin a streams group with memberEpoch=0 after being fenced, as specified by KIP-848. Previously, the validation in throwIfStreamsGroupMemberEpochIsInvalid rejected epoch=0 when the member had a higher epoch on the server. Reviewers: Lianet Magrans <[email protected]>, Lucas Brutschy <[email protected]> --- .../server/StreamsGroupHeartbeatRequestTest.scala | 86 ++++++++++++++++++++++ .../coordinator/group/GroupMetadataManager.java | 3 + .../group/GroupMetadataManagerTest.java | 75 +++++++++++++++++++ 3 files changed, 164 insertions(+) diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala index 252ee82fb14..ced299205ab 100644 --- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala @@ -1008,6 +1008,92 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo } } + @ClusterTest + def testFencedMemberCanRejoinWithEpochZero(): Unit = { + val admin = cluster.admin() + val memberId = "test-fenced-rejoin-member" + val groupId = "test-fenced-rejoin-group" + val topicName = "test-fenced-topic" + + try { + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq + ) + + // Create topic first. + TestUtils.createTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq, + topic = topicName, + numPartitions = 3 + ) + + val topology = createMockTopology(topicName) + + // Join the group. 
+ var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponseData = null + TestUtils.waitUntilTrue(() => { + streamsGroupHeartbeatResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + rebalanceTimeoutMs = 1000, + activeTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.activeTasks())) + .getOrElse(List.empty), + standbyTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.standbyTasks())) + .getOrElse(List.empty), + warmupTasks = Option(streamsGroupHeartbeatResponse) + .map(r => convertTaskIds(r.warmupTasks())) + .getOrElse(List.empty), + topology = topology + ) + streamsGroupHeartbeatResponse.errorCode == Errors.NONE.code() && + streamsGroupHeartbeatResponse.activeTasks() != null && + !streamsGroupHeartbeatResponse.activeTasks().isEmpty + }, "Could not join the group successfully.") + + // Verify initial join success with assignment. + assertEquals(2, streamsGroupHeartbeatResponse.memberEpoch()) + + // Expected assignment. + val expectedActiveTasks = List(new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(streamsGroupHeartbeatResponse.activeTasks().get(0).subtopologyId()) + .setPartitions(List[Integer](0, 1, 2).asJava)).asJava + + // Simulate a fenced member attempting to rejoin with epoch=0. + val rejoinResponse = streamsGroupHeartbeat( + groupId = groupId, + memberId = memberId, + memberEpoch = 0, + rebalanceTimeoutMs = 1000, + activeTasks = List.empty, + standbyTasks = List.empty, + warmupTasks = List.empty, + topology = topology + ) + + // Verify the full response. + // Since the topology hasn't changed, the member should get their current + // state back with the same epoch (2) and assignment. 
+ val expectedRejoinResponse = new StreamsGroupHeartbeatResponseData() + .setErrorCode(Errors.NONE.code()) + .setMemberId(memberId) + .setMemberEpoch(2) + .setHeartbeatIntervalMs(rejoinResponse.heartbeatIntervalMs()) + .setActiveTasks(expectedActiveTasks) + .setStandbyTasks(List.empty.asJava) + .setWarmupTasks(List.empty.asJava) + + assertEquals(expectedRejoinResponse, rejoinResponse) + } finally { + admin.close() + } + } + private def convertTaskIds(responseTasks: java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): List[StreamsGroupHeartbeatRequestData.TaskIds] = { if (responseTasks == null) { List() diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 868e8af149a..022c1991360 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1715,6 +1715,9 @@ public class GroupMetadataManager { List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks ) { + // Epoch 0 is a special value indicating the member wants to (re)join the group. 
+ if (receivedMemberEpoch == 0) return; + if (receivedMemberEpoch > member.memberEpoch()) { throw new FencedMemberEpochException("The streams group member has a greater member " + "epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator (" diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 3ce3c266183..274f0946ca6 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -758,6 +758,81 @@ public class GroupMetadataManagerTest { ); } + @Test + public void testStreamsGroupMemberCanRejoinWithEpochZero() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Topology topology = new Topology() + .setEpoch(1) + .setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() + .buildCoordinatorMetadataImage(); + + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + // Set up a Streams group member with epoch 100. 
+ StreamsGroupMember member = streamsGroupMemberBuilderWithDefaults(memberId) + .setMemberEpoch(100) + .setPreviousMemberEpoch(99) + .setTopologyEpoch(1) + .setAssignedTasks(mkTasksTupleWithCommonEpoch(TaskRole.ACTIVE, 100, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))) + .build(); + + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, member)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 100, computeGroupHash(Map.of( + fooTopicName, fooTopicHash + )), 1, Map.of("num.standby.replicas", "0"))); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, + TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2) + ))); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 100)); + context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, member)); + + // Member rejoins with epoch=0 - should succeed per KIP-848. + // Since the topology/metadata hasn't changed, group epoch stays at 100. 
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(100) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1, 2)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setStatus(List.of()), + result.response().data() + ); + } + @Test public void testMemberJoinsEmptyConsumerGroup() { String groupId = "fooup";
