This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c6194bbb0ac MINOR: populate TopicName in ConsumerGroupDescribe (#15205) c6194bbb0ac is described below commit c6194bbb0ac7415373d3d428155e340f867eb703 Author: Dongnuo Lyu <139248811+dongnuo...@users.noreply.github.com> AuthorDate: Thu Jan 25 08:16:33 2024 -0500 MINOR: populate TopicName in ConsumerGroupDescribe (#15205) The patch populates the topic name of `ConsumerGroupDescribeResponseData.TopicPartitions` with the corresponding topic id in `ConsumerGroupDescribe`. Reviewers: David Jacot <dja...@confluent.io> --- .../coordinator/group/GroupMetadataManager.java | 6 ++- .../coordinator/group/consumer/ConsumerGroup.java | 11 +++++- .../group/consumer/ConsumerGroupMember.java | 43 +++++++++++++++----- .../group/GroupMetadataManagerTest.java | 17 ++++++-- .../group/consumer/ConsumerGroupMemberTest.java | 46 +++++++++++++++++++--- .../group/consumer/ConsumerGroupTest.java | 3 +- 6 files changed, 104 insertions(+), 22 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 206aaf2f59c..d32874be3df 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -483,7 +483,11 @@ public class GroupMetadataManager { final List<ConsumerGroupDescribeResponseData.DescribedGroup> describedGroups = new ArrayList<>(); groupIds.forEach(groupId -> { try { - describedGroups.add(consumerGroup(groupId, committedOffset).asDescribedGroup(committedOffset, defaultAssignor.name())); + describedGroups.add(consumerGroup(groupId, committedOffset).asDescribedGroup( + committedOffset, + defaultAssignor.name(), + metadataImage.topics() + )); } catch (GroupIdNotFoundException exception) { describedGroups.add(new ConsumerGroupDescribeResponseData.DescribedGroup() .setGroupId(groupId) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 3b9390dd974..dc3903068d6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -963,7 +963,11 @@ public class ConsumerGroup implements Group { return value == null ? 1 : value + 1; } - public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(long committedOffset, String defaultAssignor) { + public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( + long committedOffset, + String defaultAssignor, + TopicsImage topicsImage + ) { ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() .setGroupId(groupId) .setAssignorName(preferredServerAssignor(committedOffset).orElse(defaultAssignor)) @@ -972,7 +976,10 @@ public class ConsumerGroup implements Group { .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset)); members.entrySet(committedOffset).forEach( entry -> describedGroup.members().add( - entry.getValue().asConsumerGroupDescribeMember(targetAssignment.get(entry.getValue().memberId(), committedOffset)) + entry.getValue().asConsumerGroupDescribeMember( + targetAssignment.get(entry.getValue().memberId(), committedOffset), + topicsImage + ) ) ); return describedGroup; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java index 9110a24c627..5159f1f27ca 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; import java.util.ArrayList; import java.util.Collections; @@ -552,15 +554,19 @@ public class ConsumerGroupMember { * * @return The ConsumerGroupMember mapped as ConsumerGroupDescribeResponseData.Member. */ - public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(Assignment targetAssignment) { + public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember( + Assignment targetAssignment, + TopicsImage topicsImage + ) { return new ConsumerGroupDescribeResponseData.Member() .setMemberEpoch(memberEpoch) .setMemberId(memberId) .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() - .setTopicPartitions(topicPartitionsFromMap(assignedPartitions))) + .setTopicPartitions(topicPartitionsFromMap(assignedPartitions, topicsImage))) .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() .setTopicPartitions(topicPartitionsFromMap( - targetAssignment != null ? targetAssignment.partitions() : Collections.emptyMap() + targetAssignment != null ? targetAssignment.partitions() : Collections.emptyMap(), + topicsImage ))) .setClientHost(clientHost) .setClientId(clientId) @@ -571,13 +577,32 @@ public class ConsumerGroupMember { } private static List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitionsFromMap( - Map<Uuid, Set<Integer>> partitions + Map<Uuid, Set<Integer>> partitions, + TopicsImage topicsImage + ) { + List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitions = new ArrayList<>(); + partitions.forEach((topicId, partitionSet) -> { + String topicName = lookupTopicNameById(topicId, topicsImage); + if (topicName != null) { + topicPartitions.add(new ConsumerGroupDescribeResponseData.TopicPartitions() + .setTopicId(topicId) + .setTopicName(topicName) + .setPartitions(new ArrayList<>(partitionSet))); + } + }); + return topicPartitions; + } + + private static String lookupTopicNameById( + Uuid topicId, + TopicsImage topicsImage ) { - return partitions.entrySet().stream().map( - item -> new ConsumerGroupDescribeResponseData.TopicPartitions() - .setTopicId(item.getKey()) - .setPartitions(new ArrayList<>(item.getValue())) - ).collect(Collectors.toList()); + TopicImage topicImage = topicsImage.getTopic(topicId); + if (topicImage != null) { + return topicImage.name(); + } else { + return null; + } } @Override diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 783b2af0016..a385daba221 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -9690,7 +9690,10 @@ public class GroupMetadataManagerTest { .setGroupEpoch(epoch) .setGroupId(consumerGroupIds.get(1)) .setMembers(Arrays.asList( - memberBuilder.build().asConsumerGroupDescribeMember(new Assignment(Collections.emptyMap())) + memberBuilder.build().asConsumerGroupDescribeMember( + new Assignment(Collections.emptyMap()), + new MetadataImageBuilder().build().topics() + ) )) .setGroupState("assigning") .setAssignorName("assignorName") @@ -9727,10 +9730,15 @@ public class GroupMetadataManagerTest { String memberId1 = "memberId1"; String memberId2 = "memberId2"; String topicName = "topicName"; + Uuid topicId = Uuid.randomUuid(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId, topicName, 3) + .build(); MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withAssignors(Collections.singletonList(assignor)) + .withMetadataImage(metadataImage) .build(); ConsumerGroupMember.Builder memberBuilder1 = new ConsumerGroupMember.Builder(memberId1) @@ -9739,7 +9747,8 @@ public class GroupMetadataManagerTest { context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, epoch + 1)); Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>(); - assignmentMap.put(Uuid.randomUuid(), Collections.emptySet()); + assignmentMap.put(topicId, Collections.emptySet()); + ConsumerGroupMember.Builder memberBuilder2 = new ConsumerGroupMember.Builder(memberId2); context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, memberBuilder2.build())); context.replay(RecordHelpers.newTargetAssignmentRecord(consumerGroupId, memberId2, assignmentMap)); @@ -9762,8 +9771,8 @@ public class GroupMetadataManagerTest { describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() .setGroupId(consumerGroupId) .setMembers(Arrays.asList( - memberBuilder1.build().asConsumerGroupDescribeMember(new Assignment(Collections.emptyMap())), - memberBuilder2.build().asConsumerGroupDescribeMember(new Assignment(assignmentMap)) + memberBuilder1.build().asConsumerGroupDescribeMember(new Assignment(Collections.emptyMap()), metadataImage.topics()), + memberBuilder2.build().asConsumerGroupDescribeMember(new Assignment(assignmentMap), metadataImage.topics()) )) .setGroupState("assigning") .setAssignorName("range") diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java index 05ce03eda81..2433c7ef3a9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java @@ -18,8 +18,10 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.coordinator.group.GroupMetadataManagerTest; import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.image.MetadataImage; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; @@ -38,7 +40,6 @@ import java.util.stream.Collectors; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.junit.jupiter.api.Assertions.assertEquals; - public class ConsumerGroupMemberTest { @Test @@ -318,6 +319,13 @@ public class ConsumerGroupMemberTest { Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); Uuid topicId3 = Uuid.randomUuid(); + Uuid topicId4 = Uuid.randomUuid(); + MetadataImage metadataImage = new GroupMetadataManagerTest.MetadataImageBuilder() + .addTopic(topicId1, "topic1", 3) + .addTopic(topicId2, "topic2", 3) + .addTopic(topicId3, "topic3", 3) + .addTopic(topicId4, "topic4", 3) + .build(); List<Integer> assignedPartitions = Arrays.asList(0, 1, 2); int epoch = 10; ConsumerGroupCurrentMemberAssignmentValue record = new ConsumerGroupCurrentMemberAssignmentValue() @@ -341,7 +349,7 @@ public class ConsumerGroupMemberTest { List<String> subscribedTopicNames = Arrays.asList("topic1", "topic2"); String subscribedTopicRegex = "topic.*"; Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>(); - assignmentMap.put(Uuid.randomUuid(), new HashSet<>()); + assignmentMap.put(topicId4, new HashSet<>(assignedPartitions)); Assignment targetAssignment = new Assignment(assignmentMap); ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) .updateWith(record) @@ -353,7 +361,7 @@ public class ConsumerGroupMemberTest { .setSubscribedTopicRegex(subscribedTopicRegex) .build(); - ConsumerGroupDescribeResponseData.Member actual = member.asConsumerGroupDescribeMember(targetAssignment); + ConsumerGroupDescribeResponseData.Member actual = member.asConsumerGroupDescribeMember(targetAssignment, metadataImage.topics()); ConsumerGroupDescribeResponseData.Member expected = new ConsumerGroupDescribeResponseData.Member() .setMemberId(memberId) .setMemberEpoch(epoch) @@ -365,13 +373,18 @@ public class ConsumerGroupMemberTest { .setSubscribedTopicRegex(subscribedTopicRegex) .setAssignment( new ConsumerGroupDescribeResponseData.Assignment() - .setTopicPartitions(Collections.singletonList(new ConsumerGroupDescribeResponseData.TopicPartitions().setTopicId(topicId1).setPartitions(assignedPartitions))) + .setTopicPartitions(Collections.singletonList(new ConsumerGroupDescribeResponseData.TopicPartitions() + .setTopicId(topicId1) + .setTopicName("topic1") + .setPartitions(assignedPartitions) + )) ) .setTargetAssignment( new ConsumerGroupDescribeResponseData.Assignment() .setTopicPartitions(targetAssignment.partitions().entrySet().stream().map( item -> new ConsumerGroupDescribeResponseData.TopicPartitions() .setTopicId(item.getKey()) + .setTopicName("topic4") .setPartitions(new ArrayList<>(item.getValue())) ).collect(Collectors.toList())) ); @@ -384,8 +397,31 @@ public class ConsumerGroupMemberTest { ConsumerGroupMember member = new ConsumerGroupMember.Builder(Uuid.randomUuid().toString()) .build(); - ConsumerGroupDescribeResponseData.Member consumerGroupDescribeMember = member.asConsumerGroupDescribeMember(null); + ConsumerGroupDescribeResponseData.Member consumerGroupDescribeMember = member.asConsumerGroupDescribeMember( + null, new GroupMetadataManagerTest.MetadataImageBuilder().build().topics()); assertEquals(new ConsumerGroupDescribeResponseData.Assignment(), consumerGroupDescribeMember.targetAssignment()); } + + @Test + public void testAsConsumerGroupDescribeWithTopicNameNotFound() { + Uuid memberId = Uuid.randomUuid(); + ConsumerGroupCurrentMemberAssignmentValue record = new ConsumerGroupCurrentMemberAssignmentValue() + .setAssignedPartitions(Collections.singletonList(new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions() + .setTopicId(Uuid.randomUuid()) + .setPartitions(Arrays.asList(0, 1, 2)))); + ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId.toString()) + .updateWith(record) + .build(); + + ConsumerGroupDescribeResponseData.Member expected = new ConsumerGroupDescribeResponseData.Member() + .setMemberId(memberId.toString()) + .setSubscribedTopicRegex(""); + ConsumerGroupDescribeResponseData.Member actual = member.asConsumerGroupDescribeMember(null, + new GroupMetadataManagerTest.MetadataImageBuilder() + .addTopic(Uuid.randomUuid(), "foo", 3) + .build().topics() + ); + assertEquals(expected, actual); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index 7272e82f7f0..3f2b999608d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -978,7 +978,8 @@ public class ConsumerGroupTest { new ConsumerGroupDescribeResponseData.Member().setMemberId("member2") .setSubscribedTopicRegex("") )); - ConsumerGroupDescribeResponseData.DescribedGroup actual = group.asDescribedGroup(1, ""); + ConsumerGroupDescribeResponseData.DescribedGroup actual = group.asDescribedGroup(1, "", + new GroupMetadataManagerTest.MetadataImageBuilder().build().topics()); assertEquals(expected, actual); }