dajac commented on code in PR #10964:
URL: https://github.com/apache/kafka/pull/10964#discussion_r919825138
##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -915,23 +916,57 @@ default ListConsumerGroupsResult listConsumerGroups() {
/**
* List the consumer group offsets available in the cluster.
+ * <p>
+ * @deprecated Since 3.3.
+ * Use {@link #listConsumerGroupOffsets(Map,
ListConsumerGroupOffsetsOptions)}.
*
* @param options The options to use when listing the consumer group
offsets.
* @return The ListGroupOffsetsResult
*/
- ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId,
ListConsumerGroupOffsetsOptions options);
+ @Deprecated
+ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String
groupId, ListConsumerGroupOffsetsOptions options) {
+ ListConsumerGroupOffsetsOptions listOptions = new
ListConsumerGroupOffsetsOptions()
+ .requireStable(options.requireStable());
+ ListConsumerGroupOffsetsSpec groupSpec = new
ListConsumerGroupOffsetsSpec()
+ .topicPartitions(options.topicPartitions());
Review Comment:
nit: Indentation is not consistent.
##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java:
##########
@@ -17,33 +17,73 @@
package org.apache.kafka.clients.admin;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.Map;
-
/**
- * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
+ * {@link Admin#listConsumerGroupOffsets(String)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsResult {
- final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+ final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>>
futures;
- ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition,
OffsetAndMetadata>> future) {
- this.future = future;
+ ListConsumerGroupOffsetsResult(final Map<CoordinatorKey,
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+ this.futures = futures.entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey().idValue,
Entry::getValue));
}
/**
* Return a future which yields a map of topic partitions to
OffsetAndMetadata objects.
* If the group does not have a committed offset for this partition, the
corresponding value in the returned map will be null.
*/
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>
partitionsToOffsetAndMetadata() {
- return future;
+ if (futures.size() != 1) {
+ throw new IllegalStateException("Offsets from multiple consumer
groups were requested. " +
+ "Use partitionsToOffsetAndMetadata(groupId) instead to get
future for a specific group.");
+ }
+ return futures.values().iterator().next();
}
+ /**
+ * Return a future which yields a map of topic partitions to
OffsetAndMetadata objects for
+ * the specified group. If the group doesn't have a committed offset for a
specific
+ * partition, the corresponding value in the returned map will be null.
+ */
+ public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>
partitionsToOffsetAndMetadata(String groupId) {
+ return futures.get(groupId);
Review Comment:
I wonder if we should check if the `groupId` was actually requested. We
could throw an `IllegalArgumentException` instead of returning null.
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -3263,6 +3282,95 @@ public void testListConsumerGroupOffsets() throws
Exception {
}
}
+ @Test
+ public void testBatchedListConsumerGroupOffsets() throws Exception {
+ Cluster cluster = mockCluster(1, 0);
+ Time time = new MockTime();
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs =
batchedListConsumerGroupOffsetsSpec();
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
cluster, AdminClientConfig.RETRIES_CONFIG, "0")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE,
env.cluster().controller(), groupSpecs.keySet()));
+
+ ListConsumerGroupOffsetsResult result =
env.adminClient().listConsumerGroupOffsets(groupSpecs, new
ListConsumerGroupOffsetsOptions());
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true);
+
+ verifyListOffsetsForMultipleGroups(groupSpecs, result);
+ }
+ }
+
+ @Test
+ public void testBatchedListConsumerGroupOffsetsWithNoBatchingSupport()
throws Exception {
+ Cluster cluster = mockCluster(1, 0);
+ Time time = new MockTime();
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs =
batchedListConsumerGroupOffsetsSpec();
+
+ ApiVersion findCoordinatorV3 = new ApiVersion()
Review Comment:
Should we also add a test where the FindCoordinator supports v4 and
OffsetFetch does not support v8? I think that this could happen in a partially
upgraded cluster.
##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java:
##########
@@ -24,52 +24,116 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
+import
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
public class ListConsumerGroupOffsetsHandlerTest {
private final LogContext logContext = new LogContext();
- private final String groupId = "group-id";
+ private final int throttleMs = 10;
+ private final String groupZero = "group0";
+ private final String groupOne = "group1";
+ private final String groupTwo = "group2";
+ private final List<String> groups = Arrays.asList(groupZero, groupOne,
groupTwo);
private final TopicPartition t0p0 = new TopicPartition("t0", 0);
private final TopicPartition t0p1 = new TopicPartition("t0", 1);
private final TopicPartition t1p0 = new TopicPartition("t1", 0);
private final TopicPartition t1p1 = new TopicPartition("t1", 1);
- private final List<TopicPartition> tps = Arrays.asList(t0p0, t0p1, t1p0,
t1p1);
+ private final TopicPartition t2p0 = new TopicPartition("t2", 0);
+ private final TopicPartition t2p1 = new TopicPartition("t2", 1);
+ private final TopicPartition t2p2 = new TopicPartition("t2", 2);
+ private final Map<String, ListConsumerGroupOffsetsSpec> singleRequestMap =
Collections.singletonMap(groupZero,
+ new
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t0p1, t1p0,
t1p1)));
+ private final Map<String, ListConsumerGroupOffsetsSpec> batchedRequestMap =
+ new HashMap<String, ListConsumerGroupOffsetsSpec>() {{
+ put(groupZero, new
ListConsumerGroupOffsetsSpec().topicPartitions(singletonList(t0p0)));
+ put(groupOne, new
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0,
t1p1)));
+ put(groupTwo, new
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1,
t2p0, t2p1, t2p2)));
+ }};
@Test
public void testBuildRequest() {
- ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
- OffsetFetchRequest request = handler.buildBatchedRequest(1,
singleton(CoordinatorKey.byGroupId(groupId))).build();
- assertEquals(groupId, request.data().groups().get(0).groupId());
+ ListConsumerGroupOffsetsHandler handler =
+ new ListConsumerGroupOffsetsHandler(singleRequestMap, false,
logContext);
+ OffsetFetchRequest request = handler.buildBatchedRequest(1,
coordinatorKeys(groupZero)).build();
+ assertEquals(groupZero, request.data().groups().get(0).groupId());
assertEquals(2, request.data().groups().get(0).topics().size());
assertEquals(2,
request.data().groups().get(0).topics().get(0).partitionIndexes().size());
assertEquals(2,
request.data().groups().get(0).topics().get(1).partitionIndexes().size());
}
+ @Test
+ public void testBuildRequestWithMultipleGroups() {
+ Map<String, ListConsumerGroupOffsetsSpec> requestMap = new
HashMap<>(this.batchedRequestMap);
+ String groupThree = "group3";
+ requestMap.put(groupThree, new ListConsumerGroupOffsetsSpec()
+ .topicPartitions(Arrays.asList(new TopicPartition("t3", 0),
new TopicPartition("t3", 1))));
+
+ ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(requestMap, false, logContext);
+ OffsetFetchRequest request1 = handler.buildBatchedRequest(1,
coordinatorKeys(groupZero, groupOne, groupTwo)).build();
+ assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo),
requestGroups(request1));
+
+ OffsetFetchRequest request2 = handler.buildBatchedRequest(2,
coordinatorKeys(groupThree)).build();
+ assertEquals(Utils.mkSet(groupThree), requestGroups(request2));
+
+ Map<String, ListConsumerGroupOffsetsSpec> builtRequests = new
HashMap<>();
+ request1.groupIdsToPartitions().forEach((group, partitions) ->
+ builtRequests.put(group, new
ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+ request2.groupIdsToPartitions().forEach((group, partitions) ->
+ builtRequests.put(group, new
ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+
+ assertEquals(requestMap, builtRequests);
+ Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics =
request1.groupIdsToTopics();
+
+ assertEquals(1, groupIdsToTopics.get(groupZero).size());
+ assertEquals(2, groupIdsToTopics.get(groupOne).size());
+ assertEquals(3, groupIdsToTopics.get(groupTwo).size());
+
+ assertEquals(1,
groupIdsToTopics.get(groupZero).get(0).partitionIndexes().size());
+ assertEquals(1,
groupIdsToTopics.get(groupOne).get(0).partitionIndexes().size());
+ assertEquals(2,
groupIdsToTopics.get(groupOne).get(1).partitionIndexes().size());
+ assertEquals(1,
groupIdsToTopics.get(groupTwo).get(0).partitionIndexes().size());
+ assertEquals(2,
groupIdsToTopics.get(groupTwo).get(1).partitionIndexes().size());
+ assertEquals(3,
groupIdsToTopics.get(groupTwo).get(2).partitionIndexes().size());
+
+ groupIdsToTopics = request2.groupIdsToTopics();
+ assertEquals(1, groupIdsToTopics.get(groupThree).size());
Review Comment:
ditto.
##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -753,9 +753,10 @@ object ConsumerGroupCommand extends Logging {
private def getCommittedOffsets(groupId: String): Map[TopicPartition,
OffsetAndMetadata] = {
adminClient.listConsumerGroupOffsets(
- groupId,
- withTimeoutMs(new ListConsumerGroupOffsetsOptions)
- ).partitionsToOffsetAndMetadata.get.asScala
+ Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec),
+ withTimeoutMs(new ListConsumerGroupOffsetsOptions()))
+ .partitionsToOffsetAndMetadata(groupId)
+ .get().asScala
Review Comment:
nit: I had a hard time reading this block. I somehow prefer the previous
format, I guess. I'll leave this up to you.
##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java:
##########
@@ -150,21 +347,62 @@ private void assertCompleted(
AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> result,
Map<TopicPartition, OffsetAndMetadata> expected
) {
- CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+ CoordinatorKey key = CoordinatorKey.byGroupId(groupZero);
assertEquals(emptySet(), result.failedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
assertEquals(singleton(key), result.completedKeys.keySet());
- assertEquals(expected,
result.completedKeys.get(CoordinatorKey.byGroupId(groupId)));
+ assertEquals(expected, result.completedKeys.get(key));
+ }
+
+ private void assertCompletedForMultipleGroups(
+ AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> result,
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> expected) {
Review Comment:
nit: Could we format new methods like the other ones in this file in order
to stay consistent? See `assertCompleted` for instance.
##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java:
##########
@@ -24,52 +24,116 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
+import
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
public class ListConsumerGroupOffsetsHandlerTest {
private final LogContext logContext = new LogContext();
- private final String groupId = "group-id";
+ private final int throttleMs = 10;
+ private final String groupZero = "group0";
+ private final String groupOne = "group1";
+ private final String groupTwo = "group2";
+ private final List<String> groups = Arrays.asList(groupZero, groupOne,
groupTwo);
private final TopicPartition t0p0 = new TopicPartition("t0", 0);
private final TopicPartition t0p1 = new TopicPartition("t0", 1);
private final TopicPartition t1p0 = new TopicPartition("t1", 0);
private final TopicPartition t1p1 = new TopicPartition("t1", 1);
- private final List<TopicPartition> tps = Arrays.asList(t0p0, t0p1, t1p0,
t1p1);
+ private final TopicPartition t2p0 = new TopicPartition("t2", 0);
+ private final TopicPartition t2p1 = new TopicPartition("t2", 1);
+ private final TopicPartition t2p2 = new TopicPartition("t2", 2);
+ private final Map<String, ListConsumerGroupOffsetsSpec> singleRequestMap =
Collections.singletonMap(groupZero,
+ new
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t0p1, t1p0,
t1p1)));
+ private final Map<String, ListConsumerGroupOffsetsSpec> batchedRequestMap =
+ new HashMap<String, ListConsumerGroupOffsetsSpec>() {{
+ put(groupZero, new
ListConsumerGroupOffsetsSpec().topicPartitions(singletonList(t0p0)));
+ put(groupOne, new
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0,
t1p1)));
+ put(groupTwo, new
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1,
t2p0, t2p1, t2p2)));
+ }};
@Test
public void testBuildRequest() {
- ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
- OffsetFetchRequest request = handler.buildBatchedRequest(1,
singleton(CoordinatorKey.byGroupId(groupId))).build();
- assertEquals(groupId, request.data().groups().get(0).groupId());
+ ListConsumerGroupOffsetsHandler handler =
+ new ListConsumerGroupOffsetsHandler(singleRequestMap, false,
logContext);
+ OffsetFetchRequest request = handler.buildBatchedRequest(1,
coordinatorKeys(groupZero)).build();
+ assertEquals(groupZero, request.data().groups().get(0).groupId());
assertEquals(2, request.data().groups().get(0).topics().size());
assertEquals(2,
request.data().groups().get(0).topics().get(0).partitionIndexes().size());
assertEquals(2,
request.data().groups().get(0).topics().get(1).partitionIndexes().size());
}
+ @Test
+ public void testBuildRequestWithMultipleGroups() {
+ Map<String, ListConsumerGroupOffsetsSpec> requestMap = new
HashMap<>(this.batchedRequestMap);
+ String groupThree = "group3";
+ requestMap.put(groupThree, new ListConsumerGroupOffsetsSpec()
+ .topicPartitions(Arrays.asList(new TopicPartition("t3", 0),
new TopicPartition("t3", 1))));
+
+ ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(requestMap, false, logContext);
+ OffsetFetchRequest request1 = handler.buildBatchedRequest(1,
coordinatorKeys(groupZero, groupOne, groupTwo)).build();
+ assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo),
requestGroups(request1));
+
+ OffsetFetchRequest request2 = handler.buildBatchedRequest(2,
coordinatorKeys(groupThree)).build();
+ assertEquals(Utils.mkSet(groupThree), requestGroups(request2));
+
+ Map<String, ListConsumerGroupOffsetsSpec> builtRequests = new
HashMap<>();
+ request1.groupIdsToPartitions().forEach((group, partitions) ->
+ builtRequests.put(group, new
ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+ request2.groupIdsToPartitions().forEach((group, partitions) ->
+ builtRequests.put(group, new
ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+
+ assertEquals(requestMap, builtRequests);
+ Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics =
request1.groupIdsToTopics();
+
+ assertEquals(1, groupIdsToTopics.get(groupZero).size());
Review Comment:
nit: Should we assert `groupIdsToTopics.size()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]