Fixing stylecheck problems with storm-kafka-monitor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/224633d3 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/224633d3 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/224633d3 Branch: refs/heads/master Commit: 224633d3ccbaa843c7f94c709d5cc573b1c59845 Parents: 5fc4e9f Author: Kishor Patil <[email protected]> Authored: Sun Apr 22 23:21:46 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 02:32:40 2018 -0400 ---------------------------------------------------------------------- external/storm-kafka-monitor/pom.xml | 2 +- .../kafka/monitor/KafkaOffsetLagResult.java | 14 +- .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 203 +++++++++++-------- .../kafka/monitor/KafkaPartitionOffsetLag.java | 81 ++++---- .../kafka/monitor/NewKafkaSpoutOffsetQuery.java | 12 +- .../kafka/monitor/OldKafkaSpoutOffsetQuery.java | 25 ++- 6 files changed, 188 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/224633d3/external/storm-kafka-monitor/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-kafka-monitor/pom.xml b/external/storm-kafka-monitor/pom.xml index 9e9b0a1..711dbea 100644 --- a/external/storm-kafka-monitor/pom.xml +++ b/external/storm-kafka-monitor/pom.xml @@ -147,7 +147,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>178</maxAllowedViolations> + <maxAllowedViolations>87</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/224633d3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java index a6d898c..908141c 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java @@ -61,12 +61,12 @@ public class KafkaOffsetLagResult implements JSONAware { @Override public String toString() { return "KafkaOffsetLagResult{" + - "topic='" + topic + '\'' + - ", partition=" + partition + - ", consumerCommittedOffset=" + consumerCommittedOffset + - ", logHeadOffset=" + logHeadOffset + - ", lag=" + lag + - '}'; + "topic='" + topic + '\'' + + ", partition=" + partition + + ", consumerCommittedOffset=" + consumerCommittedOffset + + ", logHeadOffset=" + logHeadOffset + + ", lag=" + lag + + '}'; } @Override @@ -97,6 +97,6 @@ public class KafkaOffsetLagResult implements JSONAware { @Override public String toJSONString() { return "{\"topic\":\"" + topic + "\",\"partition\":" + partition + ",\"consumerCommittedOffset\":" + consumerCommittedOffset + "," + - "\"logHeadOffset\":" + logHeadOffset + ",\"lag\":" + lag + "}"; + "\"logHeadOffset\":" + logHeadOffset + ",\"lag\":" + lag + "}"; } } http://git-wip-us.apache.org/repos/asf/storm/blob/224633d3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java index aa1a728..ef65bcb 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java @@ -18,6 +18,17 @@ package org.apache.storm.kafka.monitor; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import kafka.api.OffsetRequest; +import kafka.api.PartitionOffsetRequestInfo; +import kafka.common.TopicAndPartition; +import kafka.javaapi.OffsetResponse; +import kafka.javaapi.consumer.SimpleConsumer; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -32,19 +43,6 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.json.simple.JSONValue; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import kafka.api.OffsetRequest; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.TopicAndPartition; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.consumer.SimpleConsumer; - /** * Utility class for querying offset lag for kafka spout */ @@ -72,7 +70,7 @@ public class KafkaOffsetLagUtil { private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s"; private static final String OPTION_SECURITY_PROTOCOL_LONG = "security-protocol"; - public static void main (String args[]) { + public static void main(String args[]) { try { List<KafkaOffsetLagResult> results; Options options = buildOptions(); @@ -83,25 +81,29 @@ public class KafkaOffsetLagUtil { } if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) { OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery; - if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG) || commandLine.hasOption(OPTION_SECURITY_PROTOCOL_LONG)) { - printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " or " + OPTION_SECURITY_PROTOCOL_LONG + " is " + - "not accepted with option " + OPTION_OLD_CONSUMER_LONG); + if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG) || + commandLine.hasOption(OPTION_SECURITY_PROTOCOL_LONG)) { + printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " or " + + OPTION_SECURITY_PROTOCOL_LONG + " is " + + "not accepted with option " + OPTION_OLD_CONSUMER_LONG); } if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) { printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required with " + - OPTION_OLD_CONSUMER_LONG); + OPTION_OLD_CONSUMER_LONG); } String[] topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(","); if (topics != null && topics.length > 1) { - printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG + ". Either a single topic or a " + - "wildcard string for matching topics is supported"); + printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG + + ". Either a single topic or a " + + "wildcard string for matching topics is supported"); } if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) { if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) { printUsageAndExit(options, OPTION_PARTITIONS_LONG + " or " + OPTION_LEADERS_LONG + " is not accepted with " + - OPTION_ZK_BROKERS_ROOT_LONG); + OPTION_ZK_BROKERS_ROOT_LONG); } - oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue + oldKafkaSpoutOffsetQuery = + new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.hasOption (OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG)); } else { @@ -109,34 +111,41 @@ public class KafkaOffsetLagUtil { printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG + " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG); } if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) { - printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " + OPTION_ZK_BROKERS_ROOT_LONG + - " is not provided"); + printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " + + OPTION_ZK_BROKERS_ROOT_LONG + + " is not provided"); } String[] partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(","); String[] leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(","); if (partitions.length != leaders.length) { printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " need to be of same size"); } - oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue + oldKafkaSpoutOffsetQuery = + new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue (OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.getOptionValue (OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG)); } results = getOffsetLags(oldKafkaSpoutOffsetQuery); } else { String securityProtocol = commandLine.getOptionValue(OPTION_SECURITY_PROTOCOL_LONG); - String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG, - OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG}; - for (String oldOption: oldSpoutOptions) { + String[] oldSpoutOptions = { + OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG, + OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG + }; + for (String oldOption : oldSpoutOptions) { if (commandLine.hasOption(oldOption)) { printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG); } } if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) { - printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + OPTION_OLD_CONSUMER_LONG + - " is not specified"); + printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + + OPTION_OLD_CONSUMER_LONG + + " is not specified"); } - NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), - commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol); + NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = + new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), + commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), + commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol); results = getOffsetLags(newKafkaSpoutOffsetQuery); } @@ -161,49 +170,56 @@ public class KafkaOffsetLagUtil { } topicResultKeyedByPartition.put(result.getPartition(), - new KafkaPartitionOffsetLag(result.getConsumerCommittedOffset(), result.getLogHeadOffset())); + new KafkaPartitionOffsetLag(result.getConsumerCommittedOffset(), result.getLogHeadOffset())); } return resultKeyedByTopic; } - private static void printUsageAndExit (Options options, String message) { + private static void printUsageAndExit(Options options, String message) { System.out.println(message); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp("storm-kafka-monitor ", options); System.exit(1); } - private static Options buildOptions () { + private static Options buildOptions() { Options options = new Options(); - options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed " + - "offset"); + options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, + "REQUIRED Topics (comma separated list) for fetching log head and spout committed " + + "offset"); options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout"); - options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new " + - "consumer/spout e.g. hostname1:9092,hostname2:9092"); + options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, + "Comma separated list of bootstrap broker hosts for new " + + "consumer/spout e.g. hostname1:9092,hostname2:9092"); options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout) "); - options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in " + - "old spout"); + options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, + "Whether topic provided is a wildcard as supported by ZkHosts in " + + "old spout"); options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to " + - OPTION_LEADERS_LONG + " for old spout with StaticHosts"); + OPTION_LEADERS_LONG + " for old spout with StaticHosts"); options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to " + - OPTION_PARTITIONS_LONG + " for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092"); - options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets " + - "and/or topic metadata for ZkHosts e.g hostname1:2181,hostname2:2181"); - options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed" + - " offsets without the topic and partition nodes"); - options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " + - "/brokers (applicable only for old kafka spout) "); + OPTION_PARTITIONS_LONG + + " for old spout with StaticHosts e.g. hostname1:9092," + + "hostname2:9092"); + options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, + "Comma separated list of zk servers for fetching spout committed offsets " + + "and/or topic metadata for ZkHosts e.g hostname1:2181,hostname2:2181"); + options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, + "Zk node prefix where old kafka spout stores the committed" + + " offsets without the topic and partition nodes"); + options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, + "Zk node prefix where kafka stores broker information e.g. " + + "/brokers (applicable only for old kafka spout) "); options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka"); return options; } /** - * * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets * @return log head offset, spout offset and lag for each partition */ - public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) { + public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) { KafkaConsumer<String, String> consumer = null; List<KafkaOffsetLagResult> result = new ArrayList<>(); try { @@ -219,7 +235,7 @@ public class KafkaOffsetLagUtil { } List<TopicPartition> topicPartitionList = new ArrayList<>(); consumer = new KafkaConsumer<>(props); - for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) { + for (String topic : newKafkaSpoutOffsetQuery.getTopics().split(",")) { List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic); if (partitionInfoList != null) { for (PartitionInfo partitionInfo : partitionInfoList) { @@ -232,7 +248,8 @@ public class KafkaOffsetLagUtil { OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition); long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.offset() : -1; consumer.seekToEnd(toArrayList(topicPartition)); - result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, consumer.position(topicPartition))); + result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, + consumer.position(topicPartition))); } } finally { if (consumer != null) { @@ -243,22 +260,23 @@ public class KafkaOffsetLagUtil { } private static Collection<TopicPartition> toArrayList(final TopicPartition tp) { - return new ArrayList<TopicPartition>(1){{add(tp);}}; + return new ArrayList<TopicPartition>(1) {{ + add(tp); + }}; } /** - * * @param oldKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets * @return log head offset, spout offset and lag for each partition */ - public static List<KafkaOffsetLagResult> getOffsetLags (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception { + public static List<KafkaOffsetLagResult> getOffsetLags(OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception { List<KafkaOffsetLagResult> result = new ArrayList<>(); Map<String, List<TopicPartition>> leaders = getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery); if (leaders != null) { Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leaders); Map<String, List<Integer>> topicPartitions = new HashMap<>(); - for (Map.Entry<String, List<TopicPartition>> entry: leaders.entrySet()) { - for (TopicPartition topicPartition: entry.getValue()) { + for (Map.Entry<String, List<TopicPartition>> entry : leaders.entrySet()) { + for (TopicPartition topicPartition : entry.getValue()) { if (!topicPartitions.containsKey(topicPartition.topic())) { topicPartitions.put(topicPartition.topic(), new ArrayList<Integer>()); } @@ -266,13 +284,15 @@ public class KafkaOffsetLagUtil { } } Map<String, Map<Integer, Long>> oldConsumerOffsets = getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery); - for (Map.Entry<String, Map<Integer, Long>> topicOffsets: logHeadOffsets.entrySet()) { - for (Map.Entry<Integer, Long> partitionOffsets: topicOffsets.getValue().entrySet()) { - Long consumerCommittedOffset = oldConsumerOffsets.get(topicOffsets.getKey()) != null ? (Long) oldConsumerOffsets.get(topicOffsets.getKey()).get - (partitionOffsets.getKey()) : -1; + for (Map.Entry<String, Map<Integer, Long>> topicOffsets : logHeadOffsets.entrySet()) { + for (Map.Entry<Integer, Long> partitionOffsets : topicOffsets.getValue().entrySet()) { + Long consumerCommittedOffset = + oldConsumerOffsets.get(topicOffsets.getKey()) != null ? (Long) oldConsumerOffsets.get(topicOffsets.getKey()).get + (partitionOffsets.getKey()) : -1; consumerCommittedOffset = (consumerCommittedOffset == null ? -1 : consumerCommittedOffset); KafkaOffsetLagResult kafkaOffsetLagResult = new KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(), - consumerCommittedOffset, partitionOffsets.getValue()); + consumerCommittedOffset, + partitionOffsets.getValue()); result.add(kafkaOffsetLagResult); } } @@ -280,7 +300,8 @@ public class KafkaOffsetLagUtil { return result; } - private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception { + private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions(OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws + Exception { Map<String, List<TopicPartition>> result = new HashMap<>(); // this means that kafka spout was configured with StaticHosts hosts (leader for partition) if (oldKafkaSpoutOffsetQuery.getPartitions() != null) { @@ -301,7 +322,8 @@ public class KafkaOffsetLagUtil { brokersZkNode += "/"; } String topicsZkPath = brokersZkNode + "topics"; - curatorFramework = CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000)); + curatorFramework = + CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000)); curatorFramework.start(); List<String> topics = new ArrayList<>(); if (oldKafkaSpoutOffsetQuery.isWildCardTopic()) { @@ -314,7 +336,7 @@ public class KafkaOffsetLagUtil { } else { topics.add(oldKafkaSpoutOffsetQuery.getTopic()); } - for (String topic: topics) { + for (String topic : topics) { String partitionsPath = topicsZkPath + "/" + topic + "/partitions"; List<String> children = curatorFramework.getChildren().forPath(partitionsPath); for (int i = 0; i < children.size(); ++i) { @@ -341,27 +363,33 @@ public class KafkaOffsetLagUtil { return result; } - private static Map<String, Map<Integer, Long>> getLogHeadOffsets (Map<String, List<TopicPartition>> leadersAndTopicPartitions) { + private static Map<String, Map<Integer, Long>> getLogHeadOffsets(Map<String, List<TopicPartition>> leadersAndTopicPartitions) { Map<String, Map<Integer, Long>> result = new HashMap<>(); if (leadersAndTopicPartitions != null) { PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1); - SimpleConsumer simpleConsumer = null; - for (Map.Entry<String, List<TopicPartition>> leader: leadersAndTopicPartitions.entrySet()) { + SimpleConsumer simpleConsumer = null; + for (Map.Entry<String, List<TopicPartition>> leader : leadersAndTopicPartitions.entrySet()) { try { - simpleConsumer = new SimpleConsumer(leader.getKey().split(":")[0], Integer.parseInt(leader.getKey().split(":")[1]), 10000, 64 * - 1024, "LogHeadOffsetRequest"); - Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); + simpleConsumer = + new SimpleConsumer(leader.getKey().split(":")[0], Integer.parseInt(leader.getKey().split(":")[1]), 10000, 64 * + 1024, + "LogHeadOffsetRequest"); + Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = + new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); for (TopicPartition topicPartition : leader.getValue()) { - requestInfo.put(new TopicAndPartition(topicPartition.topic(), topicPartition.partition()), partitionOffsetRequestInfo); + requestInfo + .put(new TopicAndPartition(topicPartition.topic(), topicPartition.partition()), partitionOffsetRequestInfo); if (!result.containsKey(topicPartition.topic())) { result.put(topicPartition.topic(), new HashMap<Integer, Long>()); } } - kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), - "LogHeadOffsetRequest"); + kafka.javaapi.OffsetRequest request = + new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), + "LogHeadOffsetRequest"); OffsetResponse response = simpleConsumer.getOffsetsBefore(request); for (TopicPartition topicPartition : leader.getValue()) { - result.get(topicPartition.topic()).put(topicPartition.partition(), response.offsets(topicPartition.topic(), topicPartition.partition())[0]); + result.get(topicPartition.topic()) + .put(topicPartition.partition(), response.offsets(topicPartition.topic(), topicPartition.partition())[0]); } } finally { if (simpleConsumer != null) { @@ -373,29 +401,34 @@ public class KafkaOffsetLagUtil { return result; } - private static Map<String, Map<Integer, Long>> getOldConsumerOffsetsFromZk (Map<String, List<Integer>> topicPartitions, OldKafkaSpoutOffsetQuery - oldKafkaSpoutOffsetQuery) throws Exception { + private static Map<String, Map<Integer, Long>> getOldConsumerOffsetsFromZk(Map<String, List<Integer>> topicPartitions, + OldKafkaSpoutOffsetQuery + oldKafkaSpoutOffsetQuery) throws Exception { Map<String, Map<Integer, Long>> result = new HashMap<>(); - CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000)); + CuratorFramework curatorFramework = + CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000)); curatorFramework.start(); String partitionPrefix = "partition_"; String zkPath = oldKafkaSpoutOffsetQuery.getZkPath(); if (zkPath.endsWith("/")) { - zkPath = zkPath.substring(0, zkPath.length()-1); + zkPath = zkPath.substring(0, zkPath.length() - 1); } if (curatorFramework.checkExists().forPath(zkPath) == null) { - throw new IllegalArgumentException(OPTION_ZK_COMMITTED_NODE_LONG+" '"+zkPath+"' dose not exists."); + throw new IllegalArgumentException(OPTION_ZK_COMMITTED_NODE_LONG + " '" + zkPath + "' dose not exists."); } byte[] zkData; try { if (topicPartitions != null) { - for (Map.Entry<String, List<Integer>> topicEntry: topicPartitions.entrySet()) { + for (Map.Entry<String, List<Integer>> topicEntry : topicPartitions.entrySet()) { Map<Integer, Long> partitionOffsets = new HashMap<>(); - for (Integer partition: topicEntry.getValue()) { - String path = zkPath + "/" + (oldKafkaSpoutOffsetQuery.isWildCardTopic() ? topicEntry.getKey() + "/" : "") + partitionPrefix + partition; + for (Integer partition : topicEntry.getValue()) { + String path = + zkPath + "/" + (oldKafkaSpoutOffsetQuery.isWildCardTopic() ? topicEntry.getKey() + "/" : "") + partitionPrefix + + partition; if (curatorFramework.checkExists().forPath(path) != null) { zkData = curatorFramework.getData().forPath(path); - Map<Object, Object> offsetData = (Map<Object, Object>) JSONValue.parseWithException(new String(zkData, "UTF-8")); + Map<Object, Object> offsetData = + (Map<Object, Object>) JSONValue.parseWithException(new String(zkData, "UTF-8")); partitionOffsets.put(partition, (Long) offsetData.get("offset")); } } http://git-wip-us.apache.org/repos/asf/storm/blob/224633d3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaPartitionOffsetLag.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaPartitionOffsetLag.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaPartitionOffsetLag.java index 6faf266..a47cef2 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaPartitionOffsetLag.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaPartitionOffsetLag.java @@ -15,57 +15,58 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ + package org.apache.storm.kafka.monitor; public class KafkaPartitionOffsetLag { - private long consumerCommittedOffset; - private long logHeadOffset; - private long lag; + private long consumerCommittedOffset; + private long logHeadOffset; + private long lag; - public KafkaPartitionOffsetLag(long consumerCommittedOffset, long logHeadOffset) { - this.consumerCommittedOffset = consumerCommittedOffset; - this.logHeadOffset = logHeadOffset; - this.lag = logHeadOffset - consumerCommittedOffset; - } + public KafkaPartitionOffsetLag(long consumerCommittedOffset, long logHeadOffset) { + this.consumerCommittedOffset = consumerCommittedOffset; + this.logHeadOffset = logHeadOffset; + this.lag = logHeadOffset - consumerCommittedOffset; + } - public long getConsumerCommittedOffset() { - return consumerCommittedOffset; - } + public long getConsumerCommittedOffset() { + return consumerCommittedOffset; + } - public long getLogHeadOffset() { - return logHeadOffset; - } + public long getLogHeadOffset() { + return logHeadOffset; + } - public long getLag() { - return lag; - } + public long getLag() { + return lag; + } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof KafkaPartitionOffsetLag)) return false; + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof KafkaPartitionOffsetLag)) return false; - KafkaPartitionOffsetLag that = (KafkaPartitionOffsetLag) o; + KafkaPartitionOffsetLag that = (KafkaPartitionOffsetLag) o; - if (getConsumerCommittedOffset() != that.getConsumerCommittedOffset()) return false; - if (getLogHeadOffset() != that.getLogHeadOffset()) return false; - return getLag() == that.getLag(); + if (getConsumerCommittedOffset() != that.getConsumerCommittedOffset()) return false; + if (getLogHeadOffset() != that.getLogHeadOffset()) return false; + return getLag() == that.getLag(); - } + } - @Override - public int hashCode() { - int result = (int) (getConsumerCommittedOffset() ^ (getConsumerCommittedOffset() >>> 32)); - result = 31 * result + (int) (getLogHeadOffset() ^ (getLogHeadOffset() >>> 32)); - result = 31 * result + (int) (getLag() ^ (getLag() >>> 32)); - return result; - } + @Override + public int hashCode() { + int result = (int) (getConsumerCommittedOffset() ^ (getConsumerCommittedOffset() >>> 32)); + result = 31 * result + (int) (getLogHeadOffset() ^ (getLogHeadOffset() >>> 32)); + result = 31 * result + (int) (getLag() ^ (getLag() >>> 32)); + return result; + } - @Override - public String toString() { - // JSONAware not working for nested element on Map so write JSON format from here - return "{\"consumerCommittedOffset\": " + consumerCommittedOffset + ", " + - "\"logHeadOffset\": " + logHeadOffset + ", " + - "\"lag\": " + lag + "}"; - } + @Override + public String toString() { + // JSONAware not working for nested element on Map so write JSON format from here + return "{\"consumerCommittedOffset\": " + consumerCommittedOffset + ", " + + "\"logHeadOffset\": " + logHeadOffset + ", " + + "\"lag\": " + lag + "}"; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/224633d3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java index 18bc0df..e6c4524 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java @@ -15,10 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.kafka.monitor; /** - * Class representing information for querying kafka for log head offsets, consumer offsets and the difference for new kafka spout using new consumer api + * Class representing information for querying kafka for log head offsets, consumer offsets and the difference for new kafka spout using new + * consumer api */ public class NewKafkaSpoutOffsetQuery { private final String topics; // comma separated list of topics @@ -52,10 +54,10 @@ public class NewKafkaSpoutOffsetQuery { @Override public String toString() { return "NewKafkaSpoutOffsetQuery{" + - "topics='" + topics + '\'' + - ", consumerGroupId='" + consumerGroupId + '\'' + - ", bootStrapBrokers='" + bootStrapBrokers + '\'' + - '}'; + "topics='" + topics + '\'' + + ", consumerGroupId='" + consumerGroupId + '\'' + + ", bootStrapBrokers='" + bootStrapBrokers + '\'' + + '}'; } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/224633d3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java index cdeed32..ea80b64 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.kafka.monitor; /** @@ -27,7 +28,8 @@ public class OldKafkaSpoutOffsetQuery { private final boolean isWildCardTopic; //if the topic is a wildcard private final String brokersZkPath; //zk node prefix where kafka stores all broker information private final String partitions; //comma separated list of partitions corresponding to leaders below (for StaticHosts) - private final String leaders; //comma separated list of leader brokers and port corresponding to the partitions above (for StaticHosts) e.g. + private final String leaders; + //comma separated list of leader brokers and port corresponding to the partitions above (for StaticHosts) e.g. // hostname1:9092,hostname2:9092 public OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, boolean isWildCardTopic, String brokersZkPath) { @@ -39,8 +41,9 @@ public class OldKafkaSpoutOffsetQuery { } - private OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, boolean isWildCardTopic, String brokersZkPath, String partitions, String - leaders) { + private OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, boolean isWildCardTopic, String brokersZkPath, + String partitions, String + leaders) { this.topic = topic; this.zkServers = zkServers; this.zkPath = zkPath; @@ -53,14 +56,14 @@ public class OldKafkaSpoutOffsetQuery { @Override public String toString() { return "OldKafkaSpoutOffsetQuery{" + - "topic='" + topic + '\'' + - ", zkServers='" + zkServers + '\'' + - ", zkPath='" + zkPath + '\'' + - ", isWildCardTopic=" + isWildCardTopic + - ", brokersZkPath='" + brokersZkPath + '\'' + - ", partitions='" + partitions + '\'' + - ", leaders='" + leaders + '\'' + - '}'; + "topic='" + topic + '\'' + + ", zkServers='" + zkServers + '\'' + + ", zkPath='" + zkPath + '\'' + + ", isWildCardTopic=" + isWildCardTopic + + ", brokersZkPath='" + brokersZkPath + '\'' + + ", partitions='" + partitions + '\'' + + ", leaders='" + leaders + '\'' + + '}'; } @Override
