This is an automated email from the ASF dual-hosted git repository. clolov pushed a commit to branch 4.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit a305da2a01302fb6d6bc078ca713f8aa0dec589a Author: David Jacot <[email protected]> AuthorDate: Thu Jan 15 11:58:59 2026 +0100 KAFKA-19233; Allow fenced members to rejoin consumer group with epoch 0 (#21305) This fix allows members to rejoin a consumer group with memberEpoch=0 after being fenced, as specified by KIP-848. Previously, the validation in throwIfConsumerGroupMemberEpochIsInvalid rejected epoch=0 when the member had a higher epoch on the server. Changes: - Add early return for receivedMemberEpoch=0 in validation method - Add unit tests for rejoin in STABLE, UNREVOKED_PARTITIONS, and UNRELEASED_PARTITIONS states - Add integration test for fenced member rejoin flow Reviewers: Dongnuo Lyu <[email protected]>, Sean Quah <[email protected]>, Lianet Magrans <[email protected]> --- .../server/ConsumerGroupHeartbeatRequestTest.scala | 88 +++++++++ .../coordinator/group/GroupMetadataManager.java | 4 + .../group/GroupMetadataManagerTest.java | 215 +++++++++++++++++++++ 3 files changed, 307 insertions(+) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index 506d0007924..ae2bb4186ec 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -763,4 +763,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC assertFalse(memberId.isEmpty) admin.close() } + + @ClusterTest + def testFencedMemberCanRejoinWithEpochZero(): Unit = { + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + val memberId = Uuid.randomUuid().toString + val groupId = "test-fenced-rejoin-grp" + + // Heartbeat request to join the group. + var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() + + // Wait for successful join. + var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code + }, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.") + + // Verify initial join success. + assertNotNull(consumerGroupHeartbeatResponse.data.memberId) + assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch) + + // Create the topic to trigger partition assignment. + val topicId = createTopic( + topic = "foo", + numPartitions = 3 + ) + + // Heartbeat to get partitions assigned. + consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch) + ).build() + + // Expected assignment. + val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) + + // Wait until partitions are assigned and member epoch advances. + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && + consumerGroupHeartbeatResponse.data.assignment == expectedAssignment + }, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.") + + // Verify member has epoch > 0 (should be 2). + assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) + + // Simulate a fenced member attempting to rejoin with epoch=0. + val rejoinRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() + + val rejoinResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](rejoinRequest) + + // Verify the full response. + // Since the subscription/metadata hasn't changed, the member should get + // their current state back with the same epoch (2) and assignment. + val expectedRejoinResponse = new ConsumerGroupHeartbeatResponseData() + .setErrorCode(Errors.NONE.code) + .setMemberId(memberId) + .setMemberEpoch(2) + .setHeartbeatIntervalMs(rejoinResponse.data.heartbeatIntervalMs) + .setAssignment(expectedAssignment) + + assertEquals(expectedRejoinResponse, rejoinResponse.data) + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 8afdb44ca7d..f2de7d29ca2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1574,6 +1574,10 @@ public class GroupMetadataManager { int receivedMemberEpoch, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions ) { + // Epoch 0 is a special value indicating the member wants to (re)join the group. + // This is valid per KIP-848 fenced member recovery protocol. + if (receivedMemberEpoch == 0) return; + if (receivedMemberEpoch > member.memberEpoch()) { throw new FencedMemberEpochException("The consumer group member has a greater member " + "epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator (" diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 9da92fcfc0a..d3a14fba3ed 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -479,6 +479,221 @@ public class GroupMetadataManagerTest { assertEquals(100, result.response().memberEpoch()); } + @Test + public void testMemberCanRejoinWithEpochZeroInStableState() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() + .buildCoordinatorMetadataImage(); + + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + // Member is in STABLE state with epoch 100. + ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(100) + .setPreviousMemberEpoch(99) + .setRebalanceTimeoutMs(5000) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build(); + + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, computeGroupHash(Map.of( + fooTopicName, fooTopicHash + )))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2) + ))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 100)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, member)); + + assertEquals(MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId)); + + // Member rejoins with epoch=0 - should succeed per KIP-848. + // Since the member is STABLE with the same subscription and assignment, + // the group epoch should not bump and the member gets their current state back. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setTopicPartitions(List.of())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(100) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1, 2))))), + result.response() + ); + } + + @Test + public void testMemberCanRejoinWithEpochZeroInUnrevokedPartitionsState() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() + .buildCoordinatorMetadataImage(); + + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + // Member is in UNREVOKED_PARTITIONS state with epoch 100. + // The group has advanced to epoch 101 with a new target assignment [0, 1]. + // The member still has partition 2 pending revocation. + ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(100) + .setPreviousMemberEpoch(99) + .setRebalanceTimeoutMs(5000) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1))) + .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2))) + .build(); + + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 101, computeGroupHash(Map.of( + fooTopicName, fooTopicHash + )))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1) + ))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 101)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, member)); + + assertEquals(MemberState.UNREVOKED_PARTITIONS, context.consumerGroupMemberState(groupId, memberId)); + + // Member rejoins with epoch=0 - should succeed per KIP-848. + // The member advances to epoch 101 and gets their target assignment [0, 1]. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setTopicPartitions(List.of())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(101) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1))))), + result.response() + ); + } + + @Test + public void testMemberCanRejoinWithEpochZeroInUnreleasedPartitionsState() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() + .buildCoordinatorMetadataImage(); + + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + // Member is in UNRELEASED_PARTITIONS state with epoch 100. + ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.UNRELEASED_PARTITIONS) + .setMemberEpoch(100) + .setPreviousMemberEpoch(99) + .setRebalanceTimeoutMs(5000) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0))) + .build(); + + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, computeGroupHash(Map.of( + fooTopicName, fooTopicHash + )))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2) + ))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 100)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, member)); + + assertEquals(MemberState.UNRELEASED_PARTITIONS, context.consumerGroupMemberState(groupId, memberId)); + + // Member rejoins with epoch=0 - should succeed per KIP-848. + // Since the subscription/metadata hasn't changed, group epoch stays at 100. + // The member gets the target assignment [0, 1, 2]. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setTopicPartitions(List.of())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(100) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1, 2))))), + result.response() + ); + } + @Test public void testMemberJoinsEmptyConsumerGroup() { String groupId = "fooup";
