This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
     new ace626c  MINOR: Use KafkaConsumer in GetOffsetShell (#5220)

ace626c is described below

commit ace626c0b325093ea68decd6b0e7f3c0f5070cfd
Author: Ismael Juma <ism...@juma.me.uk>
AuthorDate: Thu Jun 14 10:37:35 2018 -0700

    MINOR: Use KafkaConsumer in GetOffsetShell (#5220)

    This does the minimal amount of work so that the tool relies on public
    non-deprecated APIs (i.e. it no longer relies on Scala clients code).

    Additional improvements (not included here) have been proposed via KIP-308.

    There are a few other PRs that touch this class with overlapping goals:

    - https://github.com/apache/kafka/pull/2891
    - https://github.com/apache/kafka/pull/3051
    - https://github.com/apache/kafka/pull/3320

    One of them remains relevant in the context of KIP-308, but the others
    have been superseded. I included the authors of the 3 PRs as co-authors.

    Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>, Vahid Hashemian <vahidhashem...@us.ibm.com>, Manikumar Reddy <manikumar.re...@gmail.com>

    Co-authored-by: Arseniy Tashoyan <tasho...@gmail.com>
    Co-authored-by: Vahid Hashemian <vahidhashem...@us.ibm.com>
    Co-authored-by: Mohammed Amine GARMES
    Co-authored-by: Ismael Juma <ism...@juma.me.uk>
---
 .../main/scala/kafka/tools/GetOffsetShell.scala    | 127 ++++++++++++++-------
 1 file changed, 85 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 4104ded..eafddc6 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -18,13 +18,16 @@
  */
 package kafka.tools
 
-import kafka.consumer._
+import java.util.Properties
+
 import joptsimple._
-import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
-import kafka.common.TopicAndPartition
-import kafka.client.ClientUtils
 import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.{PartitionInfo, TopicPartition}
+import org.apache.kafka.common.requests.ListOffsetRequest
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 
+import scala.collection.JavaConverters._
 
 object GetOffsetShell {
 
@@ -47,20 +50,20 @@ object GetOffsetShell {
                            .withRequiredArg
                            .describedAs("timestamp/-1(latest)/-2(earliest)")
                            .ofType(classOf[java.lang.Long])
-                           .defaultsTo(-1)
-    val nOffsetsOpt = parser.accepts("offsets", "number of offsets returned")
+                           .defaultsTo(-1L)
+    parser.accepts("offsets", "DEPRECATED AND IGNORED: number of offsets returned")
                            .withRequiredArg
                            .describedAs("count")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1)
-    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
+    parser.accepts("max-wait-ms", "DEPRECATED AND IGNORED: The max amount of time each fetch request waits.")
                            .withRequiredArg
                            .describedAs("ms")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1000)
-                           
-    if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")
+
+    if (args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic offsets.")
 
     val options = parser.parse(args : _*)
 
@@ -69,41 +72,81 @@ object GetOffsetShell {
     val clientId = "GetOffsetShell"
     val brokerList = options.valueOf(brokerListOpt)
     ToolsUtils.validatePortOrDie(parser, brokerList)
-    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
     val topic = options.valueOf(topicOpt)
-    val partitionList = options.valueOf(partitionOpt)
-    val time = options.valueOf(timeOpt).longValue
-    val nOffsets = options.valueOf(nOffsetsOpt).intValue
-    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
-
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
-    if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
-      System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
-        "kafka-list-topic.sh to verify")
-      Exit.exit(1)
-    }
-    val partitions =
-      if(partitionList == "") {
-        topicsMetadata.head.partitionsMetadata.map(_.partitionId)
-      } else {
-        partitionList.split(",").map(_.toInt).toSeq
-      }
-    partitions.foreach { partitionId =>
-      val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)
-      partitionMetadataOpt match {
-        case Some(metadata) =>
-          metadata.leader match {
-            case Some(leader) =>
-              val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
-              val topicAndPartition = TopicAndPartition(topic, partitionId)
-              val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
-              val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
-
-              println("%s:%d:%s".format(topic, partitionId, offsets.mkString(",")))
-            case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))
+    val partitionIdsRequested: Set[Int] = {
+      val partitionsString = options.valueOf(partitionOpt)
+      if (partitionsString.isEmpty)
+        Set.empty
+      else
+        partitionsString.split(",").map { partitionString =>
+          try partitionString.toInt
+          catch {
+            case _: NumberFormatException =>
+              System.err.println(s"--partitions expects a comma separated list of numeric partition ids, but received: $partitionsString")
+              Exit.exit(1)
           }
-        case None => System.err.println("Error: partition %d does not exist".format(partitionId))
+        }.toSet
+    }
+    val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
+
+    val config = new Properties
+    config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
+    val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
+
+    val partitionInfos = listPartitionInfos(consumer, topic, partitionIdsRequested) match {
+      case None =>
+        System.err.println(s"Topic $topic does not exist")
+        Exit.exit(1)
+      case Some(p) if p.isEmpty =>
+        if (partitionIdsRequested.isEmpty)
+          System.err.println(s"Topic $topic has 0 partitions")
+        else
+          System.err.println(s"Topic $topic does not have any of the requested partitions ${partitionIdsRequested.mkString(",")}")
+        Exit.exit(1)
+      case Some(p) => p
+    }
+
+    if (partitionIdsRequested.nonEmpty) {
+      (partitionIdsRequested -- partitionInfos.map(_.partition)).foreach { partitionId =>
+        System.err.println(s"Error: partition $partitionId does not exist")
       }
     }
+
+    val topicPartitions = partitionInfos.sortBy(_.partition).flatMap { p =>
+      if (p.leader == null) {
+        System.err.println(s"Error: partition ${p.partition} does not have a leader. Skip getting offsets")
+        None
+      } else
+        Some(new TopicPartition(p.topic, p.partition))
+    }
+
+    /* Note that the value of the map can be null */
+    val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = listOffsetsTimestamp match {
+      case ListOffsetRequest.EARLIEST_TIMESTAMP => consumer.beginningOffsets(topicPartitions.asJava).asScala
+      case ListOffsetRequest.LATEST_TIMESTAMP => consumer.endOffsets(topicPartitions.asJava).asScala
+      case _ =>
+        val timestampsToSearch = topicPartitions.map(tp => tp -> (listOffsetsTimestamp: java.lang.Long)).toMap.asJava
+        consumer.offsetsForTimes(timestampsToSearch).asScala.mapValues(x => if (x == null) null else x.offset)
+    }
+
+    partitionOffsets.toSeq.sortBy { case (tp, _) => tp.partition }.foreach { case (tp, offset) =>
+      println(s"$topic:${tp.partition}:${Option(offset).getOrElse("")}")
+    }
+
+  }
+
+  /**
+   * Return the partition infos for `topic`. If the topic does not exist, `None` is returned.
+   */
+  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topic: String, partitionIds: Set[Int]): Option[Seq[PartitionInfo]] = {
+    val partitionInfos = consumer.listTopics.asScala.filterKeys(_ == topic).values.flatMap(_.asScala).toBuffer
+    if (partitionInfos.isEmpty)
+      None
+    else if (partitionIds.isEmpty)
+      Some(partitionInfos)
+    else
+      Some(partitionInfos.filter(p => partitionIds.contains(p.partition)))
   }
+
 }

-- 
To stop receiving notification emails like this one, please contact
ij...@apache.org.