This is an automated email from the ASF dual-hosted git repository. clolov pushed a commit to branch 4.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 662060be9477fba1340ea348fe49bbc826fc191f Author: David Jacot <[email protected]> AuthorDate: Thu Jan 15 14:51:39 2026 +0100 KAFKA-19233; Allow fenced members to rejoin share group with epoch 0 (#21309) Apply the same epoch-0 rejoin fix to share groups. The throwIfShareGroupMemberEpochIsInvalid method was rejecting epoch=0 for existing members, preventing fenced members from rejoining. Changes: - Add early return for receivedMemberEpoch=0 in share group member epoch validation - Add unit test testShareGroupMemberCanRejoinWithEpochZero - Add integration test in ShareGroupHeartbeatRequestTest Reviewers: Lianet Magrans <[email protected]>, Andrew Schofield <[email protected]> --- .../server/ShareGroupHeartbeatRequestTest.scala | 94 ++++++++++++++++++++++ .../coordinator/group/GroupMetadataManager.java | 3 + .../group/GroupMetadataManagerTest.java | 64 +++++++++++++++ 3 files changed, 161 insertions(+) diff --git a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala index b05a97fe119..f2993ecac99 100644 --- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala @@ -932,6 +932,100 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) { } } + @ClusterTest( + serverProperties = Array( + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testFencedMemberCanRejoinWithEpochZero(): Unit = { + val admin = cluster.admin() + + try { + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = cluster.brokers.values().asScala.toSeq, + controllers = cluster.controllers().values().asScala.toSeq + ) + + val memberId = Uuid.randomUuid().toString + val groupId = "test-fenced-rejoin-grp" + + // Heartbeat request to join the group. 
+ var shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setSubscribedTopicNames(List("foo").asJava) + ).build() + + // Wait for successful join. + var shareGroupHeartbeatResponse: ShareGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest) + shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code + }, msg = s"Could not join the group successfully. Last response $shareGroupHeartbeatResponse.") + + // Verify initial join success. + assertNotNull(shareGroupHeartbeatResponse.data.memberId) + assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch) + + // Create the topic to trigger partition assignment. + val topicId = TestUtils.createTopicWithAdminRaw( + admin = admin, + topic = "foo", + numPartitions = 3 + ) + + // Heartbeat to get partitions assigned. + shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(shareGroupHeartbeatResponse.data.memberEpoch) + ).build() + + // Expected assignment. + val expectedAssignment = new ShareGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List(new ShareGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) + + // Wait until partitions are assigned. + TestUtils.waitUntilTrue(() => { + shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest) + shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && + shareGroupHeartbeatResponse.data.assignment == expectedAssignment + }, msg = s"Could not get partitions assigned. 
Last response $shareGroupHeartbeatResponse.") + + val epochBeforeRejoin = shareGroupHeartbeatResponse.data.memberEpoch + assertTrue(epochBeforeRejoin > 0, s"Expected epoch > 0 but got $epochBeforeRejoin") + + // Simulate a fenced member attempting to rejoin with epoch=0. + val rejoinRequest = new ShareGroupHeartbeatRequest.Builder( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setSubscribedTopicNames(List("foo").asJava) + ).build() + + val rejoinResponse = connectAndReceive(rejoinRequest) + + // Verify the rejoin succeeds. + val expectedRejoinResponse = new ShareGroupHeartbeatResponseData() + .setErrorCode(Errors.NONE.code) + .setMemberId(memberId) + .setMemberEpoch(epochBeforeRejoin) + .setHeartbeatIntervalMs(rejoinResponse.data.heartbeatIntervalMs) + .setAssignment(expectedAssignment) + + assertEquals(expectedRejoinResponse, rejoinResponse.data) + } finally { + admin.close() + } + } + private def connectAndReceive(request: ShareGroupHeartbeatRequest): ShareGroupHeartbeatResponse = { IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](request, cluster.brokerBoundPorts().get(0)) } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index f2de7d29ca2..868e8af149a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1606,6 +1606,9 @@ public class GroupMetadataManager { ShareGroupMember member, int receivedMemberEpoch ) { + // Epoch 0 is a special value indicating the member wants to (re)join the group. 
+ if (receivedMemberEpoch == 0) return; + if (receivedMemberEpoch > member.memberEpoch()) { throw new FencedMemberEpochException("The share group member has a greater member " + "epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator (" diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index d3a14fba3ed..3ce3c266183 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -694,6 +694,70 @@ public class GroupMetadataManagerTest { ); } + @Test + public void testShareGroupMemberCanRejoinWithEpochZero() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() + .buildCoordinatorMetadataImage(); + + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + + MockPartitionAssignor assignor = new MockPartitionAssignor("share"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .withMetadataImage(metadataImage) + .build(); + + // Set up a Share group member with epoch 100. 
+ ShareGroupMember member = new ShareGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(100) + .setPreviousMemberEpoch(99) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of(fooTopicName)) + .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build(); + + context.replay(GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId, member)); + context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 100, computeGroupHash(Map.of( + fooTopicName, fooTopicHash + )))); + context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2) + ))); + context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId, 100)); + context.replay(GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId, member)); + + // Member rejoins with epoch=0 - should succeed. + // Since the subscription/metadata hasn't changed, group epoch stays at 100. + CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result = context.shareGroupHeartbeat( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setSubscribedTopicNames(List.of(fooTopicName))); + + assertEquals( + new ShareGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(100) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ShareGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ShareGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1, 2))))), + result.response().getKey() + ); + } + @Test public void testMemberJoinsEmptyConsumerGroup() { String groupId = "fooup";
