dajac commented on code in PR #14271:
URL: https://github.com/apache/kafka/pull/14271#discussion_r1319725841
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -48,6 +49,16 @@ public String toString() {
*/
String stateAsString();
+ /**
+ * @return The {{@link GroupType}}'s String representation with
committedOffset.
Review Comment:
nit: `based on the committed offset.`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +431,32 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ CompletableFuture<ListGroupsResponseData> future = new
CompletableFuture<>();
+ List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();
Review Comment:
nit: Those two could be final as well.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -428,9 +431,32 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ CompletableFuture<ListGroupsResponseData> future = new
CompletableFuture<>();
+ List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();
+ final AtomicInteger cnt = new
AtomicInteger(runtime.partitions().size());
+
+ for (TopicPartition tp : runtime.partitions()) {
Review Comment:
It seems to me that calling `partitions()` twice is not safe here because
the number of partitions could change in between the two calls. I think that we
should store it in order to avoid this race condition.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -306,6 +308,19 @@ public CoordinatorResult<OffsetCommitResponseData, Record>
commitOffset(
return offsetMetadataManager.commitOffset(context, request);
}
+ /**
+ * Handles a ListGroups request.
+ *
+ * @param statesFilter The states of the groups we want to list. If empty
all groups are returned with their state.
Review Comment:
nit: Add committedOffset as well.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -179,6 +181,23 @@ public String stateAsString() {
return state.get().toString();
}
+ /**
+ * @return The current state as a String with given committedOffset.
+ */
+ public String stateAsString(long committedOffset) {
+ return state.get(committedOffset).toString();
+ }
+
+ /**
+ * @return the group formatted as a list group response.
Review Comment:
nit: ditto.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws
Exception {
);
}
+ @Test
+ public void testListGroups() throws ExecutionException,
InterruptedException, TimeoutException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ ListGroupsRequestData request = new ListGroupsRequestData();
+
+ List<ListGroupsResponseData.ListedGroup> expectedResults =
Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group1")
+ .setGroupState("Stable")
+ .setProtocolType("protocol1"),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group2")
+ .setGroupState("Empty")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
+ );
+ when(runtime.partitions()).thenReturn(Sets.newSet(new
TopicPartition("__consumer_offsets", 0)));
Review Comment:
Could we add more partitions to ensure that the logic to handle them work as
expected?
##########
server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java:
##########
@@ -103,4 +104,12 @@ public static <T> CompletableFuture<T>
failedFuture(Throwable ex) {
future.completeExceptionally(ex);
return future;
}
+
+ public static <T> void drainFutures(
Review Comment:
I suppose that we could remove this now, isn't it?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -420,6 +422,17 @@ public Group group(String groupId, long committedOffset)
throws GroupIdNotFoundE
return group;
}
+ /**
+ * @return The GenericGroup List filtered by statesFilter and
committedOffset.
+ */
Review Comment:
nit: Could we update the javadoc to full format? It would be great to
document the arguments, etc. You have an example right below
(`getOrMaybeCreateConsumerGroup`).
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -179,6 +181,23 @@ public String stateAsString() {
return state.get().toString();
}
+ /**
+ * @return The current state as a String with given committedOffset.
Review Comment:
nit: `based on the committed offset.` here as well.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -48,6 +49,16 @@ public String toString() {
*/
String stateAsString();
+ /**
+ * @return The {{@link GroupType}}'s String representation with
committedOffset.
+ */
+ String stateAsString(long committedOffset);
+
+ /**
+ * @return the group formatted as a list group response.
Review Comment:
nit: Should we also add `based on the committed offset.` here?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws
Exception {
);
}
+ @Test
+ public void testListGroups() throws ExecutionException,
InterruptedException, TimeoutException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ ListGroupsRequestData request = new ListGroupsRequestData();
+
+ List<ListGroupsResponseData.ListedGroup> expectedResults =
Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group1")
+ .setGroupState("Stable")
+ .setProtocolType("protocol1"),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group2")
+ .setGroupState("Empty")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
+ );
+ when(runtime.partitions()).thenReturn(Sets.newSet(new
TopicPartition("__consumer_offsets", 0)));
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("list-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(expectedResults));
+
+ CompletableFuture<ListGroupsResponseData> responseFuture =
service.listGroups(
+ requestContext(ApiKeys.LIST_GROUPS),
+ request
+ );
+
+ assertEquals(new ListGroupsResponseData().setGroups(expectedResults),
responseFuture.get(5, TimeUnit.SECONDS));
+ }
+
+ private void testListGroupsFailedWithException(Throwable t,
ListGroupsResponseData expectResponseData)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ ListGroupsRequestData request = new ListGroupsRequestData();
+ when(runtime.partitions()).thenReturn(Sets.newSet(new
TopicPartition("__consumer_offsets", 0)));
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("list-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(t));
+
+ CompletableFuture<ListGroupsResponseData> responseFuture =
service.listGroups(
+ requestContext(ApiKeys.LIST_GROUPS),
+ request
+ );
+ assertEquals(expectResponseData, responseFuture.get(5,
TimeUnit.SECONDS));
+
Review Comment:
nit: Remove empty line.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -1169,7 +1179,7 @@ public List<JoinGroupResponseMember>
currentGenericGroupMembers() {
/**
* @return the group formatted as a list group response.
Review Comment:
nit: ditto.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws
Exception {
);
}
+ @Test
+ public void testListGroups() throws ExecutionException,
InterruptedException, TimeoutException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ ListGroupsRequestData request = new ListGroupsRequestData();
+
+ List<ListGroupsResponseData.ListedGroup> expectedResults =
Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group1")
+ .setGroupState("Stable")
+ .setProtocolType("protocol1"),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group2")
+ .setGroupState("Empty")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
Review Comment:
nit: We can remove this empty line.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8999,4 +9052,4 @@ public SyncResult(
this.appendFuture = coordinatorResult.appendFuture();
}
}
-}
+}
Review Comment:
nit: Could we add an empty line at the end?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -242,6 +242,16 @@ public String stateAsString() {
return this.state.toString();
}
+ /**
+ * The state of this group with committedOffset.
Review Comment:
nit: `based on the committed offset.`.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -599,6 +606,85 @@ public void testHeartbeatCoordinatorException() throws
Exception {
);
}
+ @Test
+ public void testListGroups() throws ExecutionException,
InterruptedException, TimeoutException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ ListGroupsRequestData request = new ListGroupsRequestData();
+
+ List<ListGroupsResponseData.ListedGroup> expectedResults =
Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group1")
+ .setGroupState("Stable")
+ .setProtocolType("protocol1"),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group2")
+ .setGroupState("Empty")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
+ );
+ when(runtime.partitions()).thenReturn(Sets.newSet(new
TopicPartition("__consumer_offsets", 0)));
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("list-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(expectedResults));
+
+ CompletableFuture<ListGroupsResponseData> responseFuture =
service.listGroups(
+ requestContext(ApiKeys.LIST_GROUPS),
+ request
+ );
+
+ assertEquals(new ListGroupsResponseData().setGroups(expectedResults),
responseFuture.get(5, TimeUnit.SECONDS));
+ }
+
+ private void testListGroupsFailedWithException(Throwable t,
ListGroupsResponseData expectResponseData)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ service.startup(() -> 1);
+
+ ListGroupsRequestData request = new ListGroupsRequestData();
+ when(runtime.partitions()).thenReturn(Sets.newSet(new
TopicPartition("__consumer_offsets", 0)));
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("list-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(t));
+
+ CompletableFuture<ListGroupsResponseData> responseFuture =
service.listGroups(
+ requestContext(ApiKeys.LIST_GROUPS),
+ request
+ );
+ assertEquals(expectResponseData, responseFuture.get(5,
TimeUnit.SECONDS));
+
+ }
+
+ @Test
+ public void testListGroupsFutureFailed() throws InterruptedException,
ExecutionException, TimeoutException {
+ for (Errors errors : Errors.values()) {
Review Comment:
Testing all errors does not seem necessary to me. I think that we should
test the following cases:
1. NOT_COORDINATOR is handled.
2. Other errors fail the future immediately even if not all the future are
resolved. It would be great if you can have add a unresolved future in this
case.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8574,6 +8597,36 @@ public void
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
assertEquals(Errors.REBALANCE_IN_PROGRESS.code(),
heartbeatResponse.errorCode());
}
+ @Test
+ public void testListGroups() {
+ String genericGroupId = "generic-group-id";
+ String consumerGroupId = "consumer-group-id";
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder().build();
+ context.updateLastWrittenOffset(context.lastWrittenOffset);
+ GenericGroup genericGroup = context.createGenericGroup(genericGroupId);
+ ConsumerGroup consumerGroup =
context.createConsumerGroup(consumerGroupId);
+ context.updateLastWrittenOffset(context.lastWrittenOffset + 2);
Review Comment:
In other tests, we use `replay()` to build the state. For instance, see
`testConsumerGroupStates`. I wonder if we could reuse the same pattern here in
order to keep the tests homogenous. What do you think?
Should we also test the state filtering somehow? One way would be to add a
new member to the consumer group at some point.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]