This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new e9c5069be78 KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group (#19790) e9c5069be78 is described below commit e9c5069be78cb00d2af4f45c803907660b157726 Author: Dongnuo Lyu <139248811+dongnuo...@users.noreply.github.com> AuthorDate: Tue May 27 05:25:57 2025 -0400 KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group (#19790) When a consumer protocol static member replaces an existing member in a classic group, it's not necessary to recompute the assignment. However, the assignment is recomputed anyway. In [ConsumerGroup.fromClassicGroup](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java#L1140), we don't set the group's subscriptionMetadata. Later in the consumer group heartbeat, we [call updateSubscriptionMetadata](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1748), which [notices that the group's subscriptionMetadata needs an update](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L2757) and bumps the group epoch. Since the group epoch is bumped, we [recompute the assignment](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1766). As a fix, this patch sets the subscriptionMetadata in ConsumerGroup.fromClassicGroup, so the subsequent heartbeat finds it already up to date and neither bumps the group epoch nor recomputes the assignment. 
Reviewers: Sean Quah <sq...@confluent.io>, David Jacot <dja...@confluent.io> --- .../coordinator/group/GroupMetadataManager.java | 3 +- .../group/modern/consumer/ConsumerGroup.java | 15 ++++- .../group/GroupMetadataManagerTest.java | 66 +++++++++------------- .../group/modern/consumer/ConsumerGroupTest.java | 8 ++- 4 files changed, 49 insertions(+), 43 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index d53586efe4d..d228c7538d0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1115,7 +1115,8 @@ public class GroupMetadataManager { snapshotRegistry, metrics, classicGroup, - metadataImage.topics() + metadataImage.topics(), + metadataImage.cluster() ); } catch (SchemaException e) { log.warn("Cannot upgrade classic group " + classicGroup.groupId() + diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java index 16e0d0fae4a..c0f8673eae6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java @@ -43,6 +43,7 @@ import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.ModernGroup; import org.apache.kafka.coordinator.group.modern.ModernGroupMember; import org.apache.kafka.coordinator.group.modern.SubscriptionCount; +import org.apache.kafka.image.ClusterImage; import org.apache.kafka.image.TopicsImage; import org.apache.kafka.timeline.SnapshotRegistry; import 
org.apache.kafka.timeline.TimelineHashMap; @@ -1129,7 +1130,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> { * @param snapshotRegistry The SnapshotRegistry. * @param metrics The GroupCoordinatorMetricsShard. * @param classicGroup The converted classic group. - * @param topicsImage The TopicsImage for topic id and topic name conversion. + * @param topicsImage The current metadata for all available topics. + * @param clusterImage The current metadata for the Kafka cluster. * @return The created ConsumerGroup. * * @throws SchemaException if any member's subscription or assignment cannot be deserialized. @@ -1139,7 +1141,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> { SnapshotRegistry snapshotRegistry, GroupCoordinatorMetricsShard metrics, ClassicGroup classicGroup, - TopicsImage topicsImage + TopicsImage topicsImage, + ClusterImage clusterImage ) { String groupId = classicGroup.groupId(); ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); @@ -1195,6 +1198,12 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> { consumerGroup.updateMember(newMember); }); + consumerGroup.setSubscriptionMetadata(consumerGroup.computeSubscriptionMetadata( + consumerGroup.subscribedTopicNames(), + topicsImage, + clusterImage + )); + return consumerGroup; } @@ -1210,6 +1219,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> { records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId(), consumerGroupMember)) ); + records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId(), subscriptionMetadata())); + records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch())); members().forEach((consumerGroupMemberId, consumerGroupMember) -> diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 6375038c66c..cb4d93d5d14 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -9815,6 +9815,10 @@ public class GroupMetadataManagerTest { // Create the new consumer group with member 1. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) + )), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0), @@ -9823,12 +9827,6 @@ public class GroupMetadataManagerTest { // Member 2 joins the new consumer group. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), - // The subscription metadata hasn't been updated during the conversion, so a new one is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - // Newly joining member 2 bumps the group epoch. A new target assignment is computed. 
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, assignor.targetPartitions(memberId2)), @@ -10030,6 +10028,11 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) + )), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()), @@ -10042,12 +10045,6 @@ public class GroupMetadataManagerTest { // Member 3 joins the new consumer group. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3), - // The subscription metadata hasn't been updated during the conversion, so a new one is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - // Newly joining member 3 bumps the group epoch. A new target assignment is computed. 
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), @@ -10235,7 +10232,7 @@ public class GroupMetadataManagerTest { ); group.transitionTo(PREPARING_REBALANCE); - group.transitionTo(COMPLETING_REBALANCE); + group.initNextGeneration(); group.transitionTo(STABLE); context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignments)); @@ -10257,8 +10254,8 @@ public class GroupMetadataManagerTest { ConsumerGroupMember expectedClassicMember = new ConsumerGroupMember.Builder(memberId) .setInstanceId(instanceId) - .setMemberEpoch(0) - .setPreviousMemberEpoch(0) + .setMemberEpoch(group.generationId()) + .setPreviousMemberEpoch(group.generationId()) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setSubscribedTopicNames(List.of(fooTopicName)) @@ -10294,7 +10291,7 @@ public class GroupMetadataManagerTest { .build(); ConsumerGroupMember expectedFinalConsumerMember = new ConsumerGroupMember.Builder(expectedReplacingConsumerMember) - .setMemberEpoch(1) + .setMemberEpoch(group.generationId()) .setServerAssignorName(NoOpPartitionAssignor.NAME) .setRebalanceTimeoutMs(5000) .setClassicMemberMetadata(null) @@ -10306,9 +10303,10 @@ public class GroupMetadataManagerTest { // Create the new consumer group with the static member. 
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedClassicMember), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, group.generationId()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, expectedClassicMember.assignedPartitions()), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, group.generationId()), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedClassicMember), // Remove the static member because the rejoining member replaces it. @@ -10321,17 +10319,10 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedReplacingConsumerMember), - // The static member rejoins the new consumer group. + // The static member rejoins the new consumer group with the same instance id and + // takes the assignment of the previous member. No new target assignment is computed. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedFinalConsumerMember), - // The subscription metadata hasn't been updated during the conversion, so a new one is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))), - - // Newly joining static member bumps the group epoch. A new target assignment is computed. 
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), - // The newly created static member takes the assignment from the existing member. // Bump its member epoch and transition to STABLE. GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedFinalConsumerMember) @@ -10432,6 +10423,10 @@ public class GroupMetadataManagerTest { // Create the new consumer group with member 1. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) + )), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), @@ -10440,12 +10435,6 @@ public class GroupMetadataManagerTest { // Member 2 joins the new consumer group. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), - // The subscription metadata hasn't been updated during the conversion, so a new one is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - // Newly joining member 2 bumps the group epoch. A new target assignment is computed. 
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Collections.emptyMap()), @@ -10817,6 +10806,11 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) + )), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()), @@ -10829,12 +10823,6 @@ public class GroupMetadataManagerTest { // Member 3 joins the new consumer group. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3), - // The subscription metadata hasn't been updated during the conversion, so a new one is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - // Newly joining member 3 bumps the group epoch. A new target assignment is computed. 
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java index 886820e5513..8153a84d397 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java @@ -1533,7 +1533,8 @@ public class ConsumerGroupTest { new SnapshotRegistry(logContext), mock(GroupCoordinatorMetricsShard.class), classicGroup, - metadataImage.topics() + metadataImage.topics(), + metadataImage.cluster() ); ConsumerGroup expectedConsumerGroup = new ConsumerGroup( @@ -1546,6 +1547,10 @@ public class ConsumerGroupTest { expectedConsumerGroup.updateTargetAssignment(memberId, new Assignment(mkAssignment( mkTopicAssignment(fooTopicId, 0) ))); + expectedConsumerGroup.setSubscriptionMetadata(Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) + )); expectedConsumerGroup.updateMember(new ConsumerGroupMember.Builder(memberId) .setMemberEpoch(classicGroup.generationId()) .setState(MemberState.STABLE) @@ -1577,6 +1582,7 @@ public class ConsumerGroupTest { assertEquals(expectedConsumerGroup.groupEpoch(), consumerGroup.groupEpoch()); assertEquals(expectedConsumerGroup.state(), consumerGroup.state()); assertEquals(expectedConsumerGroup.preferredServerAssignor(), consumerGroup.preferredServerAssignor()); + assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), Map.copyOf(consumerGroup.subscriptionMetadata())); assertEquals(expectedConsumerGroup.members(), consumerGroup.members()); }