[
https://issues.apache.org/jira/browse/KAFKA-4333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16302069#comment-16302069
]
ASF GitHub Bot commented on KAFKA-4333:
---------------------------------------
vahidhashemian closed pull request #2059: KAFKA-4333: Report coordinator id of
consumer groups with consumer group '--list' option
URL: https://github.com/apache/kafka/pull/2059
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala
b/core/src/main/scala/kafka/admin/AdminClient.scala
index 50198a763ed..029584bed79 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -173,12 +173,12 @@ class AdminClient(val time: Time,
}
}
- def listAllGroupsFlattened(): List[GroupOverview] = {
- listAllGroups.values.flatten.toList
+ def listAllGroupsFlattened(): List[(GroupOverview, Node)] = {
+ listAllGroups.flatMap(nodeGroups => nodeGroups._2.map(group => (group,
nodeGroups._1))).toList
}
- def listAllConsumerGroupsFlattened(): List[GroupOverview] = {
- listAllGroupsFlattened.filter(_.protocolType ==
ConsumerProtocol.PROTOCOL_TYPE)
+ def listAllConsumerGroupsFlattened(): List[(GroupOverview, Node)] = {
+ listAllGroupsFlattened.filter(_._1.protocolType ==
ConsumerProtocol.PROTOCOL_TYPE)
}
def listGroupOffsets(groupId: String): Map[TopicPartition, Long] = {
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 69f0d8a9624..5ba2f1e728d 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -69,7 +69,7 @@ object ConsumerGroupCommand extends Logging {
try {
if (opts.options.has(opts.listOpt))
- consumerGroupService.listGroups().foreach(println(_))
+ printGroups(consumerGroupService.listGroups, !opts.useOldConsumer)
else if (opts.options.has(opts.describeOpt)) {
val (state, assignments) = consumerGroupService.describeGroup()
val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head
@@ -145,6 +145,24 @@ object ConsumerGroupCommand extends Logging {
}
}
+ def printGroups(groups: List[(String, Option[Int])], useNewConsumer:
Boolean): Unit = {
+ if (groups.isEmpty)
+ println("No consumer group was found.")
+ else {
+ print("\n%-30s".format("GROUP"))
+ if (useNewConsumer)
+ print("%s".format("COORDINATOR-ID"))
+ println()
+
+ groups.foreach { group =>
+ print("%-30s".format(group._1))
+ if (useNewConsumer)
+ print("%s".format(group._2.get))
+ println()
+ }
+ }
+ }
+
def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition,
OffsetAndMetadata]): Unit = {
print("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET"))
println()
@@ -166,7 +184,7 @@ object ConsumerGroupCommand extends Logging {
sealed trait ConsumerGroupService {
- def listGroups(): List[String]
+ def listGroups(): List[(String, Option[Int])]
def describeGroup(): (Option[String],
Option[Seq[PartitionAssignmentState]]) = {
collectGroupAssignment(opts.options.valueOf(opts.groupOpt))
@@ -243,8 +261,9 @@ object ConsumerGroupCommand extends Logging {
zkUtils.close()
}
- def listGroups(): List[String] = {
- zkUtils.getConsumerGroups().toList
+ def listGroups(): List[(String, Option[Int])] = {
+ // there is no coordinator for zookeeper based groups, so return None
for coordinator id
+ zkUtils.getConsumerGroups().map((_, None)).toList
}
def deleteGroups() {
@@ -418,8 +437,9 @@ object ConsumerGroupCommand extends Logging {
// `consumer` is only needed for `describe`, so we instantiate it lazily
private var consumer: KafkaConsumer[String, String] = null
- def listGroups(): List[String] = {
- adminClient.listAllConsumerGroupsFlattened().map(_.groupId)
+ def listGroups(): List[(String, Option[Int])] = {
+ // return group id along with corresponding coordinator id
+ adminClient.listAllConsumerGroupsFlattened.map(x => (x._1.groupId,
Some(x._2.id)))
}
protected def collectGroupAssignment(group: String): (Option[String],
Option[Seq[PartitionAssignmentState]]) = {
diff --git
a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index ddac7644829..dab92d21cd8 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -195,8 +195,11 @@ class LegacyAdminClientTest extends IntegrationTestHarness
with Logging {
val groups = client.listAllGroupsFlattened
assertFalse(groups.isEmpty)
val group = groups.head
- assertEquals(groupId, group.groupId)
- assertEquals("consumer", group.protocolType)
+ assertEquals(groupId, group._1.groupId)
+ assertEquals("consumer", group._1.protocolType)
+ // make sure there is a valid group coordinator
+ assertTrue(group._2 != null)
+ assertTrue(servers.map(_.config.brokerId).toList.contains(group._2.id))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index c03be663433..95665930f0a 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -80,7 +80,7 @@ class ListConsumerGroupTest extends KafkaServerTestHarness {
// action/test
TestUtils.waitUntilTrue(() => {
val groups = consumerGroupCommand.listGroups()
- groups.size == 2 && groups.contains(group)
+ groups.size == 2 && groups.map(_._1).contains(group) &&
groups.map(_._2).toSet == Set(None)
}, "Expected a different list group results.")
// cleanup
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Report consumer group coordinator id when '--list' option is used
> -----------------------------------------------------------------
>
> Key: KAFKA-4333
> URL: https://issues.apache.org/jira/browse/KAFKA-4333
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Vahid Hashemian
> Assignee: Vahid Hashemian
> Priority: Minor
>
> One piece of information missing when extracting information about consumer
> groups (Java API based) is the coordinator id (broker id of the coordinator).
> It would be useful to enhance the {{--list}} option of the consumer group
> command to report the corresponding coordinator id of each consumer group.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)