This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e3f60c2 KAFKA-12479: Batch partition offset requests in
ConsumerGroupCommand (#10371)
e3f60c2 is described below
commit e3f60c254c66d7021d3e0b61968a59a70e00cb39
Author: Rajini Sivaram <[email protected]>
AuthorDate: Tue Mar 23 09:56:56 2021 +0000
KAFKA-12479: Batch partition offset requests in ConsumerGroupCommand
(#10371)
Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai
<[email protected]>, Ismael Juma <[email protected]>
---
.../kafka/clients/admin/AdminClientTestUtils.java | 13 ++
.../scala/kafka/admin/ConsumerGroupCommand.scala | 65 ++++++----
.../kafka/admin/ConsumerGroupServiceTest.scala | 133 +++++++++++++++++++++
3 files changed, 186 insertions(+), 25 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index b0f0055..a64dab6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -18,7 +18,11 @@ package org.apache.kafka.clients.admin;
import java.util.Collections;
import java.util.Map;
+import java.util.stream.Collectors;
+
import
org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
@@ -89,4 +93,13 @@ public class AdminClientTestUtils {
future.complete(description);
return new DescribeTopicsResult(Collections.singletonMap(topic,
future));
}
+
+ public static DescribeTopicsResult describeTopicsResult(Map<String,
TopicDescription> topicDescriptions) {
+ return new DescribeTopicsResult(topicDescriptions.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
KafkaFuture.completedFuture(e.getValue()))));
+ }
+
+ public static ListConsumerGroupOffsetsResult
listConsumerGroupOffsetsResult(Map<TopicPartition, OffsetAndMetadata> offsets) {
+ return new
ListConsumerGroupOffsetsResult(KafkaFuture.completedFuture(offsets));
+ }
}
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index e948f6f..2c0dc8c 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -459,13 +459,7 @@ object ConsumerGroupCommand extends Logging {
val partitionLevelResult = mutable.Map[TopicPartition, Throwable]()
val (topicWithPartitions, topicWithoutPartitions) =
topics.partition(_.contains(":"))
-
- val knownPartitions = topicWithPartitions.flatMap { topicArg =>
- val split = topicArg.split(":")
- split(1).split(",").map { partition =>
- new TopicPartition(split(0), partition.toInt)
- }
- }
+ val knownPartitions =
topicWithPartitions.flatMap(parseTopicsWithPartitions)
// Get the partitions of topics for which the user did not explicitly specify
the partitions
val describeTopicsResult = adminClient.describeTopics(
@@ -580,18 +574,20 @@ object ConsumerGroupCommand extends Logging {
partitionOffsets, Some(s"${consumerSummary.consumerId}"),
Some(s"${consumerSummary.host}"),
Some(s"${consumerSummary.clientId}"))
}
- val rowsWithoutConsumer = committedOffsets.filter { case (tp, _) =>
- !assignedTopicPartitions.contains(tp)
- }.flatMap { case (topicPartition, offset) =>
+
+ val unassignedPartitions = committedOffsets.filterNot { case (tp, _)
=> assignedTopicPartitions.contains(tp) }
+ val rowsWithoutConsumer = if (unassignedPartitions.nonEmpty) {
collectConsumerAssignment(
groupId,
Option(consumerGroup.coordinator),
- Seq(topicPartition),
- Map(topicPartition -> Some(offset.offset)),
+ unassignedPartitions.keySet.toSeq,
+ unassignedPartitions.map { case (tp, offset) => tp ->
Some(offset.offset) },
Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE),
Some(MISSING_COLUMN_VALUE)).toSeq
- }
+ } else
+ Seq.empty
+
groupId -> (Some(state.toString), Some(rowsWithConsumer ++
rowsWithoutConsumer))
}).toMap
@@ -696,7 +692,8 @@ object ConsumerGroupCommand extends Logging {
adminClient.close()
}
- private def createAdminClient(configOverrides: Map[String, String]): Admin
= {
+ // Visibility for testing
+ protected def createAdminClient(configOverrides: Map[String, String]):
Admin = {
val props = if (opts.options.has(opts.commandConfigOpt))
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new
Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
opts.options.valueOf(opts.bootstrapServerOpt))
configOverrides.forKeyValue { (k, v) => props.put(k, v)}
@@ -708,22 +705,40 @@ object ConsumerGroupCommand extends Logging {
options.timeoutMs(t)
}
- private def parseTopicPartitionsToReset(groupId: String, topicArgs:
Seq[String]): Seq[TopicPartition] = topicArgs.flatMap {
- case topicArg if topicArg.contains(":") =>
- val topicPartitions = topicArg.split(":")
- val topic = topicPartitions(0)
- topicPartitions(1).split(",").map(partition => new
TopicPartition(topic, partition.toInt))
- case topic =>
+ private def parseTopicsWithPartitions(topicArg: String):
Seq[TopicPartition] = {
+ def partitionNum(partition: String): Int = {
+ try {
+ partition.toInt
+ } catch {
+ case _: NumberFormatException =>
+ throw new IllegalArgumentException(s"Invalid partition
'$partition' specified in topic arg '$topicArg'")
+ }
+ }
+ topicArg.split(":") match {
+ case Array(topic, partitions) =>
+ partitions.split(",").map(partition => new TopicPartition(topic,
partitionNum(partition)))
+ case _ =>
+ throw new IllegalArgumentException(s"Invalid topic arg '$topicArg',
expected topic name and partitions")
+ }
+ }
+
+ private def parseTopicPartitionsToReset(topicArgs: Seq[String]):
Seq[TopicPartition] = {
+ val (topicsWithPartitions, topics) = topicArgs.partition(_.contains(":"))
+ val specifiedPartitions =
topicsWithPartitions.flatMap(parseTopicsWithPartitions)
+
+ val unspecifiedPartitions = if (topics.nonEmpty) {
val descriptionMap = adminClient.describeTopics(
- Seq(topic).asJava,
+ topics.asJava,
withTimeoutMs(new DescribeTopicsOptions)
).all().get.asScala
- val r = descriptionMap.flatMap{ case(topic, description) =>
- description.partitions().asScala.map{ tpInfo =>
+ descriptionMap.flatMap { case (topic, description) =>
+ description.partitions().asScala.map { tpInfo =>
new TopicPartition(topic, tpInfo.partition)
}
}
- r
+ } else
+ Seq.empty
+ specifiedPartitions ++ unspecifiedPartitions
}
private def getPartitionsToReset(groupId: String): Seq[TopicPartition] = {
@@ -731,7 +746,7 @@ object ConsumerGroupCommand extends Logging {
getCommittedOffsets(groupId).keys.toSeq
} else if (opts.options.has(opts.topicOpt)) {
val topics = opts.options.valuesOf(opts.topicOpt).asScala
- parseTopicPartitionsToReset(groupId, topics)
+ parseTopicPartitionsToReset(topics)
} else {
if (opts.options.has(opts.resetFromFileOpt))
Nil
diff --git
a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
new file mode 100644
index 0000000..86bf674
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.util
+import java.util.{Collections, Optional}
+
+import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions,
ConsumerGroupService}
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.consumer.{OffsetAndMetadata, RangeAssignor}
+import org.apache.kafka.common.{ConsumerGroupState, KafkaFuture, Node,
TopicPartition, TopicPartitionInfo}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Test
+import org.mockito.ArgumentMatchers
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+
+import scala.jdk.CollectionConverters._
+
+class ConsumerGroupServiceTest {
+
+ private val group = "testGroup"
+ private val topics = (0 until 5).map(i => s"testTopic$i")
+ private val numPartitions = 10
+ private val topicPartitions = topics.flatMap(topic => (0 until
numPartitions).map(i => new TopicPartition(topic, i)))
+ private val admin = mock(classOf[Admin])
+
+ @Test
+ def testAdminRequestsForDescribeOffsets(): Unit = {
+ val args = Array("--bootstrap-server", "localhost:9092", "--group", group,
"--describe", "--offsets")
+ val groupService = consumerGroupService(args)
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any()))
+ .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+ when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+ .thenReturn(listGroupOffsetsResult)
+ when(admin.listOffsets(offsetsArgMatcher, any()))
+ .thenReturn(listOffsetsResult)
+
+ val (state, assignments) = groupService.collectGroupOffsets(group)
+ assertEquals(Some("Stable"), state)
+ assertTrue(assignments.nonEmpty)
+ assertEquals(topicPartitions.size, assignments.get.size)
+
+ verify(admin,
times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any())
+ verify(admin,
times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())
+ verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
+ }
+
+ @Test
+ def testAdminRequestsForResetOffsets(): Unit = {
+ val args = Seq("--bootstrap-server", "localhost:9092", "--group", group,
"--reset-offsets", "--to-latest")
+ val topicsWithoutPartitionsSpecified = topics.tail
+ val topicArgs = Seq("--topic", s"${topics.head}:${(0 until
numPartitions).mkString(",")}") ++
+ topicsWithoutPartitionsSpecified.flatMap(topic => Seq("--topic", topic))
+ val groupService = consumerGroupService((args ++ topicArgs).toArray)
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any()))
+ .thenReturn(describeGroupsResult(ConsumerGroupState.DEAD))
+
when(admin.describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified.asJava),
any()))
+ .thenReturn(describeTopicsResult(topicsWithoutPartitionsSpecified))
+ when(admin.listOffsets(offsetsArgMatcher, any()))
+ .thenReturn(listOffsetsResult)
+
+ val resetResult = groupService.resetOffsets()
+ assertEquals(Set(group), resetResult.keySet)
+ assertEquals(topicPartitions.toSet, resetResult(group).keySet)
+
+ verify(admin,
times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any())
+ verify(admin,
times(1)).describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified.asJava),
any())
+ verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
+ }
+
+ private def consumerGroupService(args: Array[String]): ConsumerGroupService
= {
+ new ConsumerGroupService(new ConsumerGroupCommandOptions(args)) {
+ override protected def createAdminClient(configOverrides:
collection.Map[String, String]): Admin = {
+ admin
+ }
+ }
+ }
+
+ private def describeGroupsResult(groupState: ConsumerGroupState):
DescribeConsumerGroupsResult = {
+ val member1 = new MemberDescription("member1", Optional.of("instance1"),
"client1", "host1", null)
+ val description = new ConsumerGroupDescription(group,
+ true,
+ Collections.singleton(member1),
+ classOf[RangeAssignor].getName,
+ groupState,
+ new Node(1, "localhost", 9092))
+ new DescribeConsumerGroupsResult(Collections.singletonMap(group,
KafkaFuture.completedFuture(description)))
+ }
+
+ private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = {
+ val offsets = topicPartitions.map(_ -> new
OffsetAndMetadata(100)).toMap.asJava
+ AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)
+ }
+
+ private def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+ val expectedOffsets = topicPartitions.map(tp => tp ->
OffsetSpec.latest).toMap
+ ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+ map.keySet.asScala == expectedOffsets.keySet &&
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+ }
+ }
+
+ private def listOffsetsResult: ListOffsetsResult = {
+ val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100,
System.currentTimeMillis, Optional.of(1))
+ val futures = topicPartitions.map(_ ->
KafkaFuture.completedFuture(resultInfo)).toMap
+ new ListOffsetsResult(futures.asJava)
+ }
+
+ private def describeTopicsResult(topics: Seq[String]): DescribeTopicsResult
= {
+ val topicDescriptions = topics.map { topic =>
+ val partitions = (0 until numPartitions).map(i => new
TopicPartitionInfo(i, null, Collections.emptyList[Node],
Collections.emptyList[Node]))
+ topic -> new TopicDescription(topic, false, partitions.asJava)
+ }.toMap
+ AdminClientTestUtils.describeTopicsResult(topicDescriptions.asJava)
+ }
+}