This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cbea4f69bd8 KAFKA-19546: Rebalance should be triggered by subscription
change during group protocol downgrade (#20417)
cbea4f69bd8 is described below
commit cbea4f69bd82cbe6dcdcf643198e0ec265ae7d74
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Wed Sep 24 09:40:45 2025 -0400
KAFKA-19546: Rebalance should be triggered by subscription change during
group protocol downgrade (#20417)
During online downgrade, when a static member using the consumer
protocol, which is also the last member using the consumer protocol, is
replaced by another static member using the classic protocol with the
same instance id, the latter takes over the assignment of the former and
an online downgrade is triggered.
In the current implementation, if the replacing static member has a
different subscription from the member it replaces, no rebalance is
triggered when the downgrade happens. The patch checks whether the
replacing static member has changed its subscription and, if so,
triggers a rebalance as part of the downgrade.
Reviewers: Sean Quah <[email protected]>, David Jacot
<[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 104 ++++++++++-------
.../group/GroupMetadataManagerTest.java | 129 +++++++++++++--------
2 files changed, 144 insertions(+), 89 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 4f156d75daa..cea68e09ffb 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1257,16 +1257,19 @@ public class GroupMetadataManager {
/**
* Creates a ClassicGroup corresponding to the given ConsumerGroup.
*
- * @param consumerGroup The converted ConsumerGroup.
- * @param leavingMembers The leaving member(s) that triggered the
downgrade validation.
- * @param joiningMember The newly joined member if the downgrade is
triggered by static member replacement.
- * When not null, must have an instanceId that
matches an existing member.
- * @param records The record list to which the conversion
records are added.
+ * @param consumerGroup The converted ConsumerGroup.
+ * @param leavingMembers The leaving member(s) that triggered
the downgrade validation.
+ * @param joiningMember The newly joined member if the
downgrade is triggered by static member replacement.
+ * When not null, must have an instanceId
that matches the replaced member.
+ * @param hasSubscriptionChanged The boolean indicating whether the
joining member has a different subscription
+ * from the replaced member. Only used
when joiningMember is set.
+ * @param records The record list to which the
conversion records are added.
*/
private void convertToClassicGroup(
ConsumerGroup consumerGroup,
Set<ConsumerGroupMember> leavingMembers,
ConsumerGroupMember joiningMember,
+ boolean hasSubscriptionChanged,
List<CoordinatorRecord> records
) {
if (joiningMember == null) {
@@ -1307,9 +1310,12 @@ public class GroupMetadataManager {
classicGroup.allMembers().forEach(member ->
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
- // If the downgrade is triggered by a member leaving the group, a
rebalance should be triggered.
+ // If the downgrade is triggered by a member leaving the group or a
static
+ // member replacement with a different subscription, a rebalance
should be triggered.
if (joiningMember == null) {
- prepareRebalance(classicGroup, String.format("Downgrade group %s
from consumer to classic.", classicGroup.groupId()));
+ prepareRebalance(classicGroup, String.format("Downgrade group %s
from consumer to classic for member leaving.", classicGroup.groupId()));
+ } else if (hasSubscriptionChanged) {
+ prepareRebalance(classicGroup, String.format("Downgrade group %s
from consumer to classic for static member replacement with different
subscription.", classicGroup.groupId()));
}
log.info("[GroupId {}] Converted the consumer group to a classic
group.", consumerGroup.groupId());
@@ -2401,6 +2407,10 @@ public class GroupMetadataManager {
);
}
+ ConsumerGroupMember existingStaticMemberOrNull =
group.staticMember(request.groupInstanceId());
+ boolean downgrade = existingStaticMemberOrNull != null &&
+ validateOnlineDowngradeWithReplacedMember(group,
existingStaticMemberOrNull);
+
int groupEpoch = group.groupEpoch();
SubscriptionType subscriptionType = group.subscriptionType();
final ConsumerProtocolSubscription subscription =
deserializeSubscription(protocols);
@@ -2447,49 +2457,61 @@ public class GroupMetadataManager {
subscriptionType = result.subscriptionType;
}
- // 2. Update the target assignment if the group epoch is larger than
the target assignment epoch. The delta between
- // the existing and the new target assignment is persisted to the
partition.
- final int targetAssignmentEpoch;
- final Assignment targetAssignment;
+ if (downgrade) {
+ // 2. If the static member subscription hasn't changed, reconcile
the member's assignment with the existing
+ // assignment if the member is not fully reconciled yet. If the
static member subscription has changed, a
+ // rebalance will be triggered during downgrade anyway so we can
skip the reconciliation.
+ if (!bumpGroupEpoch) {
+ updatedMember = maybeReconcile(
+ groupId,
+ updatedMember,
+ group::currentPartitionEpoch,
+ group.assignmentEpoch(),
+ group.targetAssignment(updatedMember.memberId(),
updatedMember.instanceId()),
+ toTopicPartitions(subscription.ownedPartitions(),
metadataImage),
+ records
+ );
+ }
- if (groupEpoch > group.assignmentEpoch()) {
- targetAssignment = updateTargetAssignment(
+ // 3. Downgrade the consumer group.
+ convertToClassicGroup(
group,
- groupEpoch,
- member,
+ Set.of(),
updatedMember,
- subscriptionType,
+ bumpGroupEpoch,
records
);
- targetAssignmentEpoch = groupEpoch;
} else {
- targetAssignmentEpoch = group.assignmentEpoch();
- targetAssignment =
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
+ // If no downgrade is triggered.
- }
+ // 2. Update the target assignment if the group epoch is larger
than the target assignment epoch.
+ // The delta between the existing and the new target assignment is
persisted to the partition.
+ final int targetAssignmentEpoch;
+ final Assignment targetAssignment;
- // 3. Reconcile the member's assignment with the target assignment if
the member is not
- // fully reconciled yet.
- updatedMember = maybeReconcile(
- groupId,
- updatedMember,
- group::currentPartitionEpoch,
- targetAssignmentEpoch,
- targetAssignment,
- toTopicPartitions(subscription.ownedPartitions(), metadataImage),
- records
- );
+ if (groupEpoch > group.assignmentEpoch()) {
+ targetAssignment = updateTargetAssignment(
+ group,
+ groupEpoch,
+ member,
+ updatedMember,
+ subscriptionType,
+ records
+ );
+ targetAssignmentEpoch = groupEpoch;
+ } else {
+ targetAssignmentEpoch = group.assignmentEpoch();
+ targetAssignment =
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
+ }
- // 4. Maybe downgrade the consumer group if the last static member
using the
- // consumer protocol is replaced by the joining static member.
- ConsumerGroupMember existingStaticMemberOrNull =
group.staticMember(request.groupInstanceId());
- boolean downgrade = existingStaticMemberOrNull != null &&
- validateOnlineDowngradeWithReplacedMember(group,
existingStaticMemberOrNull);
- if (downgrade) {
- convertToClassicGroup(
- group,
- Set.of(),
+ // 3. Reconcile the member's assignment with the target assignment
if the member is not fully reconciled yet.
+ updatedMember = maybeReconcile(
+ groupId,
updatedMember,
+ group::currentPartitionEpoch,
+ targetAssignmentEpoch,
+ targetAssignment,
+ toTopicPartitions(subscription.ownedPartitions(),
metadataImage),
records
);
}
@@ -4084,7 +4106,7 @@ public class GroupMetadataManager {
List<CoordinatorRecord> records = new ArrayList<>();
if (validateOnlineDowngradeWithFencedMembers(group, members)) {
- convertToClassicGroup(group, members, null, records);
+ convertToClassicGroup(group, members, null, false, records);
return new CoordinatorResult<>(records, response, null, false);
} else {
for (ConsumerGroupMember member : members) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index efe2ad96435..8d4ae4fbe07 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -12457,8 +12457,11 @@ public class GroupMetadataManagerTest {
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
}
- @Test
- public void
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember() throws
ExecutionException, InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember(
+ boolean isSubscriptionChanged
+ ) throws ExecutionException, InterruptedException {
String groupId = "group-id";
String memberId1 = Uuid.randomUuid().toString();
String oldMemberId2 = Uuid.randomUuid().toString();
@@ -12469,11 +12472,9 @@ public class GroupMetadataManagerTest {
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
- MockPartitionAssignor assignor = new MockPartitionAssignor("range");
-
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 =
List.of(
new ConsumerGroupMemberMetadataValue.ClassicProtocol()
- .setName("range")
+ .setName(NoOpPartitionAssignor.NAME)
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
List.of(fooTopicName, barTopicName),
null,
@@ -12493,8 +12494,8 @@ public class GroupMetadataManagerTest {
.setPreviousMemberEpoch(9)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
- .setSubscribedTopicNames(List.of("foo", "bar"))
- .setServerAssignorName("range")
+ .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+ .setServerAssignorName(NoOpPartitionAssignor.NAME)
.setRebalanceTimeoutMs(45000)
.setClassicMemberMetadata(
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
@@ -12512,8 +12513,8 @@ public class GroupMetadataManagerTest {
.setPreviousMemberEpoch(9)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
- .setSubscribedTopicNames(List.of("foo"))
- .setServerAssignorName("range")
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName(NoOpPartitionAssignor.NAME)
.setRebalanceTimeoutMs(45000)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
@@ -12524,12 +12525,14 @@ public class GroupMetadataManagerTest {
.addTopic(barTopicId, barTopicName, 2)
.addRacks()
.buildCoordinatorMetadataImage();
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+ long barTopicHash = computeTopicHash(barTopicName, metadataImage);
// Consumer group with two members.
// Member 1 uses the classic protocol and static member 2 uses the
consumer protocol.
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.DOWNGRADE.toString())
-
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new
NoOpPartitionAssignor()))
.withMetadataImage(metadataImage)
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(member1)
@@ -12549,12 +12552,19 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
10);
// A new member using classic protocol with the same instance id
joins, scheduling the downgrade.
+ byte[] protocolsMetadata2 =
Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ isSubscriptionChanged ? List.of(fooTopicName, barTopicName) :
List.of(fooTopicName))));
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols2 =
+ new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+ protocols2.add(new JoinGroupRequestProtocol()
+ .setName(NoOpPartitionAssignor.NAME)
+ .setMetadata(protocolsMetadata2));
JoinGroupRequestData joinRequest = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
.withGroupId(groupId)
.withMemberId(UNKNOWN_MEMBER_ID)
.withGroupInstanceId(instanceId)
.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
- .withDefaultProtocolTypeAndProtocols()
+ .withProtocols(protocols2)
.build();
GroupMetadataManagerTestContext.JoinResult result =
context.sendClassicGroupJoin(joinRequest);
result.appendFuture.complete(null);
@@ -12566,14 +12576,15 @@ public class GroupMetadataManagerTest {
.build();
ConsumerGroupMember expectedNewClassicMember2 = new
ConsumerGroupMember.Builder(oldMember2, newMemberId2)
.setPreviousMemberEpoch(0)
+ .setMemberEpoch(isSubscriptionChanged ? 11 : 10)
+ .setSubscribedTopicNames(isSubscriptionChanged ?
List.of(fooTopicName, barTopicName) : List.of(fooTopicName))
.setRebalanceTimeoutMs(joinRequest.rebalanceTimeoutMs())
.setClassicMemberMetadata(
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSessionTimeoutMs(joinRequest.sessionTimeoutMs())
.setSupportedProtocols(List.of(new
ConsumerGroupMemberMetadataValue.ClassicProtocol()
- .setName("range")
-
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
- List.of(fooTopicName)))))))
+ .setName(NoOpPartitionAssignor.NAME)
+ .setMetadata(protocolsMetadata2)))
).build();
byte[] assignment1 =
Utils.toArray(ConsumerProtocol.serializeAssignment(new
ConsumerPartitionAssignor.Assignment(List.of(
@@ -12600,7 +12611,7 @@ public class GroupMetadataManagerTest {
context.time,
10,
Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
- Optional.of("range"),
+ Optional.of(NoOpPartitionAssignor.NAME),
Optional.of(memberId1),
Optional.of(context.time.milliseconds())
);
@@ -12636,42 +12647,60 @@ public class GroupMetadataManagerTest {
assertTrue(Set.of(memberId1, newMemberId2).contains(leader));
expectedClassicGroup.setLeaderId(Optional.of(leader));
- assertUnorderedRecordsEquals(
- List.of(
- // Remove the existing member 2 that uses the consumer
protocol.
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
oldMemberId2)),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
oldMemberId2)),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
oldMemberId2)),
+ List<List<CoordinatorRecord>> replacingRecords = List.of(
+ // Remove the existing member 2 that uses the consumer protocol.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
oldMemberId2)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
oldMemberId2)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
oldMemberId2)),
- // Create the new member 2 that uses the consumer protocol.
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewConsumerMember2)),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId2, expectedNewConsumerMember2.assignedPartitions())),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewConsumerMember2)),
+ // Create the new member 2 that uses the consumer protocol.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewConsumerMember2)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId2, expectedNewConsumerMember2.assignedPartitions())),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewConsumerMember2))
+ );
- // Update the new member 2 to the member that uses classic
protocol.
+ List<List<CoordinatorRecord>> memberUpdateRecords;
+ if (isSubscriptionChanged) {
+ memberUpdateRecords = List.of(
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewClassicMember2)),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewClassicMember2)),
-
- // Remove member 1, member 2 and the consumer group.
- List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
newMemberId2)
- ),
- List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
newMemberId2)
- ),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId)),
- List.of(
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
newMemberId2)
- ),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)),
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11,
computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash,
+ barTopicName, barTopicHash
+ ))))
+ );
+ } else {
+ memberUpdateRecords = List.of(
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewClassicMember2)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewClassicMember2))
+ );
+ }
- // Create the classic group.
-
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments))
+ List<List<CoordinatorRecord>> downgradeRecords = List.of(
+ // Remove member 1, member 2 and the consumer group.
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
newMemberId2)
+ ),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
newMemberId2)
),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId)),
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
newMemberId2)
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)),
+
+ // Create the classic group.
+
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments))
+ );
+
+ assertUnorderedRecordsEquals(
+ Stream.of(replacingRecords, memberUpdateRecords, downgradeRecords)
+ .flatMap(List::stream)
+ .collect(Collectors.toList()),
result.records
);
@@ -12681,9 +12710,13 @@ public class GroupMetadataManagerTest {
);
assertNotNull(heartbeatTimeout);
- // No rebalance is triggered.
+ // If the subscription is changed, a rebalance is triggered.
ClassicGroup classicGroup =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
- assertTrue(classicGroup.isInState(STABLE));
+ if (isSubscriptionChanged) {
+ assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+ } else {
+ assertTrue(classicGroup.isInState(STABLE));
+ }
}
@Test