dongnuo123 commented on code in PR #14462:
URL: https://github.com/apache/kafka/pull/14462#discussion_r1346196163
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -521,6 +575,31 @@ GenericGroup getOrMaybeCreateGenericGroup(
}
}
+ /**
+ * Gets a generic group by committed offset.
+ *
+ * @param groupId The group id.
+ * @param committedOffset A specified committed offset corresponding to
this shard.
+ *
+ * @return A GenericGroup.
+ * @throws GroupIdNotFoundException if the group does not exist or is not
a generic group.
+ */
+ public GenericGroup getGenericGroupByCommittedOffset(
Review Comment:
done
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -751,6 +752,111 @@ public void testListGroupsFailedImmediately()
assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
}
+ @Test
+ public void testDescribeGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ int partitionCount = 2;
+ service.startup(() -> partitionCount);
+
+ DescribeGroupsResponseData.DescribedGroup describedGroup1 = new
DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id-1");
+ DescribeGroupsResponseData.DescribedGroup describedGroup2 = new
DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id-2");
+ List<DescribeGroupsResponseData.DescribedGroup>
expectedDescribedGroups = Arrays.asList(
+ describedGroup1,
+ describedGroup2
+ );
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("describe-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
+
+ CompletableFuture<Object> describedGroupFuture = new
CompletableFuture<>();
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("describe-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+ ArgumentMatchers.any()
+ )).thenReturn(describedGroupFuture);
+
+ CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>>
future =
+ service.describeGroups(requestContext(ApiKeys.DESCRIBE_GROUPS),
Arrays.asList("group-id-1", "group-id-2"));
+
+ assertFalse(future.isDone());
+
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
+
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]