[ https://issues.apache.org/jira/browse/STORM-1136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15312644#comment-15312644 ]
ASF GitHub Bot commented on STORM-1136:
---------------------------------------
Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/1451#discussion_r65576345
--- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.json.simple.JSONValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class for querying the offset lag of a Kafka spout.
+ */
+public class KafkaOffsetLagUtil {
+    private static final String OPTION_TOPIC_SHORT = "t";
+    private static final String OPTION_TOPIC_LONG = "topics";
+    private static final String OPTION_OLD_CONSUMER_SHORT = "o";
+    private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
+    private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
+    private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
+    private static final String OPTION_GROUP_ID_SHORT = "g";
+    private static final String OPTION_GROUP_ID_LONG = "groupid";
+    private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
+    private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
+    private static final String OPTION_PARTITIONS_SHORT = "p";
+    private static final String OPTION_PARTITIONS_LONG = "partitions";
+    private static final String OPTION_LEADERS_SHORT = "l";
+    private static final String OPTION_LEADERS_LONG = "leaders";
+    private static final String OPTION_ZK_SERVERS_SHORT = "z";
+    private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
+    private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
+    private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
+    private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
+    private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
+
+    public static void main (String args[]) {
+        try {
+            List<KafkaOffsetLagResult> results;
+            Options options = buildOptions();
+            CommandLineParser parser = new DefaultParser();
+            CommandLine commandLine = parser.parse(options, args);
+            if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
+                printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
+            }
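+            // The old spout keeps committed offsets in ZooKeeper while the new (0.9+) spout keeps them
+            // in Kafka, so each code path accepts a different, mutually exclusive set of options.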
+            if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
+                OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
+                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " is not accepted with option " +
+                            OPTION_OLD_CONSUMER_LONG);
+                }
+                if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
+                    printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required with " +
+                            OPTION_OLD_CONSUMER_LONG);
+                }
+                String[] topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",");
+                if (topics != null && topics.length > 1) {
+                    printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG + ". Either a single topic or a " +
+                            "wildcard string for matching topics is supported");
+                }
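+                // With a ZK brokers root node (ZkHosts), partitions and leaders are discovered from
+                // ZooKeeper; otherwise (StaticHosts) they must be listed explicitly on the command line.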
+                if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
+                    if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) {
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " or " + OPTION_LEADERS_LONG + " is not accepted with " +
+                                OPTION_ZK_BROKERS_ROOT_LONG);
+                    }
+                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
+                            commandLine.getOptionValue(OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG),
+                            commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG));
+                } else {
+                    if (commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
+                        printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG + " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG);
+                    }
+                    if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) {
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " + OPTION_ZK_BROKERS_ROOT_LONG +
+                                " is not provided");
+                    }
+                    String[] partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",");
+                    String[] leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",");
+                    if (partitions.length != leaders.length) {
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " need to be of same size");
+                    }
+                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
+                            commandLine.getOptionValue(OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG),
+                            commandLine.getOptionValue(OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG));
+                }
+                results = getOffsetLags(oldKafkaSpoutOffsetQuery);
+            } else {
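+                // New (0.9+) spout: committed offsets are fetched through the consumer group API,
+                // so only the consumer group id and bootstrap brokers are needed.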
+                String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
+                        OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG};
+                for (String oldOption: oldSpoutOptions) {
+                    if (commandLine.hasOption(oldOption)) {
+                        printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
+                    }
+                }
+                if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + OPTION_OLD_CONSUMER_LONG +
+                            " is not specified");
+                }
+                NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
+                        commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG));
+                results = getOffsetLags(newKafkaSpoutOffsetQuery);
+            }
+            System.out.print(JSONValue.toJSONString(results));
+        } catch (Exception ex) {
+            System.out.print("Unable to get offset lags for kafka. Reason: ");
+            ex.printStackTrace(System.out);
+        }
+    }
+
+    private static void printUsageAndExit (Options options, String message) {
+        System.out.println(message);
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("storm-kafka-monitor ", options);
+        System.exit(1);
+    }
+
+    private static Options buildOptions () {
+        Options options = new Options();
+        options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed " +
+                "offset");
+        options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout");
+        options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new " +
+                "consumer/spout e.g. hostname1:9092,hostname2:9092");
+        options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout) ");
+        options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in " +
+                "old spout");
+        options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to " +
+                OPTION_LEADERS_LONG + " for old spout with StaticHosts");
+        options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to " +
+                OPTION_PARTITIONS_LONG + " for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092");
+        options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets " +
+                "and/or topic metadata for ZkHosts e.g hostname1:2181,hostname2:2181");
+        options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed" +
+                " offsets without the topic and partition nodes");
+        options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " +
+                "/brokers (applicable only for old kafka spout) ");
+        return options;
+    }
+
+    /**
+     *
+     * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
+     * @return log head offset, spout offset and lag for each partition
+     */
+    public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
+        KafkaConsumer<String, String> consumer = null;
+        List<KafkaOffsetLagResult> result = new ArrayList<>();
+        try {
+            Properties props = new Properties();
+            props.put("bootstrap.servers", newKafkaSpoutOffsetQuery.getBootStrapBrokers());
+            props.put("group.id", newKafkaSpoutOffsetQuery.getConsumerGroupId());
+            props.put("enable.auto.commit", "false");
+            props.put("session.timeout.ms", "30000");
+            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            List<TopicPartition> topicPartitionList = new ArrayList<>();
+            consumer = new KafkaConsumer<>(props);
+            for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) {
+                List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
+                if (partitionInfoList != null) {
+                    for (PartitionInfo partitionInfo : partitionInfoList) {
+                        topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+                    }
+                }
+            }
+            consumer.assign(topicPartitionList);
+            for (TopicPartition topicPartition : topicPartitionList) {
+                OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
+                long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.offset() : -1;
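+                // seekToEnd() followed by position() returns the partition's log-end ("head") offset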
+                consumer.seekToEnd(topicPartition);
+                result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, consumer.position(topicPartition)));
+            }
+        } finally {
+            if (consumer != null) {
+                consumer.close();
+            }
+        }
+        return result;
+    }
+
+    /**
+     *
+     * @param oldKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
+     * @return log head offset, spout offset and lag for each partition
+     */
+    public static List<KafkaOffsetLagResult> getOffsetLags (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
+        List<KafkaOffsetLagResult> result = new ArrayList<>();
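+        // Map each leader broker to the topic-partitions it hosts, fetch the log-end offset per
+        // partition from the leaders, then read the old spout's committed offsets from ZooKeeper.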
+        Map<String, List<TopicPartition>> leaders = getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
+        if (leaders != null) {
+            Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leaders);
+            Map<String, List<Integer>> topicPartitions = new HashMap<>();
+            for (Map.Entry<String, List<TopicPartition>> entry: leaders.entrySet()) {
+                for (TopicPartition topicPartition: entry.getValue()) {
+                    if (!topicPartitions.containsKey(topicPartition.topic())) {
+                        topicPartitions.put(topicPartition.topic(), new ArrayList<Integer>());
+                    }
+                    topicPartitions.get(topicPartition.topic()).add(topicPartition.partition());
+                }
+            }
+            Map<String, Map<Integer, Long>> oldConsumerOffsets = getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
+            for (Map.Entry<String, Map<Integer, Long>> topicOffsets: logHeadOffsets.entrySet()) {
+                for (Map.Entry<Integer, Long> partitionOffsets: topicOffsets.getValue().entrySet()) {
+                    Long consumerCommittedOffset = oldConsumerOffsets.get(topicOffsets.getKey()) != null ?
+                            oldConsumerOffsets.get(topicOffsets.getKey()).get(partitionOffsets.getKey()) : null;
+                    consumerCommittedOffset = (consumerCommittedOffset == null ? -1 : consumerCommittedOffset);
+                    KafkaOffsetLagResult kafkaOffsetLagResult = new KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(),
+                            consumerCommittedOffset, partitionOffsets.getValue());
+                    result.add(kafkaOffsetLagResult);
+                }
+            }
+        }
+        return result;
+    }
+
+    private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
+        Map<String, List<TopicPartition>> result = new HashMap<>();
+        if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
+            String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
+            String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
+            for (int i = 0; i < leaders.length; ++i) {
+                if (!result.containsKey(leaders[i])) {
+                    result.put(leaders[i], new ArrayList<TopicPartition>());
+                }
+                result.get(leaders[i]).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
+            }
+        } else {
--- End diff --
When can this scenario occur? There is a lot of code in some of these
methods. It would be ideal to add comments hinting at what is going on,
and/or to split them into smaller methods.
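For instance, the StaticHosts branch above could be pulled out into a small
helper, something along these lines (just a sketch against the types already
imported in this diff, not code from the PR):
{code}
// Sketch only: map each explicitly configured leader (host:port) to the
// topic-partitions it hosts (the StaticHosts case). Assumes partitionsCsv
// and leadersCsv are comma-separated lists of equal length, as validated
// in main().
private static Map<String, List<TopicPartition>> leadersFromStaticHosts(String topic, String partitionsCsv, String leadersCsv) {
    Map<String, List<TopicPartition>> result = new HashMap<>();
    String[] partitions = partitionsCsv.split(",");
    String[] leaders = leadersCsv.split(",");
    for (int i = 0; i < leaders.length; ++i) {
        if (!result.containsKey(leaders[i])) {
            result.put(leaders[i], new ArrayList<TopicPartition>());
        }
        result.get(leaders[i]).add(new TopicPartition(topic, Integer.parseInt(partitions[i])));
    }
    return result;
}
{code}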
> Provide a bin script to check consumer lag from KafkaSpout to Kafka topic offsets
> ---------------------------------------------------------------------------------
>
> Key: STORM-1136
> URL: https://issues.apache.org/jira/browse/STORM-1136
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-kafka
> Reporter: Sriharsha Chintalapani
> Assignee: Priyank Shah
>
> We store KafkaSpout offsets in the zkroot + id path in ZooKeeper. Kafka provides
> a utility and a protocol request to fetch the latest offsets for a topic:
> {code}
> example:
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> {code}
> We should provide a way for the user to check how far the Kafka spout has read
> into the topic and what the lag is. If we can expose this via the UI, even better.
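> A lag check with the script proposed here might then look like this
> (hypothetical invocations; option names are taken from the patch above):
> {code}
> # new (0.9+) consumer/spout: offsets live in Kafka
> storm-kafka-monitor -t mytopic -g mygroup -b broker1:9092,broker2:9092
> # old spout with ZkHosts: offsets and broker metadata live in ZooKeeper
> storm-kafka-monitor -t mytopic -o -z zk1:2181,zk2:2181 -n /spout/zk/node -r /brokers
> {code}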
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)