This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 384586e52c8 KAFKA-19233; Allow fenced members to rejoin consumer group with epoch 0 (#21305)
384586e52c8 is described below
commit 384586e52c8fe6e4637280c8d9a6e5187c37cbec
Author: David Jacot <[email protected]>
AuthorDate: Thu Jan 15 11:58:59 2026 +0100
KAFKA-19233; Allow fenced members to rejoin consumer group with epoch 0 (#21305)
This fix allows members to rejoin a consumer group with memberEpoch=0
after being fenced, as specified by KIP-848. Previously, the validation
in throwIfConsumerGroupMemberEpochIsInvalid rejected epoch=0 when the
member had a higher epoch on the server.
Changes:
- Add early return for receivedMemberEpoch=0 in validation method
- Add unit tests for rejoin in STABLE, UNREVOKED_PARTITIONS, and
UNRELEASED_PARTITIONS states
- Add integration test for fenced member rejoin flow
Reviewers: Dongnuo Lyu <[email protected]>, Sean Quah
<[email protected]>, Lianet Magrans <[email protected]>
---
.../server/ConsumerGroupHeartbeatRequestTest.scala | 88 +++++++++
.../coordinator/group/GroupMetadataManager.java | 4 +
.../group/GroupMetadataManagerTest.java | 215 +++++++++++++++++++++
3 files changed, 307 insertions(+)
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index f974f2ec3ff..0d54ca6e25c 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -676,4 +676,92 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC
assertNotNull(memberId)
assertFalse(memberId.isEmpty)
}
+
+ @ClusterTest
+ def testFencedMemberCanRejoinWithEpochZero(): Unit = {
+ // Creates the __consumer_offsets topic because it won't be created
automatically
+ // in this test because it does not use FindCoordinator API.
+ createOffsetsTopic()
+
+ val memberId = Uuid.randomUuid().toString
+ val groupId = "test-fenced-rejoin-grp"
+
+ // Heartbeat request to join the group.
+ var consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ // Wait for successful join.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify initial join success.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+
+ // Create the topic to trigger partition assignment.
+ val topicId = createTopic(
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Heartbeat to get partitions assigned.
+ consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ ).build()
+
+ // Expected assignment.
+ val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+ // Wait until a successful heartbeat returns all three partitions of foo.
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
+ }, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the member epoch advanced from 1 (initial join) to 2 along with the assignment.
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
+
+ // Simulate a fenced member rejoining with epoch=0 and no owned partitions while the coordinator still holds it at epoch 2.
+ val rejoinRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ val rejoinResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](rejoinRequest)
+
+ // Verify the full response (the heartbeat interval is taken from the response itself since it is config-dependent).
+ // Since the subscription/metadata hasn't changed, the member should get
+ // their current state back with the same epoch (2) and assignment.
+ val expectedRejoinResponse = new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setMemberId(memberId)
+ .setMemberEpoch(2)
+ .setHeartbeatIntervalMs(rejoinResponse.data.heartbeatIntervalMs)
+ .setAssignment(expectedAssignment)
+
+ assertEquals(expectedRejoinResponse, rejoinResponse.data)
+ }
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 5135516c28d..acec5e22550 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1566,6 +1566,10 @@ public class GroupMetadataManager {
int receivedMemberEpoch,
List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions
) {
+ // Epoch 0 is a special value indicating the member wants to (re)join
the group.
+ // This is valid per KIP-848 fenced member recovery protocol.
+ if (receivedMemberEpoch == 0) return;
+
if (receivedMemberEpoch > member.memberEpoch()) {
throw new FencedMemberEpochException("The consumer group member
has a greater member "
+ "epoch (" + receivedMemberEpoch + ") than the one known by
the group coordinator ("
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 769b282403c..549016c3167 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -479,6 +479,221 @@ public class GroupMetadataManagerTest {
assertEquals(100, result.response().memberEpoch());
}
+ @Test
+ public void testMemberCanRejoinWithEpochZeroInStableState() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Existing member in STABLE state at epoch 100 whose assignment matches its target [0, 1, 2].
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1, 2)))
+ .build();
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
100, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Member rejoins with epoch=0 and no owned partitions - should succeed per KIP-848.
+ // Since the member is STABLE with the same subscription and
assignment,
+ // the group epoch should not bump and the member gets their current
state back.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setTopicPartitions(List.of()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2))))),
+ result.response()
+ );
+ }
+
+ @Test
+ public void testMemberCanRejoinWithEpochZeroInUnrevokedPartitionsState() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member is in UNREVOKED_PARTITIONS state with epoch 100.
+ // The group has advanced to epoch 101 with a new target assignment
[0, 1].
+ // The member still has partition 2 pending revocation.
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1)))
+
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
101, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
101));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Member rejoins with epoch=0 and no owned partitions - should succeed per KIP-848.
+ // The member advances to epoch 101 and gets their target assignment
[0, 1].
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setTopicPartitions(List.of()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(101)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1))))),
+ result.response()
+ );
+ }
+
+ @Test
+ public void testMemberCanRejoinWithEpochZeroInUnreleasedPartitionsState() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member is in UNRELEASED_PARTITIONS state with epoch 100, owning [0] while its target is [0, 1, 2].
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.UNRELEASED_PARTITIONS)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
100, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.UNRELEASED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Member rejoins with epoch=0 and no owned partitions - should succeed per KIP-848.
+ // Since the subscription/metadata hasn't changed, group epoch stays
at 100.
+ // The member gets the target assignment [0, 1, 2].
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setTopicPartitions(List.of()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2))))),
+ result.response()
+ );
+ }
+
@Test
public void testMemberJoinsEmptyConsumerGroup() {
String groupId = "fooup";