This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new e9c5069be78 KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group (#19790)
e9c5069be78 is described below

commit e9c5069be78cb00d2af4f45c803907660b157726
Author: Dongnuo Lyu <139248811+dongnuo...@users.noreply.github.com>
AuthorDate: Tue May 27 05:25:57 2025 -0400

    KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group (#19790)
    
    When a consumer protocol static member replaces an existing member in a
    classic group, it's not necessary to recompute the assignment. However,
    the assignment is recomputed anyway.
    
    In [ConsumerGroup.fromClassicGroup](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java#L1140),
    we don't set the group's subscriptionMetadata. Later in the consumer
    group heartbeat, we [call updateSubscriptionMetadata](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1748),
    which [notices that the group's subscriptionMetadata needs an update](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L2757)
    and bumps the epoch. Since the epoch is bumped, we [recompute the assignment](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1766).
    
    As a fix, this patch sets the subscriptionMetadata in
    ConsumerGroup.fromClassicGroup.
    
    Reviewers: Sean Quah <sq...@confluent.io>, David Jacot <dja...@confluent.io>
---
 .../coordinator/group/GroupMetadataManager.java    |  3 +-
 .../group/modern/consumer/ConsumerGroup.java       | 15 ++++-
 .../group/GroupMetadataManagerTest.java            | 66 +++++++++-------------
 .../group/modern/consumer/ConsumerGroupTest.java   |  8 ++-
 4 files changed, 49 insertions(+), 43 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index d53586efe4d..d228c7538d0 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1115,7 +1115,8 @@ public class GroupMetadataManager {
                 snapshotRegistry,
                 metrics,
                 classicGroup,
-                metadataImage.topics()
+                metadataImage.topics(),
+                metadataImage.cluster()
             );
         } catch (SchemaException e) {
             log.warn("Cannot upgrade classic group " + classicGroup.groupId() +
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 16e0d0fae4a..c0f8673eae6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -43,6 +43,7 @@ import org.apache.kafka.coordinator.group.modern.MemberState;
 import org.apache.kafka.coordinator.group.modern.ModernGroup;
 import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
 import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
+import org.apache.kafka.image.ClusterImage;
 import org.apache.kafka.image.TopicsImage;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
@@ -1129,7 +1130,8 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      * @param snapshotRegistry  The SnapshotRegistry.
      * @param metrics           The GroupCoordinatorMetricsShard.
      * @param classicGroup      The converted classic group.
-     * @param topicsImage       The TopicsImage for topic id and topic name 
conversion.
+     * @param topicsImage       The current metadata for all available topics.
+     * @param clusterImage      The current metadata for the Kafka cluster.
      * @return  The created ConsumerGroup.
      *
      * @throws SchemaException if any member's subscription or assignment 
cannot be deserialized.
@@ -1139,7 +1141,8 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         SnapshotRegistry snapshotRegistry,
         GroupCoordinatorMetricsShard metrics,
         ClassicGroup classicGroup,
-        TopicsImage topicsImage
+        TopicsImage topicsImage,
+        ClusterImage clusterImage
     ) {
         String groupId = classicGroup.groupId();
         ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
@@ -1195,6 +1198,12 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
             consumerGroup.updateMember(newMember);
         });
 
+        
consumerGroup.setSubscriptionMetadata(consumerGroup.computeSubscriptionMetadata(
+            consumerGroup.subscribedTopicNames(),
+            topicsImage,
+            clusterImage
+        ));
+
         return consumerGroup;
     }
 
@@ -1210,6 +1219,8 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
             
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId(),
 consumerGroupMember))
         );
 
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId(),
 subscriptionMetadata()));
+
         
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(),
 groupEpoch()));
 
         members().forEach((consumerGroupMemberId, consumerGroupMember) ->
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6375038c66c..cb4d93d5d14 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -9815,6 +9815,10 @@ public class GroupMetadataManagerTest {
 
             // Create the new consumer group with member 1.
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(
+                fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
+                barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
+            )),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
0),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, expectedMember1.assignedPartitions()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 0),
@@ -9823,12 +9827,6 @@ public class GroupMetadataManagerTest {
             // Member 2 joins the new consumer group.
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember2),
 
-            // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(
-                fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
-                barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
-            )),
-
             // Newly joining member 2 bumps the group epoch. A new target 
assignment is computed.
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
1),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, assignor.targetPartitions(memberId2)),
@@ -10030,6 +10028,11 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember1),
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember2),
 
+            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(
+                fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
+                barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
+            )),
+
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
0),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, expectedMember1.assignedPartitions()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, expectedMember2.assignedPartitions()),
@@ -10042,12 +10045,6 @@ public class GroupMetadataManagerTest {
             // Member 3 joins the new consumer group.
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember3),
 
-            // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(
-                fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
-                barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
-            )),
-
             // Newly joining member 3 bumps the group epoch. A new target 
assignment is computed.
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
1),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, assignor.targetPartitions(memberId1)),
@@ -10235,7 +10232,7 @@ public class GroupMetadataManagerTest {
         );
 
         group.transitionTo(PREPARING_REBALANCE);
-        group.transitionTo(COMPLETING_REBALANCE);
+        group.initNextGeneration();
         group.transitionTo(STABLE);
 
         
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, 
assignments));
@@ -10257,8 +10254,8 @@ public class GroupMetadataManagerTest {
 
         ConsumerGroupMember expectedClassicMember = new 
ConsumerGroupMember.Builder(memberId)
             .setInstanceId(instanceId)
-            .setMemberEpoch(0)
-            .setPreviousMemberEpoch(0)
+            .setMemberEpoch(group.generationId())
+            .setPreviousMemberEpoch(group.generationId())
             .setClientId(DEFAULT_CLIENT_ID)
             .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
             .setSubscribedTopicNames(List.of(fooTopicName))
@@ -10294,7 +10291,7 @@ public class GroupMetadataManagerTest {
             .build();
 
         ConsumerGroupMember expectedFinalConsumerMember = new 
ConsumerGroupMember.Builder(expectedReplacingConsumerMember)
-            .setMemberEpoch(1)
+            .setMemberEpoch(group.generationId())
             .setServerAssignorName(NoOpPartitionAssignor.NAME)
             .setRebalanceTimeoutMs(5000)
             .setClassicMemberMetadata(null)
@@ -10306,9 +10303,10 @@ public class GroupMetadataManagerTest {
 
             // Create the new consumer group with the static member.
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedClassicMember),
-            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
0),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
group.generationId()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId, expectedClassicMember.assignedPartitions()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 0),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 group.generationId()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedClassicMember),
 
             // Remove the static member because the rejoining member replaces 
it.
@@ -10321,17 +10319,10 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedReplacingConsumerMember),
 
-            // The static member rejoins the new consumer group.
+            // The static member rejoins the new consumer group with the same 
instance id and
+            // takes the assignment of the previous member. No new target 
assignment is computed.
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedFinalConsumerMember),
 
-            // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))),
-
-            // Newly joining static member bumps the group epoch. A new target 
assignment is computed.
-            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
1),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 1),
-
             // The newly created static member takes the assignment from the 
existing member.
             // Bump its member epoch and transition to STABLE.
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedFinalConsumerMember)
@@ -10432,6 +10423,10 @@ public class GroupMetadataManagerTest {
 
             // Create the new consumer group with member 1.
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(
+                fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
+                barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
+            )),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
1),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, expectedMember1.assignedPartitions()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 1),
@@ -10440,12 +10435,6 @@ public class GroupMetadataManagerTest {
             // Member 2 joins the new consumer group.
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember2),
 
-            // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(
-                fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
-                barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
-            )),
-
             // Newly joining member 2 bumps the group epoch. A new target 
assignment is computed.
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
2),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, Collections.emptyMap()),
@@ -10817,6 +10806,11 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember1),
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember2),
 
+            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(
+                fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
+                barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
+            )),
+
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
1),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, expectedMember1.assignedPartitions()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, expectedMember2.assignedPartitions()),
@@ -10829,12 +10823,6 @@ public class GroupMetadataManagerTest {
             // Member 3 joins the new consumer group.
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember3),
 
-            // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(
-                fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
-                barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
-            )),
-
             // Newly joining member 3 bumps the group epoch. A new target 
assignment is computed.
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
2),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, assignor.targetPartitions(memberId1)),
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 886820e5513..8153a84d397 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -1533,7 +1533,8 @@ public class ConsumerGroupTest {
             new SnapshotRegistry(logContext),
             mock(GroupCoordinatorMetricsShard.class),
             classicGroup,
-            metadataImage.topics()
+            metadataImage.topics(),
+            metadataImage.cluster()
         );
 
         ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
@@ -1546,6 +1547,10 @@ public class ConsumerGroupTest {
         expectedConsumerGroup.updateTargetAssignment(memberId, new 
Assignment(mkAssignment(
             mkTopicAssignment(fooTopicId, 0)
         )));
+        expectedConsumerGroup.setSubscriptionMetadata(Map.of(
+            fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
+            barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
+        ));
         expectedConsumerGroup.updateMember(new 
ConsumerGroupMember.Builder(memberId)
             .setMemberEpoch(classicGroup.generationId())
             .setState(MemberState.STABLE)
@@ -1577,6 +1582,7 @@ public class ConsumerGroupTest {
         assertEquals(expectedConsumerGroup.groupEpoch(), 
consumerGroup.groupEpoch());
         assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
         assertEquals(expectedConsumerGroup.preferredServerAssignor(), 
consumerGroup.preferredServerAssignor());
+        assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), 
Map.copyOf(consumerGroup.subscriptionMetadata()));
         assertEquals(expectedConsumerGroup.members(), consumerGroup.members());
     }
 

Reply via email to