This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8dc39b570e1 KAFKA-19233; Allow fenced members to rejoin share group with epoch 0 (#21309)
8dc39b570e1 is described below
commit 8dc39b570e15ea555bf2b791f1aba5a3ea9f446d
Author: David Jacot <[email protected]>
AuthorDate: Thu Jan 15 14:51:39 2026 +0100
KAFKA-19233; Allow fenced members to rejoin share group with epoch 0 (#21309)
Apply the same fix for Share groups. The
throwIfShareGroupMemberEpochIsInvalid method was rejecting epoch=0 for
existing members, preventing fenced members from rejoining.
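In essence, the validation now returns early for epoch 0 before any of the fencing checks run. A minimal sketch of the guard (exception text abbreviated; the exact change is in the diff below):

    private void throwIfShareGroupMemberEpochIsInvalid(
        ShareGroupMember member,
        int receivedMemberEpoch
    ) {
        // Epoch 0 signals a (re)join, so the fencing checks below do not apply.
        if (receivedMemberEpoch == 0) return;

        if (receivedMemberEpoch > member.memberEpoch()) {
            throw new FencedMemberEpochException("Member epoch is greater than the one "
                + "known by the group coordinator.");
        }
        // ... remaining epoch checks unchanged ...
    }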
Changes:
- Add early return for receivedMemberEpoch=0 in Share group validation
- Add unit test testShareGroupMemberCanRejoinWithEpochZero
- Add integration test in ShareGroupHeartbeatRequestTest
Reviewers: Lianet Magrans <[email protected]>, Andrew Schofield <[email protected]>
---
.../server/ShareGroupHeartbeatRequestTest.scala | 94 ++++++++++++++++++++++
.../coordinator/group/GroupMetadataManager.java | 3 +
.../group/GroupMetadataManagerTest.java | 64 +++++++++++++++
3 files changed, 161 insertions(+)
diff --git a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index b05a97fe119..f2993ecac99 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -932,6 +932,100 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
}
}
+ @ClusterTest(
+ serverProperties = Array(
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
+ ))
+ def testFencedMemberCanRejoinWithEpochZero(): Unit = {
+ val admin = cluster.admin()
+
+ try {
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = cluster.brokers.values().asScala.toSeq,
+ controllers = cluster.controllers().values().asScala.toSeq
+ )
+
+ val memberId = Uuid.randomUuid().toString
+ val groupId = "test-fenced-rejoin-grp"
+
+ // Heartbeat request to join the group.
+ var shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List("foo").asJava)
+ ).build()
+
+ // Wait for successful join.
+ var shareGroupHeartbeatResponse: ShareGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
+ shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$shareGroupHeartbeatResponse.")
+
+ // Verify initial join success.
+ assertNotNull(shareGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+
+ // Create the topic to trigger partition assignment.
+ val topicId = TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Heartbeat to get partitions assigned.
+ shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(shareGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
+
+ // Expected assignment.
+ val expectedAssignment = new ShareGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new ShareGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+ // Wait until partitions are assigned.
+ TestUtils.waitUntilTrue(() => {
+ shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
+ shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ shareGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$shareGroupHeartbeatResponse.")
+
+ val epochBeforeRejoin = shareGroupHeartbeatResponse.data.memberEpoch
+ assertTrue(epochBeforeRejoin > 0, s"Expected epoch > 0 but got $epochBeforeRejoin")
+
+ // Simulate a fenced member attempting to rejoin with epoch=0.
+ val rejoinRequest = new ShareGroupHeartbeatRequest.Builder(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List("foo").asJava)
+ ).build()
+
+ val rejoinResponse = connectAndReceive(rejoinRequest)
+
+ // Verify the rejoin succeeds.
+ val expectedRejoinResponse = new ShareGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setMemberId(memberId)
+ .setMemberEpoch(epochBeforeRejoin)
+ .setHeartbeatIntervalMs(rejoinResponse.data.heartbeatIntervalMs)
+ .setAssignment(expectedAssignment)
+
+ assertEquals(expectedRejoinResponse, rejoinResponse.data)
+ } finally {
+ admin.close()
+ }
+ }
+
private def connectAndReceive(request: ShareGroupHeartbeatRequest): ShareGroupHeartbeatResponse = {
IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](request, cluster.brokerBoundPorts().get(0))
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index acec5e22550..38bd662bdc2 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1598,6 +1598,9 @@ public class GroupMetadataManager {
ShareGroupMember member,
int receivedMemberEpoch
) {
+ // Epoch 0 is a special value indicating the member wants to (re)join the group.
+ if (receivedMemberEpoch == 0) return;
+
if (receivedMemberEpoch > member.memberEpoch()) {
throw new FencedMemberEpochException("The share group member has a greater member "
+ "epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator ("
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 549016c3167..18dd26f9cdc 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -694,6 +694,70 @@ public class GroupMetadataManagerTest {
);
}
+ @Test
+ public void testShareGroupMemberCanRejoinWithEpochZero() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("share");
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Set up a Share group member with epoch 100.
+ ShareGroupMember member = new ShareGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build();
+
+ context.replay(GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId, member));
+ context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 100, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+ context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )));
+ context.replay(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId, 100));
+ context.replay(GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId, member));
+
+ // Member rejoins with epoch=0 - should succeed.
+ // Since the subscription/metadata hasn't changed, group epoch stays at 100.
+ CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result = context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List.of(fooTopicName)));
+
+ assertEquals(
+ new ShareGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new ShareGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new ShareGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2))))),
+ result.response().getKey()
+ );
+ }
+
@Test
public void testMemberJoinsEmptyConsumerGroup() {
String groupId = "fooup";