[
https://issues.apache.org/jira/browse/STORM-1136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15312701#comment-15312701
]
ASF GitHub Bot commented on STORM-1136:
---------------------------------------
Github user priyank5485 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1451#discussion_r65582081
--- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.json.simple.JSONValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class for querying offset lag for kafka spout
+ */
+public class KafkaOffsetLagUtil {
+    private static final String OPTION_TOPIC_SHORT = "t";
+    private static final String OPTION_TOPIC_LONG = "topics";
+    private static final String OPTION_OLD_CONSUMER_SHORT = "o";
+    private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
+    private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
+    private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
+    private static final String OPTION_GROUP_ID_SHORT = "g";
+    private static final String OPTION_GROUP_ID_LONG = "groupid";
+    private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
+    private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
+    private static final String OPTION_PARTITIONS_SHORT = "p";
+    private static final String OPTION_PARTITIONS_LONG = "partitions";
+    private static final String OPTION_LEADERS_SHORT = "l";
+    private static final String OPTION_LEADERS_LONG = "leaders";
+    private static final String OPTION_ZK_SERVERS_SHORT = "z";
+    private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
+    private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
+    private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
+    private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
+    private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
+
+    public static void main (String args[]) {
+        try {
+            List<KafkaOffsetLagResult> results;
+            Options options = buildOptions();
+            CommandLineParser parser = new DefaultParser();
+            CommandLine commandLine = parser.parse(options, args);
+            if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
+                printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
+            }
+            if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
+                OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
+                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " is not accepted with option " +
+                            OPTION_OLD_CONSUMER_LONG);
+                }
+                if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
+                    printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required with " +
+                            OPTION_OLD_CONSUMER_LONG);
+                }
+                String[] topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",");
+                if (topics != null && topics.length > 1) {
+                    printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG + ". Either a single topic or a " +
+                            "wildcard string for matching topics is supported");
+                }
+                if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
+                    if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) {
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " or " + OPTION_LEADERS_LONG + " is not accepted with " +
+                                OPTION_ZK_BROKERS_ROOT_LONG);
+                    }
+                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
+                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.hasOption
+                            (OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG));
+                } else {
+                    if (commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
+                        printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG + " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG);
+                    }
+                    if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) {
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " + OPTION_ZK_BROKERS_ROOT_LONG +
+                                " is not provided");
+                    }
+                    String[] partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",");
+                    String[] leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",");
+                    if (partitions.length != leaders.length) {
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " need to be of same size");
+                    }
+                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue
+                            (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.getOptionValue
+                            (OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG));
+                }
+                results = getOffsetLags(oldKafkaSpoutOffsetQuery);
+            } else {
+                String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
+                        OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG};
+                for (String oldOption: oldSpoutOptions) {
+                    if (commandLine.hasOption(oldOption)) {
+                        printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
+                    }
+                }
+                if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + OPTION_OLD_CONSUMER_LONG +
+                            " is not specified");
+                }
+                NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
+                        commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG));
+                results = getOffsetLags(newKafkaSpoutOffsetQuery);
+            }
+            System.out.print(JSONValue.toJSONString(results));
+        } catch (Exception ex) {
+            System.out.print("Unable to get offset lags for kafka. Reason: ");
+            ex.printStackTrace(System.out);
+        }
+    }
+
+    private static void printUsageAndExit (Options options, String message) {
+        System.out.println(message);
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("storm-kafka-monitor ", options);
+        System.exit(1);
+    }
+
+    private static Options buildOptions () {
+        Options options = new Options();
+        options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed " +
+                "offset");
+        options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout");
+        options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new " +
+                "consumer/spout e.g. hostname1:9092,hostname2:9092");
+        options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout) ");
+        options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in " +
+                "old spout");
+        options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to " +
+                OPTION_LEADERS_LONG + " for old spout with StaticHosts");
+        options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to " +
+                OPTION_PARTITIONS_LONG + " for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092");
+        options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets " +
+                "and/or topic metadata for ZkHosts e.g. hostname1:2181,hostname2:2181");
+        options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed" +
+                " offsets without the topic and partition nodes");
+        options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " +
+                "/brokers (applicable only for old kafka spout) ");
+        return options;
+    }
+
+    /**
+     *
+     * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
+     * @return log head offset, spout offset and lag for each partition
+     */
+    public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
+        KafkaConsumer<String, String> consumer = null;
+        List<KafkaOffsetLagResult> result = new ArrayList<>();
+        try {
+            Properties props = new Properties();
+            props.put("bootstrap.servers", newKafkaSpoutOffsetQuery.getBootStrapBrokers());
+            props.put("group.id", newKafkaSpoutOffsetQuery.getConsumerGroupId());
+            props.put("enable.auto.commit", "false");
+            props.put("session.timeout.ms", "30000");
+            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            List<TopicPartition> topicPartitionList = new ArrayList<>();
+            consumer = new KafkaConsumer<>(props);
+            for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) {
+                List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
+                if (partitionInfoList != null) {
+                    for (PartitionInfo partitionInfo : partitionInfoList) {
+                        topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+                    }
+                }
+            }
+            consumer.assign(topicPartitionList);
+            for (TopicPartition topicPartition : topicPartitionList) {
+                OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
+                long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.offset() : -1;
+                consumer.seekToEnd(topicPartition);
+                result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, consumer.position(topicPartition)));
+            }
+        } finally {
+            if (consumer != null) {
+                consumer.close();
+            }
+        }
+        return result;
+    }
+
+    /**
+     *
+     * @param oldKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
+     * @return log head offset, spout offset and lag for each partition
+     */
+    public static List<KafkaOffsetLagResult> getOffsetLags (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
+        List<KafkaOffsetLagResult> result = new ArrayList<>();
+        Map<String, List<TopicPartition>> leaders = getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
+        if (leaders != null) {
+            Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leaders);
+            Map<String, List<Integer>> topicPartitions = new HashMap<>();
+            for (Map.Entry<String, List<TopicPartition>> entry: leaders.entrySet()) {
+                for (TopicPartition topicPartition: entry.getValue()) {
+                    if (!topicPartitions.containsKey(topicPartition.topic())) {
+                        topicPartitions.put(topicPartition.topic(), new ArrayList<Integer>());
+                    }
+                    topicPartitions.get(topicPartition.topic()).add(topicPartition.partition());
+                }
+            }
+            Map<String, Map<Integer, Long>> oldConsumerOffsets = getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
+            for (Map.Entry<String, Map<Integer, Long>> topicOffsets: logHeadOffsets.entrySet()) {
+                for (Map.Entry<Integer, Long> partitionOffsets: topicOffsets.getValue().entrySet()) {
+                    // use null (not int -1) as the fallback so the ternary keeps the boxed Long type; mixing Long and int
+                    // here would trigger unboxing and an NPE when no offset has been committed for the partition yet
+                    Long consumerCommittedOffset = oldConsumerOffsets.get(topicOffsets.getKey()) != null ? oldConsumerOffsets.get(topicOffsets.getKey()).get
+                            (partitionOffsets.getKey()) : null;
+                    consumerCommittedOffset = (consumerCommittedOffset == null ? -1 : consumerCommittedOffset);
+                    KafkaOffsetLagResult kafkaOffsetLagResult = new KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(),
+                            consumerCommittedOffset, partitionOffsets.getValue());
+                    result.add(kafkaOffsetLagResult);
+                }
+            }
+        }
+        return result;
+    }
+
+    private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
+        Map<String, List<TopicPartition>> result = new HashMap<>();
+        if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
+            String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
+            String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
+            for (int i = 0; i < leaders.length; ++i) {
+                if (!result.containsKey(leaders[i])) {
+                    result.put(leaders[i], new ArrayList<TopicPartition>());
+                }
+                result.get(leaders[i]).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
+            }
+        } else {
--- End diff --
The old kafka spout can be configured in one of two ways:
1. StaticHosts
2. ZkHosts
With StaticHosts the user statically supplies the leader for each partition at the time of building the topology, so we need to use that information to get the latest offsets. With ZkHosts the KafkaSpout discovers the leaders in its prepare method from the zk nodes and then consumes the data. That is what the if/else represents. I will add some documentation.
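To make the modes concrete, here is roughly what the invocations would look like (a sketch based on the options in buildOptions above; the topic, broker, and zk path values are made-up placeholders):
{code}
# old spout with StaticHosts: partitions and their leaders supplied explicitly
storm-kafka-monitor --old-spout --topics mytopic --partitions 0,1 --leaders broker1:9092,broker2:9092 --zk-servers zk1:2181 --zk-node /kafkaspout/consumerid

# old spout with ZkHosts: leaders looked up under the broker root zk node
storm-kafka-monitor --old-spout --topics mytopic --zk-servers zk1:2181 --zk-node /kafkaspout/consumerid --zk-brokers-root-node /brokers

# new spout: committed offsets fetched from kafka itself via the group id
storm-kafka-monitor --topics mytopic --bootstrap-brokers broker1:9092 --groupid consumergroup
{code}
The first form exercises the StaticHosts branch (the partitions/leaders pair), the second the ZkHosts branch, and the third bypasses zk entirely and goes through the new consumer API.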
> Provide a bin script to check consumer lag from KafkaSpout to Kafka topic
> offsets
> ---------------------------------------------------------------------------------
>
> Key: STORM-1136
> URL: https://issues.apache.org/jira/browse/STORM-1136
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-kafka
> Reporter: Sriharsha Chintalapani
> Assignee: Priyank Shah
>
> We store KafkaSpout offsets in the zkroot + id path in ZooKeeper. Kafka provides
> a utility and a protocol request to fetch the latest offsets of a topic:
> {code}
> example:
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> {code}
> We should provide a way for the user to check how far the kafka spout has read
> into the topic and what the lag is. If we can expose this via the UI, even better.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)