Fixing stylecheck problems with storm-kafka-monitor

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/224633d3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/224633d3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/224633d3

Branch: refs/heads/master
Commit: 224633d3ccbaa843c7f94c709d5cc573b1c59845
Parents: 5fc4e9f
Author: Kishor Patil <[email protected]>
Authored: Sun Apr 22 23:21:46 2018 -0400
Committer: Kishor Patil <[email protected]>
Committed: Mon Apr 23 02:32:40 2018 -0400

----------------------------------------------------------------------
 external/storm-kafka-monitor/pom.xml            |   2 +-
 .../kafka/monitor/KafkaOffsetLagResult.java     |  14 +-
 .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 203 +++++++++++--------
 .../kafka/monitor/KafkaPartitionOffsetLag.java  |  81 ++++----
 .../kafka/monitor/NewKafkaSpoutOffsetQuery.java |  12 +-
 .../kafka/monitor/OldKafkaSpoutOffsetQuery.java |  25 ++-
 6 files changed, 188 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/224633d3/external/storm-kafka-monitor/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/pom.xml b/external/storm-kafka-monitor/pom.xml
index 9e9b0a1..711dbea 100644
--- a/external/storm-kafka-monitor/pom.xml
+++ b/external/storm-kafka-monitor/pom.xml
@@ -147,7 +147,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>178</maxAllowedViolations>
+                    <maxAllowedViolations>87</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/224633d3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java
index a6d898c..908141c 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagResult.java
@@ -61,12 +61,12 @@ public class KafkaOffsetLagResult implements JSONAware {
     @Override
     public String toString() {
         return "KafkaOffsetLagResult{" +
-                "topic='" + topic + '\'' +
-                ", partition=" + partition +
-                ", consumerCommittedOffset=" + consumerCommittedOffset +
-                ", logHeadOffset=" + logHeadOffset +
-                ", lag=" + lag +
-                '}';
+               "topic='" + topic + '\'' +
+               ", partition=" + partition +
+               ", consumerCommittedOffset=" + consumerCommittedOffset +
+               ", logHeadOffset=" + logHeadOffset +
+               ", lag=" + lag +
+               '}';
     }
 
     @Override
@@ -97,6 +97,6 @@ public class KafkaOffsetLagResult implements JSONAware {
     @Override
     public String toJSONString() {
         return "{\"topic\":\"" + topic + "\",\"partition\":" + partition + 
",\"consumerCommittedOffset\":" + consumerCommittedOffset + "," +
-                "\"logHeadOffset\":" + logHeadOffset + ",\"lag\":" + lag + "}";
+               "\"logHeadOffset\":" + logHeadOffset + ",\"lag\":" + lag + "}";
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/224633d3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
index aa1a728..ef65bcb 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -18,6 +18,17 @@
 
 package org.apache.storm.kafka.monitor;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
@@ -32,19 +43,6 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.json.simple.JSONValue;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import kafka.api.OffsetRequest;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-
 /**
  * Utility class for querying offset lag for kafka spout
  */
@@ -72,7 +70,7 @@ public class KafkaOffsetLagUtil {
     private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s";
     private static final String OPTION_SECURITY_PROTOCOL_LONG = 
"security-protocol";
 
-    public static void main (String args[]) {
+    public static void main(String args[]) {
         try {
             List<KafkaOffsetLagResult> results;
             Options options = buildOptions();
@@ -83,25 +81,29 @@ public class KafkaOffsetLagUtil {
             }
             if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
                 OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
-                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || 
commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG) || 
commandLine.hasOption(OPTION_SECURITY_PROTOCOL_LONG)) {
-                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + 
OPTION_BOOTSTRAP_BROKERS_LONG + " or " + OPTION_SECURITY_PROTOCOL_LONG + " is " 
+
-                            "not accepted with option " + 
OPTION_OLD_CONSUMER_LONG);
+                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || 
commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG) ||
+                    commandLine.hasOption(OPTION_SECURITY_PROTOCOL_LONG)) {
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + 
OPTION_BOOTSTRAP_BROKERS_LONG + " or " +
+                                               OPTION_SECURITY_PROTOCOL_LONG + 
" is " +
+                                               "not accepted with option " + 
OPTION_OLD_CONSUMER_LONG);
                 }
                 if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || 
!commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
                     printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and 
" + OPTION_ZK_COMMITTED_NODE_LONG + " are required  with " +
-                            OPTION_OLD_CONSUMER_LONG);
+                                               OPTION_OLD_CONSUMER_LONG);
                 }
                 String[] topics = 
commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",");
                 if (topics != null && topics.length > 1) {
-                    printUsageAndExit(options, "Multiple topics not supported 
with option " + OPTION_OLD_CONSUMER_LONG + ". Either a single topic or a " +
-                            "wildcard string for matching topics is 
supported");
+                    printUsageAndExit(options, "Multiple topics not supported 
with option " + OPTION_OLD_CONSUMER_LONG +
+                                               ". Either a single topic or a " 
+
+                                               "wildcard string for matching 
topics is supported");
                 }
                 if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
                     if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || 
commandLine.hasOption(OPTION_LEADERS_LONG)) {
                         printUsageAndExit(options, OPTION_PARTITIONS_LONG + " 
or " + OPTION_LEADERS_LONG + " is not accepted with " +
-                                OPTION_ZK_BROKERS_ROOT_LONG);
+                                                   
OPTION_ZK_BROKERS_ROOT_LONG);
                     }
-                    oldKafkaSpoutOffsetQuery = new 
OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), 
commandLine.getOptionValue
+                    oldKafkaSpoutOffsetQuery =
+                        new 
OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), 
commandLine.getOptionValue
                             (OPTION_ZK_SERVERS_LONG), 
commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), commandLine.hasOption
                             (OPTION_TOPIC_WILDCARD_LONG), 
commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG));
                 } else {
@@ -109,34 +111,41 @@ public class KafkaOffsetLagUtil {
                         printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG 
+ " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG);
                     }
                     if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || 
!commandLine.hasOption(OPTION_LEADERS_LONG)) {
-                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " 
and " + OPTION_LEADERS_LONG + " are required if " + OPTION_ZK_BROKERS_ROOT_LONG 
+
-                                " is not provided");
+                        printUsageAndExit(options, OPTION_PARTITIONS_LONG + " 
and " + OPTION_LEADERS_LONG + " are required if " +
+                                                   OPTION_ZK_BROKERS_ROOT_LONG 
+
+                                                   " is not provided");
                     }
                     String[] partitions = 
commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",");
                     String[] leaders = 
commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",");
                     if (partitions.length != leaders.length) {
                         printUsageAndExit(options, OPTION_PARTITIONS_LONG + " 
and " + OPTION_LEADERS_LONG + " need to be of same size");
                     }
-                    oldKafkaSpoutOffsetQuery = new 
OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), 
commandLine.getOptionValue
+                    oldKafkaSpoutOffsetQuery =
+                        new 
OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), 
commandLine.getOptionValue
                             (OPTION_ZK_SERVERS_LONG), 
commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG), 
commandLine.getOptionValue
                             (OPTION_PARTITIONS_LONG), 
commandLine.getOptionValue(OPTION_LEADERS_LONG));
                 }
                 results = getOffsetLags(oldKafkaSpoutOffsetQuery);
             } else {
                 String securityProtocol = 
commandLine.getOptionValue(OPTION_SECURITY_PROTOCOL_LONG);
-                String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, 
OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
-                        OPTION_ZK_COMMITTED_NODE_LONG, 
OPTION_ZK_BROKERS_ROOT_LONG};
-                for (String oldOption: oldSpoutOptions) {
+                String[] oldSpoutOptions = {
+                    OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, 
OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
+                    OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG
+                };
+                for (String oldOption : oldSpoutOptions) {
                     if (commandLine.hasOption(oldOption)) {
                         printUsageAndExit(options, oldOption + " is not 
accepted without " + OPTION_OLD_CONSUMER_LONG);
                     }
                 }
                 if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || 
!commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
-                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " 
+ OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + 
OPTION_OLD_CONSUMER_LONG +
-                            " is not specified");
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " 
+ OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " +
+                                               OPTION_OLD_CONSUMER_LONG +
+                                               " is not specified");
                 }
-                NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new 
NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
-                        
commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), 
commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol);
+                NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery =
+                    new 
NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
+                                                 
commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG),
+                                                 
commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol);
                 results = getOffsetLags(newKafkaSpoutOffsetQuery);
             }
 
@@ -161,49 +170,56 @@ public class KafkaOffsetLagUtil {
             }
 
             topicResultKeyedByPartition.put(result.getPartition(),
-                new 
KafkaPartitionOffsetLag(result.getConsumerCommittedOffset(), 
result.getLogHeadOffset()));
+                                            new 
KafkaPartitionOffsetLag(result.getConsumerCommittedOffset(), 
result.getLogHeadOffset()));
         }
 
         return resultKeyedByTopic;
     }
 
-    private static void printUsageAndExit (Options options, String message) {
+    private static void printUsageAndExit(Options options, String message) {
         System.out.println(message);
         HelpFormatter formatter = new HelpFormatter();
         formatter.printHelp("storm-kafka-monitor ", options);
         System.exit(1);
     }
 
-    private static Options buildOptions () {
+    private static Options buildOptions() {
         Options options = new Options();
-        options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, 
"REQUIRED Topics (comma separated list) for fetching log head and spout 
committed " +
-                "offset");
+        options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true,
+                          "REQUIRED Topics (comma separated list) for fetching 
log head and spout committed " +
+                          "offset");
         options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, 
false, "Whether request is for old spout");
-        options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, 
OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker 
hosts for new " +
-                "consumer/spout e.g. hostname1:9092,hostname2:9092");
+        options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, 
OPTION_BOOTSTRAP_BROKERS_LONG, true,
+                          "Comma separated list of bootstrap broker hosts for 
new " +
+                          "consumer/spout e.g. hostname1:9092,hostname2:9092");
         options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, 
"Group id of consumer (applicable only for new kafka spout) ");
-        options.addOption(OPTION_TOPIC_WILDCARD_SHORT, 
OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as 
supported by ZkHosts in " +
-                "old spout");
+        options.addOption(OPTION_TOPIC_WILDCARD_SHORT, 
OPTION_TOPIC_WILDCARD_LONG, false,
+                          "Whether topic provided is a wildcard as supported 
by ZkHosts in " +
+                          "old spout");
         options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, 
true, "Comma separated list of partitions corresponding to " +
-                OPTION_LEADERS_LONG + " for old spout with StaticHosts");
+                                                                               
  OPTION_LEADERS_LONG + " for old spout with StaticHosts");
         options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, 
"Comma separated list of broker leaders corresponding to " +
-                OPTION_PARTITIONS_LONG + " for old spout with StaticHosts e.g. 
hostname1:9092,hostname2:9092");
-        options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, 
true, "Comma separated list of zk servers for fetching spout committed offsets  
" +
-                "and/or topic metadata for ZkHosts e.g 
hostname1:2181,hostname2:2181");
-        options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, 
OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout 
stores the committed" +
-                " offsets without the topic and partition nodes");
-        options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, 
OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker 
information e.g. " +
-                "/brokers (applicable only for old kafka spout) ");
+                                                                           
OPTION_PARTITIONS_LONG +
+                                                                           " 
for old spout with StaticHosts e.g. hostname1:9092," +
+                                                                           
"hostname2:9092");
+        options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, 
true,
+                          "Comma separated list of zk servers for fetching 
spout committed offsets  " +
+                          "and/or topic metadata for ZkHosts e.g 
hostname1:2181,hostname2:2181");
+        options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, 
OPTION_ZK_COMMITTED_NODE_LONG, true,
+                          "Zk node prefix where old kafka spout stores the 
committed" +
+                          " offsets without the topic and partition nodes");
+        options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, 
OPTION_ZK_BROKERS_ROOT_LONG, true,
+                          "Zk node prefix where kafka stores broker 
information e.g. " +
+                          "/brokers (applicable only for old kafka spout) ");
         options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, 
OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
         return options;
     }
 
     /**
-     *
      * @param newKafkaSpoutOffsetQuery represents the information needed to 
query kafka for log head and spout offsets
      * @return log head offset, spout offset and lag for each partition
      */
-    public static List<KafkaOffsetLagResult> getOffsetLags 
(NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
+    public static List<KafkaOffsetLagResult> 
getOffsetLags(NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
         KafkaConsumer<String, String> consumer = null;
         List<KafkaOffsetLagResult> result = new ArrayList<>();
         try {
@@ -219,7 +235,7 @@ public class KafkaOffsetLagUtil {
             }
             List<TopicPartition> topicPartitionList = new ArrayList<>();
             consumer = new KafkaConsumer<>(props);
-            for (String topic: 
newKafkaSpoutOffsetQuery.getTopics().split(",")) {
+            for (String topic : 
newKafkaSpoutOffsetQuery.getTopics().split(",")) {
                 List<PartitionInfo> partitionInfoList = 
consumer.partitionsFor(topic);
                 if (partitionInfoList != null) {
                     for (PartitionInfo partitionInfo : partitionInfoList) {
@@ -232,7 +248,8 @@ public class KafkaOffsetLagUtil {
                 OffsetAndMetadata offsetAndMetadata = 
consumer.committed(topicPartition);
                 long committedOffset = offsetAndMetadata != null ? 
offsetAndMetadata.offset() : -1;
                 consumer.seekToEnd(toArrayList(topicPartition));
-                result.add(new KafkaOffsetLagResult(topicPartition.topic(), 
topicPartition.partition(), committedOffset, 
consumer.position(topicPartition)));
+                result.add(new KafkaOffsetLagResult(topicPartition.topic(), 
topicPartition.partition(), committedOffset,
+                                                    
consumer.position(topicPartition)));
             }
         } finally {
             if (consumer != null) {
@@ -243,22 +260,23 @@ public class KafkaOffsetLagUtil {
     }
 
     private static Collection<TopicPartition> toArrayList(final TopicPartition 
tp) {
-        return new ArrayList<TopicPartition>(1){{add(tp);}};
+        return new ArrayList<TopicPartition>(1) {{
+            add(tp);
+        }};
     }
 
     /**
-     *
      * @param oldKafkaSpoutOffsetQuery represents the information needed to 
query kafka for log head and spout offsets
      * @return log head offset, spout offset and lag for each partition
      */
-    public static List<KafkaOffsetLagResult> getOffsetLags 
(OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
+    public static List<KafkaOffsetLagResult> 
getOffsetLags(OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws 
Exception {
         List<KafkaOffsetLagResult> result = new ArrayList<>();
         Map<String, List<TopicPartition>> leaders = 
getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
         if (leaders != null) {
             Map<String, Map<Integer, Long>> logHeadOffsets = 
getLogHeadOffsets(leaders);
             Map<String, List<Integer>> topicPartitions = new HashMap<>();
-            for (Map.Entry<String, List<TopicPartition>> entry: 
leaders.entrySet()) {
-                for (TopicPartition topicPartition: entry.getValue()) {
+            for (Map.Entry<String, List<TopicPartition>> entry : 
leaders.entrySet()) {
+                for (TopicPartition topicPartition : entry.getValue()) {
                     if (!topicPartitions.containsKey(topicPartition.topic())) {
                         topicPartitions.put(topicPartition.topic(), new 
ArrayList<Integer>());
                     }
@@ -266,13 +284,15 @@ public class KafkaOffsetLagUtil {
                 }
             }
             Map<String, Map<Integer, Long>> oldConsumerOffsets = 
getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
-            for (Map.Entry<String, Map<Integer, Long>> topicOffsets: 
logHeadOffsets.entrySet()) {
-                for (Map.Entry<Integer, Long> partitionOffsets: 
topicOffsets.getValue().entrySet()) {
-                    Long consumerCommittedOffset = 
oldConsumerOffsets.get(topicOffsets.getKey()) != null ? (Long) 
oldConsumerOffsets.get(topicOffsets.getKey()).get
-                        (partitionOffsets.getKey()) : -1;
+            for (Map.Entry<String, Map<Integer, Long>> topicOffsets : 
logHeadOffsets.entrySet()) {
+                for (Map.Entry<Integer, Long> partitionOffsets : 
topicOffsets.getValue().entrySet()) {
+                    Long consumerCommittedOffset =
+                        oldConsumerOffsets.get(topicOffsets.getKey()) != null 
? (Long) oldConsumerOffsets.get(topicOffsets.getKey()).get
+                            (partitionOffsets.getKey()) : -1;
                     consumerCommittedOffset = (consumerCommittedOffset == null 
? -1 : consumerCommittedOffset);
                     KafkaOffsetLagResult kafkaOffsetLagResult = new 
KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(),
-                            consumerCommittedOffset, 
partitionOffsets.getValue());
+                                                                               
          consumerCommittedOffset,
+                                                                               
          partitionOffsets.getValue());
                     result.add(kafkaOffsetLagResult);
                 }
             }
@@ -280,7 +300,8 @@ public class KafkaOffsetLagUtil {
         return result;
     }
 
-    private static Map<String, List<TopicPartition>> 
getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery 
oldKafkaSpoutOffsetQuery) throws Exception {
+    private static Map<String, List<TopicPartition>> 
getLeadersAndTopicPartitions(OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) 
throws
+        Exception {
         Map<String, List<TopicPartition>> result = new HashMap<>();
         // this means that kafka spout was configured with StaticHosts hosts 
(leader for partition)
         if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
@@ -301,7 +322,8 @@ public class KafkaOffsetLagUtil {
                     brokersZkNode += "/";
                 }
                 String topicsZkPath = brokersZkNode + "topics";
-                curatorFramework = 
CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 
20000, 15000, new RetryOneTime(1000));
+                curatorFramework =
+                    
CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 
20000, 15000, new RetryOneTime(1000));
                 curatorFramework.start();
                 List<String> topics = new ArrayList<>();
                 if (oldKafkaSpoutOffsetQuery.isWildCardTopic()) {
@@ -314,7 +336,7 @@ public class KafkaOffsetLagUtil {
                 } else {
                     topics.add(oldKafkaSpoutOffsetQuery.getTopic());
                 }
-                for (String topic: topics) {
+                for (String topic : topics) {
                     String partitionsPath = topicsZkPath + "/" + topic + 
"/partitions";
                     List<String> children = 
curatorFramework.getChildren().forPath(partitionsPath);
                     for (int i = 0; i < children.size(); ++i) {
@@ -341,27 +363,33 @@ public class KafkaOffsetLagUtil {
         return result;
     }
 
-    private static Map<String, Map<Integer, Long>> getLogHeadOffsets 
(Map<String, List<TopicPartition>> leadersAndTopicPartitions) {
+    private static Map<String, Map<Integer, Long>> 
getLogHeadOffsets(Map<String, List<TopicPartition>> leadersAndTopicPartitions) {
         Map<String, Map<Integer, Long>> result = new HashMap<>();
         if (leadersAndTopicPartitions != null) {
             PartitionOffsetRequestInfo partitionOffsetRequestInfo = new 
PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
-            SimpleConsumer simpleConsumer  = null;
-            for (Map.Entry<String, List<TopicPartition>> leader: 
leadersAndTopicPartitions.entrySet()) {
+            SimpleConsumer simpleConsumer = null;
+            for (Map.Entry<String, List<TopicPartition>> leader : 
leadersAndTopicPartitions.entrySet()) {
                 try {
-                    simpleConsumer = new 
SimpleConsumer(leader.getKey().split(":")[0], 
Integer.parseInt(leader.getKey().split(":")[1]), 10000, 64 *
-                            1024, "LogHeadOffsetRequest");
-                    Map<TopicAndPartition, PartitionOffsetRequestInfo> 
requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+                    simpleConsumer =
+                        new SimpleConsumer(leader.getKey().split(":")[0], 
Integer.parseInt(leader.getKey().split(":")[1]), 10000, 64 *
+                                                                               
                                                   1024,
+                                           "LogHeadOffsetRequest");
+                    Map<TopicAndPartition, PartitionOffsetRequestInfo> 
requestInfo =
+                        new HashMap<TopicAndPartition, 
PartitionOffsetRequestInfo>();
                     for (TopicPartition topicPartition : leader.getValue()) {
-                        requestInfo.put(new 
TopicAndPartition(topicPartition.topic(), topicPartition.partition()), 
partitionOffsetRequestInfo);
+                        requestInfo
+                            .put(new TopicAndPartition(topicPartition.topic(), 
topicPartition.partition()), partitionOffsetRequestInfo);
                         if (!result.containsKey(topicPartition.topic())) {
                             result.put(topicPartition.topic(), new 
HashMap<Integer, Long>());
                         }
                     }
-                    kafka.javaapi.OffsetRequest request = new 
kafka.javaapi.OffsetRequest(requestInfo, 
kafka.api.OffsetRequest.CurrentVersion(),
-                            "LogHeadOffsetRequest");
+                    kafka.javaapi.OffsetRequest request =
+                        new kafka.javaapi.OffsetRequest(requestInfo, 
kafka.api.OffsetRequest.CurrentVersion(),
+                                                        
"LogHeadOffsetRequest");
                     OffsetResponse response = 
simpleConsumer.getOffsetsBefore(request);
                     for (TopicPartition topicPartition : leader.getValue()) {
-                        
result.get(topicPartition.topic()).put(topicPartition.partition(), 
response.offsets(topicPartition.topic(), topicPartition.partition())[0]);
+                        result.get(topicPartition.topic())
+                              .put(topicPartition.partition(), 
response.offsets(topicPartition.topic(), topicPartition.partition())[0]);
                     }
                 } finally {
                     if (simpleConsumer != null) {
@@ -373,29 +401,34 @@ public class KafkaOffsetLagUtil {
         return result;
     }
 
-    private static Map<String, Map<Integer, Long>> getOldConsumerOffsetsFromZk 
(Map<String, List<Integer>> topicPartitions, OldKafkaSpoutOffsetQuery
-            oldKafkaSpoutOffsetQuery) throws Exception {
+    private static Map<String, Map<Integer, Long>> 
getOldConsumerOffsetsFromZk(Map<String, List<Integer>> topicPartitions,
+                                                                               
OldKafkaSpoutOffsetQuery
+                                                                               
    oldKafkaSpoutOffsetQuery) throws Exception {
         Map<String, Map<Integer, Long>> result = new HashMap<>();
-        CuratorFramework curatorFramework = 
CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 
20000, 15000, new RetryOneTime(1000));
+        CuratorFramework curatorFramework =
+            
CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 
20000, 15000, new RetryOneTime(1000));
         curatorFramework.start();
         String partitionPrefix = "partition_";
         String zkPath = oldKafkaSpoutOffsetQuery.getZkPath();
         if (zkPath.endsWith("/")) {
-            zkPath = zkPath.substring(0, zkPath.length()-1);
+            zkPath = zkPath.substring(0, zkPath.length() - 1);
         }
         if (curatorFramework.checkExists().forPath(zkPath) == null) {
-            throw new IllegalArgumentException(OPTION_ZK_COMMITTED_NODE_LONG+" 
'"+zkPath+"' dose not exists.");
+            throw new IllegalArgumentException(OPTION_ZK_COMMITTED_NODE_LONG + 
" '" + zkPath + "' dose not exists.");
         }
         byte[] zkData;
         try {
             if (topicPartitions != null) {
-                for (Map.Entry<String, List<Integer>> topicEntry: 
topicPartitions.entrySet()) {
+                for (Map.Entry<String, List<Integer>> topicEntry : 
topicPartitions.entrySet()) {
                     Map<Integer, Long> partitionOffsets = new HashMap<>();
-                    for (Integer partition: topicEntry.getValue()) {
-                        String path = zkPath + "/" + 
(oldKafkaSpoutOffsetQuery.isWildCardTopic() ? topicEntry.getKey() + "/" : "") + 
partitionPrefix + partition;
+                    for (Integer partition : topicEntry.getValue()) {
+                        String path =
+                            zkPath + "/" + 
(oldKafkaSpoutOffsetQuery.isWildCardTopic() ? topicEntry.getKey() + "/" : "") + 
partitionPrefix +
+                            partition;
                         if (curatorFramework.checkExists().forPath(path) != 
null) {
                             zkData = curatorFramework.getData().forPath(path);
-                            Map<Object, Object> offsetData = (Map<Object, 
Object>) JSONValue.parseWithException(new String(zkData, "UTF-8"));
+                            Map<Object, Object> offsetData =
+                                (Map<Object, Object>) 
JSONValue.parseWithException(new String(zkData, "UTF-8"));
                             partitionOffsets.put(partition, (Long) 
offsetData.get("offset"));
                         }
                     }

http://git-wip-us.apache.org/repos/asf/storm/blob/224633d3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaPartitionOffsetLag.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaPartitionOffsetLag.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaPartitionOffsetLag.java
index 6faf266..a47cef2 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaPartitionOffsetLag.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaPartitionOffsetLag.java
@@ -15,57 +15,58 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *******************************************************************************/
+
 package org.apache.storm.kafka.monitor;
 
 public class KafkaPartitionOffsetLag {
-  private long consumerCommittedOffset;
-  private long logHeadOffset;
-  private long lag;
+    private long consumerCommittedOffset;
+    private long logHeadOffset;
+    private long lag;
 
-  public KafkaPartitionOffsetLag(long consumerCommittedOffset, long logHeadOffset) {
-    this.consumerCommittedOffset = consumerCommittedOffset;
-    this.logHeadOffset = logHeadOffset;
-    this.lag = logHeadOffset - consumerCommittedOffset;
-  }
+    public KafkaPartitionOffsetLag(long consumerCommittedOffset, long logHeadOffset) {
+        this.consumerCommittedOffset = consumerCommittedOffset;
+        this.logHeadOffset = logHeadOffset;
+        this.lag = logHeadOffset - consumerCommittedOffset;
+    }
 
-  public long getConsumerCommittedOffset() {
-    return consumerCommittedOffset;
-  }
+    public long getConsumerCommittedOffset() {
+        return consumerCommittedOffset;
+    }
 
-  public long getLogHeadOffset() {
-    return logHeadOffset;
-  }
+    public long getLogHeadOffset() {
+        return logHeadOffset;
+    }
 
-  public long getLag() {
-    return lag;
-  }
+    public long getLag() {
+        return lag;
+    }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (!(o instanceof KafkaPartitionOffsetLag)) return false;
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof KafkaPartitionOffsetLag)) return false;
 
-    KafkaPartitionOffsetLag that = (KafkaPartitionOffsetLag) o;
+        KafkaPartitionOffsetLag that = (KafkaPartitionOffsetLag) o;
 
-    if (getConsumerCommittedOffset() != that.getConsumerCommittedOffset()) return false;
-    if (getLogHeadOffset() != that.getLogHeadOffset()) return false;
-    return getLag() == that.getLag();
+        if (getConsumerCommittedOffset() != that.getConsumerCommittedOffset()) return false;
+        if (getLogHeadOffset() != that.getLogHeadOffset()) return false;
+        return getLag() == that.getLag();
 
-  }
+    }
 
-  @Override
-  public int hashCode() {
-    int result = (int) (getConsumerCommittedOffset() ^ (getConsumerCommittedOffset() >>> 32));
-    result = 31 * result + (int) (getLogHeadOffset() ^ (getLogHeadOffset() >>> 32));
-    result = 31 * result + (int) (getLag() ^ (getLag() >>> 32));
-    return result;
-  }
+    @Override
+    public int hashCode() {
+        int result = (int) (getConsumerCommittedOffset() ^ (getConsumerCommittedOffset() >>> 32));
+        result = 31 * result + (int) (getLogHeadOffset() ^ (getLogHeadOffset() >>> 32));
+        result = 31 * result + (int) (getLag() ^ (getLag() >>> 32));
+        return result;
+    }
 
-  @Override
-  public String toString() {
-    // JSONAware not working for nested element on Map so write JSON format from here
-    return "{\"consumerCommittedOffset\": " + consumerCommittedOffset + ", " +
-        "\"logHeadOffset\": " + logHeadOffset + ", " +
-        "\"lag\": " + lag + "}";
-  }
+    @Override
+    public String toString() {
+        // JSONAware not working for nested element on Map so write JSON format from here
+        return "{\"consumerCommittedOffset\": " + consumerCommittedOffset + ", " +
+               "\"logHeadOffset\": " + logHeadOffset + ", " +
+               "\"lag\": " + lag + "}";
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/224633d3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
index 18bc0df..e6c4524 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
@@ -15,10 +15,12 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.monitor;
 
 /**
- * Class representing information for querying kafka for log head offsets, consumer offsets and the difference for new kafka spout using new consumer api
+ * Class representing information for querying kafka for log head offsets, consumer offsets and the difference for new kafka spout using new
+ * consumer api
  */
 public class NewKafkaSpoutOffsetQuery {
     private final String topics; // comma separated list of topics
@@ -52,10 +54,10 @@ public class NewKafkaSpoutOffsetQuery {
     @Override
     public String toString() {
         return "NewKafkaSpoutOffsetQuery{" +
-                "topics='" + topics + '\'' +
-                ", consumerGroupId='" + consumerGroupId + '\'' +
-                ", bootStrapBrokers='" + bootStrapBrokers + '\'' +
-                '}';
+               "topics='" + topics + '\'' +
+               ", consumerGroupId='" + consumerGroupId + '\'' +
+               ", bootStrapBrokers='" + bootStrapBrokers + '\'' +
+               '}';
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/224633d3/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
index cdeed32..ea80b64 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/OldKafkaSpoutOffsetQuery.java
@@ -15,6 +15,7 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.monitor;
 
 /**
@@ -27,7 +28,8 @@ public class OldKafkaSpoutOffsetQuery {
     private final boolean isWildCardTopic; //if the topic is a wildcard
     private final String brokersZkPath; //zk node prefix where kafka stores all broker information
     private final String partitions; //comma separated list of partitions corresponding to leaders below (for StaticHosts)
-    private final String leaders; //comma separated list of leader brokers and port corresponding to the partitions above (for StaticHosts) e.g.
+    private final String leaders;
+        //comma separated list of leader brokers and port corresponding to the partitions above (for StaticHosts) e.g.
     // hostname1:9092,hostname2:9092
 
     public OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, boolean isWildCardTopic, String brokersZkPath) {
@@ -39,8 +41,9 @@ public class OldKafkaSpoutOffsetQuery {
 
     }
 
-    private OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, boolean isWildCardTopic, String brokersZkPath, String partitions, String
-            leaders) {
+    private OldKafkaSpoutOffsetQuery(String topic, String zkServers, String zkPath, boolean isWildCardTopic, String brokersZkPath,
+                                     String partitions, String
+                                         leaders) {
         this.topic = topic;
         this.zkServers = zkServers;
         this.zkPath = zkPath;
@@ -53,14 +56,14 @@ public class OldKafkaSpoutOffsetQuery {
     @Override
     public String toString() {
         return "OldKafkaSpoutOffsetQuery{" +
-                "topic='" + topic + '\'' +
-                ", zkServers='" + zkServers + '\'' +
-                ", zkPath='" + zkPath + '\'' +
-                ", isWildCardTopic=" + isWildCardTopic +
-                ", brokersZkPath='" + brokersZkPath + '\'' +
-                ", partitions='" + partitions + '\'' +
-                ", leaders='" + leaders + '\'' +
-                '}';
+               "topic='" + topic + '\'' +
+               ", zkServers='" + zkServers + '\'' +
+               ", zkPath='" + zkPath + '\'' +
+               ", isWildCardTopic=" + isWildCardTopic +
+               ", brokersZkPath='" + brokersZkPath + '\'' +
+               ", partitions='" + partitions + '\'' +
+               ", leaders='" + leaders + '\'' +
+               '}';
     }
 
     @Override

Reply via email to