This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c6194bbb0ac MINOR: populate TopicName in ConsumerGroupDescribe (#15205)
c6194bbb0ac is described below

commit c6194bbb0ac7415373d3d428155e340f867eb703
Author: Dongnuo Lyu <139248811+dongnuo...@users.noreply.github.com>
AuthorDate: Thu Jan 25 08:16:33 2024 -0500

    MINOR: populate TopicName in ConsumerGroupDescribe (#15205)
    
    The patch populates the topic name of 
`ConsumerGroupDescribeResponseData.TopicPartitions` with the corresponding 
topic id in `ConsumerGroupDescribe`.
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 .../coordinator/group/GroupMetadataManager.java    |  6 ++-
 .../coordinator/group/consumer/ConsumerGroup.java  | 11 +++++-
 .../group/consumer/ConsumerGroupMember.java        | 43 +++++++++++++++-----
 .../group/GroupMetadataManagerTest.java            | 17 ++++++--
 .../group/consumer/ConsumerGroupMemberTest.java    | 46 +++++++++++++++++++---
 .../group/consumer/ConsumerGroupTest.java          |  3 +-
 6 files changed, 104 insertions(+), 22 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 206aaf2f59c..d32874be3df 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -483,7 +483,11 @@ public class GroupMetadataManager {
         final List<ConsumerGroupDescribeResponseData.DescribedGroup> 
describedGroups = new ArrayList<>();
         groupIds.forEach(groupId -> {
             try {
-                describedGroups.add(consumerGroup(groupId, 
committedOffset).asDescribedGroup(committedOffset, defaultAssignor.name()));
+                describedGroups.add(consumerGroup(groupId, 
committedOffset).asDescribedGroup(
+                    committedOffset,
+                    defaultAssignor.name(),
+                    metadataImage.topics()
+                ));
             } catch (GroupIdNotFoundException exception) {
                 describedGroups.add(new 
ConsumerGroupDescribeResponseData.DescribedGroup()
                     .setGroupId(groupId)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 3b9390dd974..dc3903068d6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -963,7 +963,11 @@ public class ConsumerGroup implements Group {
         return value == null ? 1 : value + 1;
     }
 
-    public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(long committedOffset, String defaultAssignor) {
+    public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
+        long committedOffset,
+        String defaultAssignor,
+        TopicsImage topicsImage
+    ) {
         ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new 
ConsumerGroupDescribeResponseData.DescribedGroup()
             .setGroupId(groupId)
             
.setAssignorName(preferredServerAssignor(committedOffset).orElse(defaultAssignor))
@@ -972,7 +976,10 @@ public class ConsumerGroup implements Group {
             .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset));
         members.entrySet(committedOffset).forEach(
             entry -> describedGroup.members().add(
-                
entry.getValue().asConsumerGroupDescribeMember(targetAssignment.get(entry.getValue().memberId(),
 committedOffset))
+                entry.getValue().asConsumerGroupDescribeMember(
+                    targetAssignment.get(entry.getValue().memberId(), 
committedOffset),
+                    topicsImage
+                )
             )
         );
         return describedGroup;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
index 9110a24c627..5159f1f27ca 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -552,15 +554,19 @@ public class ConsumerGroupMember {
      *
      * @return The ConsumerGroupMember mapped as 
ConsumerGroupDescribeResponseData.Member.
      */
-    public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember(Assignment targetAssignment) {
+    public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember(
+        Assignment targetAssignment,
+        TopicsImage topicsImage
+    ) {
         return new ConsumerGroupDescribeResponseData.Member()
             .setMemberEpoch(memberEpoch)
             .setMemberId(memberId)
             .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
-                
.setTopicPartitions(topicPartitionsFromMap(assignedPartitions)))
+                .setTopicPartitions(topicPartitionsFromMap(assignedPartitions, 
topicsImage)))
             .setTargetAssignment(new 
ConsumerGroupDescribeResponseData.Assignment()
                 .setTopicPartitions(topicPartitionsFromMap(
-                    targetAssignment != null ? targetAssignment.partitions() : 
Collections.emptyMap()
+                    targetAssignment != null ? targetAssignment.partitions() : 
Collections.emptyMap(),
+                    topicsImage
                 )))
             .setClientHost(clientHost)
             .setClientId(clientId)
@@ -571,13 +577,32 @@ public class ConsumerGroupMember {
     }
 
     private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromMap(
-        Map<Uuid, Set<Integer>> partitions
+        Map<Uuid, Set<Integer>> partitions,
+        TopicsImage topicsImage
+    ) {
+        List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitions = new ArrayList<>();
+        partitions.forEach((topicId, partitionSet) -> {
+            String topicName = lookupTopicNameById(topicId, topicsImage);
+            if (topicName != null) {
+                topicPartitions.add(new 
ConsumerGroupDescribeResponseData.TopicPartitions()
+                    .setTopicId(topicId)
+                    .setTopicName(topicName)
+                    .setPartitions(new ArrayList<>(partitionSet)));
+            }
+        });
+        return topicPartitions;
+    }
+
+    private static String lookupTopicNameById(
+        Uuid topicId,
+        TopicsImage topicsImage
     ) {
-        return partitions.entrySet().stream().map(
-            item -> new ConsumerGroupDescribeResponseData.TopicPartitions()
-                .setTopicId(item.getKey())
-                .setPartitions(new ArrayList<>(item.getValue()))
-        ).collect(Collectors.toList());
+        TopicImage topicImage = topicsImage.getTopic(topicId);
+        if (topicImage != null) {
+            return topicImage.name();
+        } else {
+            return null;
+        }
     }
 
     @Override
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 783b2af0016..a385daba221 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -9690,7 +9690,10 @@ public class GroupMetadataManagerTest {
                 .setGroupEpoch(epoch)
                 .setGroupId(consumerGroupIds.get(1))
                 .setMembers(Arrays.asList(
-                    memberBuilder.build().asConsumerGroupDescribeMember(new 
Assignment(Collections.emptyMap()))
+                    memberBuilder.build().asConsumerGroupDescribeMember(
+                        new Assignment(Collections.emptyMap()),
+                        new MetadataImageBuilder().build().topics()
+                    )
                 ))
                 .setGroupState("assigning")
                 .setAssignorName("assignorName")
@@ -9727,10 +9730,15 @@ public class GroupMetadataManagerTest {
         String memberId1 = "memberId1";
         String memberId2 = "memberId2";
         String topicName = "topicName";
+        Uuid topicId = Uuid.randomUuid();
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(topicId, topicName, 3)
+            .build();
 
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(metadataImage)
             .build();
 
         ConsumerGroupMember.Builder memberBuilder1 = new 
ConsumerGroupMember.Builder(memberId1)
@@ -9739,7 +9747,8 @@ public class GroupMetadataManagerTest {
         context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 
epoch + 1));
 
         Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
-        assignmentMap.put(Uuid.randomUuid(), Collections.emptySet());
+        assignmentMap.put(topicId, Collections.emptySet());
+
         ConsumerGroupMember.Builder memberBuilder2 = new 
ConsumerGroupMember.Builder(memberId2);
         
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, 
memberBuilder2.build()));
         
context.replay(RecordHelpers.newTargetAssignmentRecord(consumerGroupId, 
memberId2, assignmentMap));
@@ -9762,8 +9771,8 @@ public class GroupMetadataManagerTest {
         describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
             .setGroupId(consumerGroupId)
             .setMembers(Arrays.asList(
-                memberBuilder1.build().asConsumerGroupDescribeMember(new 
Assignment(Collections.emptyMap())),
-                memberBuilder2.build().asConsumerGroupDescribeMember(new 
Assignment(assignmentMap))
+                memberBuilder1.build().asConsumerGroupDescribeMember(new 
Assignment(Collections.emptyMap()), metadataImage.topics()),
+                memberBuilder2.build().asConsumerGroupDescribeMember(new 
Assignment(assignmentMap), metadataImage.topics())
             ))
             .setGroupState("assigning")
             .setAssignorName("range")
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java
index 05ce03eda81..2433c7ef3a9 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java
@@ -18,8 +18,10 @@ package org.apache.kafka.coordinator.group.consumer;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
+import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.image.MetadataImage;
 import org.junit.jupiter.api.Test;
 
 import java.nio.ByteBuffer;
@@ -38,7 +40,6 @@ import java.util.stream.Collectors;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-
 public class ConsumerGroupMemberTest {
 
     @Test
@@ -318,6 +319,13 @@ public class ConsumerGroupMemberTest {
         Uuid topicId1 = Uuid.randomUuid();
         Uuid topicId2 = Uuid.randomUuid();
         Uuid topicId3 = Uuid.randomUuid();
+        Uuid topicId4 = Uuid.randomUuid();
+        MetadataImage metadataImage = new 
GroupMetadataManagerTest.MetadataImageBuilder()
+            .addTopic(topicId1, "topic1", 3)
+            .addTopic(topicId2, "topic2", 3)
+            .addTopic(topicId3, "topic3", 3)
+            .addTopic(topicId4, "topic4", 3)
+            .build();
         List<Integer> assignedPartitions = Arrays.asList(0, 1, 2);
         int epoch = 10;
         ConsumerGroupCurrentMemberAssignmentValue record = new 
ConsumerGroupCurrentMemberAssignmentValue()
@@ -341,7 +349,7 @@ public class ConsumerGroupMemberTest {
         List<String> subscribedTopicNames = Arrays.asList("topic1", "topic2");
         String subscribedTopicRegex = "topic.*";
         Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
-        assignmentMap.put(Uuid.randomUuid(), new HashSet<>());
+        assignmentMap.put(topicId4, new HashSet<>(assignedPartitions));
         Assignment targetAssignment = new Assignment(assignmentMap);
         ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
             .updateWith(record)
@@ -353,7 +361,7 @@ public class ConsumerGroupMemberTest {
             .setSubscribedTopicRegex(subscribedTopicRegex)
             .build();
 
-        ConsumerGroupDescribeResponseData.Member actual = 
member.asConsumerGroupDescribeMember(targetAssignment);
+        ConsumerGroupDescribeResponseData.Member actual = 
member.asConsumerGroupDescribeMember(targetAssignment, metadataImage.topics());
         ConsumerGroupDescribeResponseData.Member expected = new 
ConsumerGroupDescribeResponseData.Member()
             .setMemberId(memberId)
             .setMemberEpoch(epoch)
@@ -365,13 +373,18 @@ public class ConsumerGroupMemberTest {
             .setSubscribedTopicRegex(subscribedTopicRegex)
             .setAssignment(
                 new ConsumerGroupDescribeResponseData.Assignment()
-                    .setTopicPartitions(Collections.singletonList(new 
ConsumerGroupDescribeResponseData.TopicPartitions().setTopicId(topicId1).setPartitions(assignedPartitions)))
+                    .setTopicPartitions(Collections.singletonList(new 
ConsumerGroupDescribeResponseData.TopicPartitions()
+                        .setTopicId(topicId1)
+                        .setTopicName("topic1")
+                        .setPartitions(assignedPartitions)
+                    ))
             )
             .setTargetAssignment(
                 new ConsumerGroupDescribeResponseData.Assignment()
                     
.setTopicPartitions(targetAssignment.partitions().entrySet().stream().map(
                         item -> new 
ConsumerGroupDescribeResponseData.TopicPartitions()
                             .setTopicId(item.getKey())
+                            .setTopicName("topic4")
                             .setPartitions(new ArrayList<>(item.getValue()))
                     ).collect(Collectors.toList()))
             );
@@ -384,8 +397,31 @@ public class ConsumerGroupMemberTest {
         ConsumerGroupMember member = new 
ConsumerGroupMember.Builder(Uuid.randomUuid().toString())
             .build();
 
-        ConsumerGroupDescribeResponseData.Member consumerGroupDescribeMember = 
member.asConsumerGroupDescribeMember(null);
+        ConsumerGroupDescribeResponseData.Member consumerGroupDescribeMember = 
member.asConsumerGroupDescribeMember(
+            null, new 
GroupMetadataManagerTest.MetadataImageBuilder().build().topics());
 
         assertEquals(new ConsumerGroupDescribeResponseData.Assignment(), 
consumerGroupDescribeMember.targetAssignment());
     }
+
+    @Test
+    public void testAsConsumerGroupDescribeWithTopicNameNotFound() {
+        Uuid memberId = Uuid.randomUuid();
+        ConsumerGroupCurrentMemberAssignmentValue record = new 
ConsumerGroupCurrentMemberAssignmentValue()
+            .setAssignedPartitions(Collections.singletonList(new 
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+                .setTopicId(Uuid.randomUuid())
+                .setPartitions(Arrays.asList(0, 1, 2))));
+        ConsumerGroupMember member = new 
ConsumerGroupMember.Builder(memberId.toString())
+            .updateWith(record)
+            .build();
+
+        ConsumerGroupDescribeResponseData.Member expected = new 
ConsumerGroupDescribeResponseData.Member()
+            .setMemberId(memberId.toString())
+            .setSubscribedTopicRegex("");
+        ConsumerGroupDescribeResponseData.Member actual = 
member.asConsumerGroupDescribeMember(null,
+            new GroupMetadataManagerTest.MetadataImageBuilder()
+                .addTopic(Uuid.randomUuid(), "foo", 3)
+                .build().topics()
+        );
+        assertEquals(expected, actual);
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index 7272e82f7f0..3f2b999608d 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -978,7 +978,8 @@ public class ConsumerGroupTest {
                 new 
ConsumerGroupDescribeResponseData.Member().setMemberId("member2")
                     .setSubscribedTopicRegex("")
             ));
-        ConsumerGroupDescribeResponseData.DescribedGroup actual = 
group.asDescribedGroup(1, "");
+        ConsumerGroupDescribeResponseData.DescribedGroup actual = 
group.asDescribedGroup(1, "",
+            new 
GroupMetadataManagerTest.MetadataImageBuilder().build().topics());
 
         assertEquals(expected, actual);
     }

Reply via email to