Fixing stylecheck problems with storm-kafka

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4fe4f04b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4fe4f04b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4fe4f04b

Branch: refs/heads/master
Commit: 4fe4f04bb9750301b96f5c20142acb9a9a6a6000
Parents: 1a2d131
Author: Kishor Patil <[email protected]>
Authored: Sun Apr 22 22:59:46 2018 -0400
Committer: Kishor Patil <[email protected]>
Committed: Mon Apr 23 00:22:37 2018 -0400

----------------------------------------------------------------------
 external/storm-kafka/pom.xml                    |   2 +-
 .../src/jvm/org/apache/storm/kafka/Broker.java  |  51 ++--
 .../jvm/org/apache/storm/kafka/BrokerHosts.java |  19 +-
 .../storm/kafka/ByteBufferSerializer.java       |  26 +-
 .../storm/kafka/DynamicBrokersReader.java       | 112 ++++----
 .../kafka/DynamicPartitionConnections.java      |  55 ++--
 .../ExponentialBackoffMsgRetryManager.java      |  75 +++---
 .../storm/kafka/FailedFetchException.java       |  19 +-
 .../storm/kafka/FailedMsgRetryManager.java      |  19 +-
 .../org/apache/storm/kafka/IntSerializer.java   |  24 +-
 .../jvm/org/apache/storm/kafka/KafkaConfig.java |  25 +-
 .../jvm/org/apache/storm/kafka/KafkaError.java  |  19 +-
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  |  58 ++--
 .../jvm/org/apache/storm/kafka/KafkaUtils.java  | 269 +++++++++----------
 .../org/apache/storm/kafka/KeyValueScheme.java  |  22 +-
 .../kafka/KeyValueSchemeAsMultiScheme.java      |  31 +--
 .../storm/kafka/MessageMetadataScheme.java      |  22 +-
 .../MessageMetadataSchemeAsMultiScheme.java     |  20 +-
 .../jvm/org/apache/storm/kafka/Partition.java   |  39 ++-
 .../storm/kafka/PartitionCoordinator.java       |  19 +-
 .../apache/storm/kafka/PartitionManager.java    | 121 +++++----
 .../jvm/org/apache/storm/kafka/SpoutConfig.java |  19 +-
 .../apache/storm/kafka/StaticCoordinator.java   |  27 +-
 .../jvm/org/apache/storm/kafka/StaticHosts.java |  19 +-
 .../storm/kafka/StaticPartitionConnections.java |  22 +-
 .../storm/kafka/StringKeyValueScheme.java       |  24 +-
 .../kafka/StringMessageAndMetadataScheme.java   |  27 +-
 .../storm/kafka/StringMultiSchemeWithTopic.java |  28 +-
 .../org/apache/storm/kafka/StringScheme.java    |  38 ++-
 .../kafka/TopicOffsetOutOfRangeException.java   |   1 +
 .../org/apache/storm/kafka/ZkCoordinator.java   |  50 ++--
 .../src/jvm/org/apache/storm/kafka/ZkHosts.java |  19 +-
 .../src/jvm/org/apache/storm/kafka/ZkState.java |  65 +++--
 .../org/apache/storm/kafka/bolt/KafkaBolt.java  |  69 +++--
 .../FieldNameBasedTupleToKafkaMapper.java       |  21 +-
 .../kafka/bolt/mapper/TupleToKafkaMapper.java   |  25 +-
 .../bolt/selector/DefaultTopicSelector.java     |  19 +-
 .../bolt/selector/FieldIndexTopicSelector.java  |  19 +-
 .../bolt/selector/FieldNameTopicSelector.java   |  19 +-
 .../kafka/bolt/selector/KafkaTopicSelector.java |  22 +-
 .../apache/storm/kafka/trident/Coordinator.java |  27 +-
 .../storm/kafka/trident/DefaultCoordinator.java |  19 +-
 .../trident/GlobalPartitionInformation.java     |  41 ++-
 .../storm/kafka/trident/IBatchCoordinator.java  |  19 +-
 .../storm/kafka/trident/IBrokerReader.java      |  20 +-
 .../apache/storm/kafka/trident/MaxMetric.java   |  19 +-
 .../kafka/trident/OpaqueTridentKafkaSpout.java  |  44 ++-
 .../storm/kafka/trident/StaticBrokerReader.java |  23 +-
 .../trident/TransactionalTridentKafkaSpout.java |  22 +-
 .../storm/kafka/trident/TridentKafkaConfig.java |  19 +-
 .../kafka/trident/TridentKafkaEmitter.java      |  73 +++--
 .../storm/kafka/trident/TridentKafkaState.java  |  49 ++--
 .../kafka/trident/TridentKafkaStateFactory.java |  34 +--
 .../kafka/trident/TridentKafkaUpdater.java      |  22 +-
 .../storm/kafka/trident/ZkBrokerReader.java     | 113 ++++----
 .../FieldNameBasedTupleToKafkaMapper.java       |  19 +-
 .../mapper/TridentTupleToKafkaMapper.java       |  26 +-
 .../trident/selector/DefaultTopicSelector.java  |  19 +-
 .../trident/selector/KafkaTopicSelector.java    |  22 +-
 .../storm/kafka/DynamicBrokersReaderTest.java   |  39 ++-
 .../ExponentialBackoffMsgRetryManagerTest.java  |  49 ++--
 .../org/apache/storm/kafka/KafkaErrorTest.java  |  19 +-
 .../org/apache/storm/kafka/KafkaTestBroker.java |  29 +-
 .../org/apache/storm/kafka/KafkaUtilsTest.java  |  73 +++--
 .../storm/kafka/PartitionManagerTest.java       |  34 +--
 .../storm/kafka/StringKeyValueSchemeTest.java   |  36 ++-
 .../apache/storm/kafka/TestStringScheme.java    |  28 +-
 .../test/org/apache/storm/kafka/TestUtils.java  |  32 ++-
 .../apache/storm/kafka/TridentKafkaTest.java    |  40 ++-
 .../apache/storm/kafka/ZkCoordinatorTest.java   |  49 ++--
 .../apache/storm/kafka/bolt/KafkaBoltTest.java  | 159 ++++++-----
 71 files changed, 1246 insertions(+), 1551 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index d3b38db..2e0f35b 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -57,7 +57,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>557</maxAllowedViolations>
+                    <maxAllowedViolations>180</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java
index 0d95e8d..b33af99 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java
@@ -1,24 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import com.google.common.base.Objects;
-
 import java.io.Serializable;
 
 public class Broker implements Serializable, Comparable<Broker> {
@@ -27,9 +21,9 @@ public class Broker implements Serializable, Comparable<Broker> {
 
     // for kryo compatibility
     private Broker() {
-       
+
     }
-    
+
     public Broker(String host, int port) {
         this.host = host;
         this.port = port;
@@ -39,6 +33,19 @@ public class Broker implements Serializable, Comparable<Broker> {
         this(host, 9092);
     }
 
+    public static Broker fromString(String host) {
+        Broker hp;
+        String[] spec = host.split(":");
+        if (spec.length == 1) {
+            hp = new Broker(spec[0]);
+        } else if (spec.length == 2) {
+            hp = new Broker(spec[0], Integer.parseInt(spec[1]));
+        } else {
+            throw new IllegalArgumentException("Invalid host specification: " + host);
+        }
+        return hp;
+    }
+
     @Override
     public int hashCode() {
         return Objects.hashCode(host, port);
@@ -61,20 +68,6 @@ public class Broker implements Serializable, Comparable<Broker> {
         return host + ":" + port;
     }
 
-    public static Broker fromString(String host) {
-        Broker hp;
-        String[] spec = host.split(":");
-        if (spec.length == 1) {
-            hp = new Broker(spec[0]);
-        } else if (spec.length == 2) {
-            hp = new Broker(spec[0], Integer.parseInt(spec[1]));
-        } else {
-            throw new IllegalArgumentException("Invalid host specification: " + host);
-        }
-        return hp;
-    }
-
-
     @Override
     public int compareTo(Broker o) {
         if (this.host.equals(o.host)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java
index 13ba0a1..dbd6a10 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java
index 2a18a7f..37986a5 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java
@@ -15,27 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import org.apache.storm.utils.Utils;
-import org.apache.kafka.common.serialization.Serializer;
+package org.apache.storm.kafka;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.storm.utils.Utils;
 
 public class ByteBufferSerializer implements Serializer<ByteBuffer> {
-  @Override
-  public void configure(Map<String, ?> map, boolean b) {
+    @Override
+    public void configure(Map<String, ?> map, boolean b) {
 
-  }
+    }
 
-  @Override
-  public void close() {
+    @Override
+    public void close() {
 
-  }
+    }
 
-  @Override
-  public byte[] serialize(String s, ByteBuffer b) {
-    return Utils.toByteArray(b);
-  }
+    @Override
+    public byte[] serialize(String s, ByteBuffer b) {
+        return Utils.toByteArray(b);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
index c203359..49ad530 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
@@ -1,37 +1,31 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
-import org.apache.storm.Config;
-import org.apache.storm.utils.ObjectReader;
 import com.google.common.base.Preconditions;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryNTimes;
+import org.apache.storm.Config;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+import org.apache.storm.utils.ObjectReader;
 import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-
-import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 
 
 public class DynamicBrokersReader {
@@ -49,7 +43,7 @@ public class DynamicBrokersReader {
 
         validateConfig(conf);
 
-        Preconditions.checkNotNull(zkStr,"zkString cannot be null");
+        Preconditions.checkNotNull(zkStr, "zkString cannot be null");
         Preconditions.checkNotNull(zkPath, "zkPath cannot be null");
         Preconditions.checkNotNull(topic, "topic cannot be null");
 
@@ -58,11 +52,11 @@ public class DynamicBrokersReader {
         _isWildcardTopic = 
ObjectReader.getBoolean(conf.get("kafka.topic.wildcard.match"), false);
         try {
             _curator = CuratorFrameworkFactory.newClient(
-                    zkStr,
-                    
ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
-                    
ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)),
-                    new 
RetryNTimes(ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
-                            
ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+                zkStr,
+                
ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+                
ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)),
+                new 
RetryNTimes(ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
+                                
ObjectReader.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
             _curator.start();
         } catch (Exception ex) {
             LOG.error("Couldn't connect to zookeeper", ex);
@@ -74,33 +68,33 @@ public class DynamicBrokersReader {
      * Get all partitions with their current leaders
      */
     public List<GlobalPartitionInformation> getBrokerInfo() throws 
SocketTimeoutException {
-      List<String> topics =  getTopics();
-      List<GlobalPartitionInformation> partitions =  new 
ArrayList<GlobalPartitionInformation>();
-
-      for (String topic : topics) {
-          GlobalPartitionInformation globalPartitionInformation = new 
GlobalPartitionInformation(topic, this._isWildcardTopic);
-          try {
-              int numPartitionsForTopic = getNumPartitions(topic);
-              String brokerInfoPath = brokerPath();
-              for (int partition = 0; partition < numPartitionsForTopic; 
partition++) {
-                  int leader = getLeaderFor(topic,partition);
-                  String path = brokerInfoPath + "/" + leader;
-                  try {
-                      byte[] brokerData = _curator.getData().forPath(path);
-                      Broker hp = getBrokerHost(brokerData);
-                      globalPartitionInformation.addPartition(partition, hp);
-                  } catch 
(org.apache.zookeeper.KeeperException.NoNodeException e) {
-                      LOG.error("Node {} does not exist ", path);
-                  }
-              }
-          } catch (SocketTimeoutException e) {
-              throw e;
-          } catch (Exception e) {
-              throw new RuntimeException(e);
-          }
-          LOG.info("Read partition info from zookeeper: " + 
globalPartitionInformation);
-          partitions.add(globalPartitionInformation);
-      }
+        List<String> topics = getTopics();
+        List<GlobalPartitionInformation> partitions = new 
ArrayList<GlobalPartitionInformation>();
+
+        for (String topic : topics) {
+            GlobalPartitionInformation globalPartitionInformation = new 
GlobalPartitionInformation(topic, this._isWildcardTopic);
+            try {
+                int numPartitionsForTopic = getNumPartitions(topic);
+                String brokerInfoPath = brokerPath();
+                for (int partition = 0; partition < numPartitionsForTopic; 
partition++) {
+                    int leader = getLeaderFor(topic, partition);
+                    String path = brokerInfoPath + "/" + leader;
+                    try {
+                        byte[] brokerData = _curator.getData().forPath(path);
+                        Broker hp = getBrokerHost(brokerData);
+                        globalPartitionInformation.addPartition(partition, hp);
+                    } catch 
(org.apache.zookeeper.KeeperException.NoNodeException e) {
+                        LOG.error("Node {} does not exist ", path);
+                    }
+                }
+            } catch (SocketTimeoutException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            LOG.info("Read partition info from zookeeper: " + 
globalPartitionInformation);
+            partitions.add(globalPartitionInformation);
+        }
         return partitions;
     }
 
@@ -135,9 +129,10 @@ public class DynamicBrokersReader {
         }
     }
 
-    public String topicsPath () {
+    public String topicsPath() {
         return _zkPath + "/topics";
     }
+
     public String partitionPath(String topic) {
         return topicsPath() + "/" + topic + "/partitions";
     }
@@ -147,7 +142,6 @@ public class DynamicBrokersReader {
     }
 
 
-
     /**
      * get /brokers/topics/distributedTopic/partitions/1/state
      * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, 
"version":1 }
@@ -202,13 +196,13 @@ public class DynamicBrokersReader {
      */
     private void validateConfig(final Map<String, Object> conf) {
         
Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT),
-                "%s cannot be null", Config.STORM_ZOOKEEPER_SESSION_TIMEOUT);
+                                   "%s cannot be null", 
Config.STORM_ZOOKEEPER_SESSION_TIMEOUT);
         
Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT),
-                "%s cannot be null", 
Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT);
+                                   "%s cannot be null", 
Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT);
         
Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES),
-                "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_TIMES);
+                                   "%s cannot be null", 
Config.STORM_ZOOKEEPER_RETRY_TIMES);
         
Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL),
-                "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_INTERVAL);
+                                   "%s cannot be null", 
Config.STORM_ZOOKEEPER_RETRY_INTERVAL);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
index 4c5dba5..1ca7144 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
@@ -1,50 +1,33 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.storm.kafka.trident.IBrokerReader;
+package org.apache.storm.kafka;
 
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.storm.kafka.trident.IBrokerReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class DynamicPartitionConnections {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DynamicPartitionConnections.class);
-
-    static class ConnectionInfo {
-        SimpleConsumer consumer;
-        Set<String> partitions = new HashSet<String>();
-
-        public ConnectionInfo(SimpleConsumer consumer) {
-            this.consumer = consumer;
-        }
-    }
-
     Map<Broker, ConnectionInfo> _connections = new HashMap<>();
     KafkaConfig _config;
     IBrokerReader _reader;
-
     public DynamicPartitionConnections(KafkaConfig config, IBrokerReader 
brokerReader) {
         _config = config;
         _reader = brokerReader;
@@ -57,10 +40,11 @@ public class DynamicPartitionConnections {
 
     public SimpleConsumer register(Broker host, String topic, int partition) {
         if (!_connections.containsKey(host)) {
-            _connections.put(host, new ConnectionInfo(new 
SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, 
_config.bufferSizeBytes, _config.clientId)));
+            _connections.put(host, new ConnectionInfo(
+                new SimpleConsumer(host.host, host.port, 
_config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
         }
         ConnectionInfo info = _connections.get(host);
-        info.partitions.add(getHashKey(topic,partition));
+        info.partitions.add(getHashKey(topic, partition));
         return info.consumer;
     }
 
@@ -74,7 +58,7 @@ public class DynamicPartitionConnections {
 
     public void unregister(Broker port, String topic, int partition) {
         ConnectionInfo info = _connections.get(port);
-        info.partitions.remove(getHashKey(topic,partition));
+        info.partitions.remove(getHashKey(topic, partition));
         if (info.partitions.isEmpty()) {
             info.consumer.close();
             _connections.remove(port);
@@ -95,4 +79,13 @@ public class DynamicPartitionConnections {
     private String getHashKey(String topic, int partition) {
         return topic + "_" + partition;
     }
+
+    static class ConnectionInfo {
+        SimpleConsumer consumer;
+        Set<String> partitions = new HashSet<String>();
+
+        public ConnectionInfo(SimpleConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index 60654a5..2651c30 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -1,23 +1,17 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
-import org.apache.storm.utils.Time;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Map;
@@ -25,6 +19,7 @@ import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.storm.utils.Time;
 
 public class ExponentialBackoffMsgRetryManager implements 
FailedMsgRetryManager {
 
@@ -34,7 +29,7 @@ public class ExponentialBackoffMsgRetryManager implements 
FailedMsgRetryManager
     private int retryLimit;
 
     private Queue<MessageRetryRecord> waiting;
-    private Map<Long,MessageRetryRecord> records;
+    private Map<Long, MessageRetryRecord> records;
 
     public ExponentialBackoffMsgRetryManager() {
 
@@ -46,15 +41,15 @@ public class ExponentialBackoffMsgRetryManager implements 
FailedMsgRetryManager
         this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
         this.retryLimit = spoutConfig.retryLimit;
         this.waiting = new PriorityQueue<MessageRetryRecord>(11, new 
RetryTimeComparator());
-        this.records = new ConcurrentHashMap<Long,MessageRetryRecord>();
+        this.records = new ConcurrentHashMap<Long, MessageRetryRecord>();
     }
 
     @Override
     public void failed(Long offset) {
         MessageRetryRecord oldRecord = this.records.get(offset);
         MessageRetryRecord newRecord = oldRecord == null ?
-                                       new MessageRetryRecord(offset) :
-                                       oldRecord.createNextRetryRecord();
+            new MessageRetryRecord(offset) :
+            oldRecord.createNextRetryRecord();
         this.records.put(offset, newRecord);
         this.waiting.add(newRecord);
     }
@@ -98,16 +93,16 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
     public boolean shouldReEmitMsg(Long offset) {
         MessageRetryRecord record = this.records.get(offset);
         return record != null &&
-                this.waiting.contains(record) &&
-                Time.currentTimeMillis() >= record.retryTimeUTC;
+               this.waiting.contains(record) &&
+               Time.currentTimeMillis() >= record.retryTimeUTC;
     }
 
     @Override
     public boolean retryFurther(Long offset) {
         MessageRetryRecord record = this.records.get(offset);
-        return ! (record != null &&
-               this.retryLimit > 0 &&
-               this.retryLimit <= record.retryNum);
+        return !(record != null &&
+                 this.retryLimit > 0 &&
+                 this.retryLimit <= record.retryNum);
     }
 
     @Override
@@ -117,9 +112,9 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
 
     @Override
     public Set<Long> clearOffsetsBefore(Long kafkaOffset) {
-        Set<Long> invalidOffsets = new HashSet<Long>(); 
-        for(Long offset : records.keySet()){
-            if(offset < kafkaOffset){
+        Set<Long> invalidOffsets = new HashSet<Long>();
+        for (Long offset : records.keySet()) {
+            if (offset < kafkaOffset) {
                 MessageRetryRecord record = this.records.remove(offset);
                 if (record != null) {
                     this.waiting.remove(record);
@@ -130,6 +125,19 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
         return invalidOffsets;
     }
 
+    private static class RetryTimeComparator implements Comparator<MessageRetryRecord> {
+
+        @Override
+        public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
+            return Long.valueOf(record1.retryTimeUTC).compareTo(Long.valueOf(record2.retryTimeUTC));
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return false;
+        }
+    }
+
     /**
      * A MessageRetryRecord holds the data of how many times a message has
      * failed and been retried, and when the last failure occurred.  It can
@@ -174,8 +182,8 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
             double delay = retryInitialDelayMs * delayMultiplier;
             Long maxLong = Long.MAX_VALUE;
             long delayThisRetryMs = delay >= maxLong.doubleValue()
-                                    ?  maxLong
-                                    : (long) delay;
+                ? maxLong
+                : (long) delay;
             return Math.min(delayThisRetryMs, retryDelayMaxMs);
         }
 
@@ -190,17 +198,4 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
             return Long.valueOf(this.offset).hashCode();
         }
     }
-
-    private static class RetryTimeComparator implements Comparator<MessageRetryRecord> {
-
-        @Override
-        public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
-            return Long.valueOf(record1.retryTimeUTC).compareTo(Long.valueOf(record2.retryTimeUTC));
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            return false;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java
index 448d0c3..a1f3fe5 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 public class FailedFetchException extends RuntimeException {

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index c7a7a04..b7fafdc 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java
index 7cdfc87..fef8625 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java
@@ -15,28 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import org.apache.kafka.common.serialization.Serializer;
+package org.apache.storm.kafka;
 
 import java.util.Map;
+import org.apache.kafka.common.serialization.Serializer;
 
 public class IntSerializer implements Serializer<Integer> {
-  @Override
-  public void configure(Map<String, ?> map, boolean b) {
-  }
+    @Override
+    public void configure(Map<String, ?> map, boolean b) {
+    }
 
-  @Override
-  public byte[] serialize(String topic, Integer val) {
-    return new byte[] {
+    @Override
+    public byte[] serialize(String topic, Integer val) {
+        return new byte[]{
             (byte) (val >>> 24),
             (byte) (val >>> 16),
             (byte) (val >>> 8),
             val.byteValue()
         };
-  }
+    }
 
-  @Override
-  public void close() {
-  }
+    @Override
+    public void close() {
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
index 1c9ada8..a93f426 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
@@ -1,32 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import java.io.Serializable;
-
+import kafka.api.FetchRequest;
 import org.apache.storm.spout.MultiScheme;
 import org.apache.storm.spout.RawMultiScheme;
 
-import kafka.api.FetchRequest;
-
 public class KafkaConfig implements Serializable {
     private static final long serialVersionUID = 5276718734571623855L;
-    
+
     public final BrokerHosts hosts;
     public final String topic;
     public final String clientId;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java
index 1d866e7..4bf2ed2 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 public enum KafkaError {

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index 7b1243c..aed5986 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -1,24 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import com.google.common.base.Strings;
-
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.storm.Config;
 import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
 import org.apache.storm.kafka.trident.GlobalPartitionInformation;
@@ -30,27 +29,16 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-
 // TODO: need to add blacklisting
 // TODO: need to make a best effort to not re-emit messages if don't have to
 public class KafkaSpout extends BaseRichSpout {
-    static enum EmitState {
-        EMITTED_MORE_LEFT,
-        EMITTED_END,
-        NO_EMITTED
-    }
-
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-
     SpoutConfig _spoutConfig;
     SpoutOutputCollector _collector;
     PartitionCoordinator _coordinator;
     DynamicPartitionConnections _connections;
     ZkState _state;
-
     long _lastUpdateMs = 0;
-
     int _currPartitionIndex = 0;
 
     public KafkaSpout(SpoutConfig spoutConf) {
@@ -81,12 +69,12 @@ public class KafkaSpout extends BaseRichSpout {
         int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
         if (_spoutConfig.hosts instanceof StaticHosts) {
             _coordinator = new StaticCoordinator(_connections, conf,
-                    _spoutConfig, _state, context.getThisTaskIndex(),
-                    totalTasks, context.getThisTaskId(), topologyInstanceId);
+                                                 _spoutConfig, _state, context.getThisTaskIndex(),
+                                                 totalTasks, context.getThisTaskId(), topologyInstanceId);
         } else {
             _coordinator = new ZkCoordinator(_connections, conf,
-                    _spoutConfig, _state, context.getThisTaskIndex(),
-                    totalTasks, context.getThisTaskId(), topologyInstanceId);
+                                             _spoutConfig, _state, context.getThisTaskIndex(),
+                                             totalTasks, context.getThisTaskId(), topologyInstanceId);
         }
 
         context.registerMetric("kafkaOffset", new IMetric() {
@@ -158,7 +146,7 @@ public class KafkaSpout extends BaseRichSpout {
     }
 
     private PartitionManager getManagerForPartition(int partition) {
-        for (PartitionManager partitionManager: _coordinator.getMyManagedPartitions()) {
+        for (PartitionManager partitionManager : _coordinator.getMyManagedPartitions()) {
             if (partitionManager.getPartition().partition == partition) {
                 return partitionManager;
             }
@@ -203,7 +191,7 @@ public class KafkaSpout extends BaseRichSpout {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-       if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
+        if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
             declarer.declareStream(_spoutConfig.outputStreamId, _spoutConfig.scheme.getOutputFields());
         } else {
             declarer.declare(_spoutConfig.scheme.getOutputFields());
@@ -211,7 +199,7 @@ public class KafkaSpout extends BaseRichSpout {
     }
 
     @Override
-    public Map<String, Object> getComponentConfiguration () {
+    public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> configuration = super.getComponentConfiguration();
         if (configuration == null) {
             configuration = new HashMap<>();
@@ -240,7 +228,7 @@ public class KafkaSpout extends BaseRichSpout {
             List<Partition> partitions = globalPartitionInformation.getOrderedPartitions();
             StringBuilder staticPartitions = new StringBuilder();
             StringBuilder leaderHosts = new StringBuilder();
-            for (Partition partition: partitions) {
+            for (Partition partition : partitions) {
                 staticPartitions.append(partition.partition + ",");
                 leaderHosts.append(partition.host.host + ":" + partition.host.port).append(",");
             }
@@ -258,4 +246,10 @@ public class KafkaSpout extends BaseRichSpout {
         }
     }
 
+    static enum EmitState {
+        EMITTED_MORE_LEFT,
+        EMITTED_END,
+        NO_EMITTED
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index 76bb896..38958b2 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -1,32 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import com.google.common.base.Preconditions;
-
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-import org.apache.storm.kafka.trident.IBrokerReader;
-import org.apache.storm.kafka.trident.StaticBrokerReader;
-import org.apache.storm.kafka.trident.ZkBrokerReader;
-import org.apache.storm.metric.api.IMetric;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
@@ -39,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.PartitionOffsetRequestInfo;
@@ -49,6 +34,13 @@ import kafka.javaapi.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.Message;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+import org.apache.storm.kafka.trident.IBrokerReader;
+import org.apache.storm.kafka.trident.StaticBrokerReader;
+import org.apache.storm.kafka.trident.ZkBrokerReader;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class KafkaUtils {
@@ -57,7 +49,7 @@ public class KafkaUtils {
     private static final int NO_OFFSET = -5;
 
     //suppress default constructor for noninstantiablility
-    private KafkaUtils(){
+    private KafkaUtils() {
         throw new AssertionError();
     }
 
@@ -80,7 +72,7 @@ public class KafkaUtils {
         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
         requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
         OffsetRequest request = new OffsetRequest(
-                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
 
         long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
         if (offsets.length > 0) {
@@ -90,121 +82,23 @@ public class KafkaUtils {
         }
     }
 
-    public static class KafkaOffsetMetric implements IMetric {
-        Map<Partition, PartitionManager.OffsetData> _partitionToOffset = new HashMap<Partition, PartitionManager.OffsetData>();
-        Set<Partition> _partitions;
-        DynamicPartitionConnections _connections;
-
-        public KafkaOffsetMetric(DynamicPartitionConnections connections) {
-            _connections = connections;
-        }
-
-        public void setOffsetData(Partition partition, PartitionManager.OffsetData offsetData) {
-            _partitionToOffset.put(partition, offsetData);
-        }
-
-        private class TopicMetrics {
-            long totalSpoutLag = 0;
-            long totalEarliestTimeOffset = 0;
-            long totalLatestTimeOffset = 0;
-            long totalLatestEmittedOffset = 0;
-            long totalLatestCompletedOffset = 0;
-        }
-
-        @Override
-        public Object getValueAndReset() {
-            try {
-                HashMap<String, Long> ret = new HashMap<>();
-                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
-                    Map<String,TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>();
-                    for (Map.Entry<Partition, PartitionManager.OffsetData> e : _partitionToOffset.entrySet()) {
-                        Partition partition = e.getKey();
-                        SimpleConsumer consumer = _connections.getConnection(partition);
-                        if (consumer == null) {
-                            LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
-                            return null;
-                        }
-                        long latestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
-                        long earliestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
-                        if (latestTimeOffset == KafkaUtils.NO_OFFSET) {
-                            LOG.warn("No data found in Kafka Partition " + partition.getId());
-                            return null;
-                        }
-                        long latestEmittedOffset = e.getValue().latestEmittedOffset;
-                        long latestCompletedOffset = e.getValue().latestCompletedOffset;
-                        long spoutLag = latestTimeOffset - latestCompletedOffset;
-                        String topic = partition.topic;
-                        String metricPath = partition.getId();
-                        //Handle the case where Partition Path Id does not contain topic name Partition.getId() == "partition_" + partition
-                        if (!metricPath.startsWith(topic + "/")) {
-                            metricPath = topic + "/" + metricPath;
-                        }
-                        ret.put(metricPath + "/" + "spoutLag", spoutLag);
-                        ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
-                        ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
-                        ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
-                        ret.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
-
-                        if (!topicMetricsMap.containsKey(partition.topic)) {
-                            topicMetricsMap.put(partition.topic,new TopicMetrics());
-                        }
-
-                        TopicMetrics topicMetrics = topicMetricsMap.get(partition.topic);
-                        topicMetrics.totalSpoutLag += spoutLag;
-                        topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
-                        topicMetrics.totalLatestTimeOffset += latestTimeOffset;
-                        topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
-                        topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
-                    }
-
-                    for(Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
-                        String topic = e.getKey();
-                        TopicMetrics topicMetrics = e.getValue();
-                        ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
-                        ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
-                        ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
-                        ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
-                        ret.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
-                    }
-
-                    return ret;
-                } else {
-                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
-                }
-            } catch (Throwable t) {
-                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
-            }
-            return null;
-        }
-
-        public void refreshPartitions(Set<Partition> partitions) {
-            _partitions = partitions;
-            Iterator<Partition> it = _partitionToOffset.keySet().iterator();
-            while (it.hasNext()) {
-                if (!partitions.contains(it.next())) {
-                    it.remove();
-                }
-            }
-        }
-    }
-
     public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset)
-            throws TopicOffsetOutOfRangeException, FailedFetchException,RuntimeException {
+        throws TopicOffsetOutOfRangeException, FailedFetchException, RuntimeException {
         ByteBufferMessageSet msgs = null;
         String topic = partition.topic;
         int partitionId = partition.partition;
         FetchRequestBuilder builder = new FetchRequestBuilder();
         FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
-                clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
+            clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
         FetchResponse fetchResponse;
         try {
             fetchResponse = consumer.fetch(fetchRequest);
         } catch (Exception e) {
             if (e instanceof ConnectException ||
-                    e instanceof SocketTimeoutException ||
-                    e instanceof IOException ||
-                    e instanceof UnresolvedAddressException
-                    ) {
+                e instanceof SocketTimeoutException ||
+                e instanceof IOException ||
+                e instanceof UnresolvedAddressException
+                ) {
                 LOG.warn("Network error when fetching messages:", e);
                 throw new FailedFetchException(e);
             } else {
@@ -225,11 +119,11 @@ public class KafkaUtils {
         } else {
             msgs = fetchResponse.messageSet(topic, partitionId);
         }
-        LOG.debug("Messages fetched. [config = {}], [consumer = {}], 
[partition = {}], [offset = {}], [msgs = {}]", config, consumer, partition, 
offset, msgs);
+        LOG.debug("Messages fetched. [config = {}], [consumer = {}], 
[partition = {}], [offset = {}], [msgs = {}]", config, consumer,
+                  partition, offset, msgs);
         return msgs;
     }
 
-
     public static Iterable<List<Object>> generateTuples(KafkaConfig 
kafkaConfig, Message msg, String topic) {
         Iterable<List<Object>> tups;
         ByteBuffer payload = msg.payload();
@@ -241,15 +135,16 @@ public class KafkaUtils {
             tups = ((KeyValueSchemeAsMultiScheme) 
kafkaConfig.scheme).deserializeKeyAndValue(key, payload);
         } else {
             if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) {
-                tups = 
((StringMultiSchemeWithTopic)kafkaConfig.scheme).deserializeWithTopic(topic, 
payload);
+                tups = ((StringMultiSchemeWithTopic) 
kafkaConfig.scheme).deserializeWithTopic(topic, payload);
             } else {
                 tups = kafkaConfig.scheme.deserialize(payload);
             }
         }
         return tups;
     }
-    
-    public static Iterable<List<Object>> 
generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, 
Partition partition, long offset) {
+
+    public static Iterable<List<Object>> 
generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, 
Partition partition,
+                                                        long offset) {
         ByteBuffer payload = msg.payload();
         if (payload == null) {
             return null;
@@ -257,18 +152,18 @@ public class KafkaUtils {
         return scheme.deserializeMessageWithMetadata(payload, partition, 
offset);
     }
 
-
     public static List<Partition> 
calculatePartitionsForTask(List<GlobalPartitionInformation> partitons,
-            int totalTasks, int taskIndex, int taskId) {
+                                                             int totalTasks, 
int taskIndex, int taskId) {
         Preconditions.checkArgument(taskIndex < totalTasks, "task index must 
be less that total tasks");
         List<Partition> taskPartitions = new ArrayList<Partition>();
         List<Partition> partitions = new ArrayList<Partition>();
-        for(GlobalPartitionInformation partitionInformation : partitons) {
+        for (GlobalPartitionInformation partitionInformation : partitons) {
             partitions.addAll(partitionInformation.getOrderedPartitions());
         }
         int numPartitions = partitions.size();
         if (numPartitions < totalTasks) {
-            LOG.warn("there are more tasks than partitions (tasks: " + 
totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
+            LOG.warn("there are more tasks than partitions (tasks: " + 
totalTasks + "; partitions: " + numPartitions +
+                     "), some tasks will be idle");
         }
         for (int i = taskIndex; i < numPartitions; i += totalTasks) {
             Partition taskPartition = partitions.get(i);
@@ -290,4 +185,104 @@ public class KafkaUtils {
     public static String taskPrefix(int taskIndex, int totalTasks, int taskId) 
{
         return "Task [" + (taskIndex + 1) + "/" + totalTasks + "], Task-ID: " 
+ taskId;
     }
+
+    public static class KafkaOffsetMetric implements IMetric {
+        Map<Partition, PartitionManager.OffsetData> _partitionToOffset = new 
HashMap<Partition, PartitionManager.OffsetData>();
+        Set<Partition> _partitions;
+        DynamicPartitionConnections _connections;
+
+        public KafkaOffsetMetric(DynamicPartitionConnections connections) {
+            _connections = connections;
+        }
+
+        public void setOffsetData(Partition partition, 
PartitionManager.OffsetData offsetData) {
+            _partitionToOffset.put(partition, offsetData);
+        }
+
+        @Override
+        public Object getValueAndReset() {
+            try {
+                HashMap<String, Long> ret = new HashMap<>();
+                if (_partitions != null && _partitions.size() == 
_partitionToOffset.size()) {
+                    Map<String, TopicMetrics> topicMetricsMap = new 
TreeMap<String, TopicMetrics>();
+                    for (Map.Entry<Partition, PartitionManager.OffsetData> e : 
_partitionToOffset.entrySet()) {
+                        Partition partition = e.getKey();
+                        SimpleConsumer consumer = 
_connections.getConnection(partition);
+                        if (consumer == null) {
+                            LOG.warn("partitionToOffset contains partition not 
found in _connections. Stale partition data?");
+                            return null;
+                        }
+                        long latestTimeOffset =
+                            getOffset(consumer, partition.topic, 
partition.partition, kafka.api.OffsetRequest.LatestTime());
+                        long earliestTimeOffset =
+                            getOffset(consumer, partition.topic, 
partition.partition, kafka.api.OffsetRequest.EarliestTime());
+                        if (latestTimeOffset == KafkaUtils.NO_OFFSET) {
+                            LOG.warn("No data found in Kafka Partition " + 
partition.getId());
+                            return null;
+                        }
+                        long latestEmittedOffset = 
e.getValue().latestEmittedOffset;
+                        long latestCompletedOffset = 
e.getValue().latestCompletedOffset;
+                        long spoutLag = latestTimeOffset - 
latestCompletedOffset;
+                        String topic = partition.topic;
+                        String metricPath = partition.getId();
+                        //Handle the case where Partition Path Id does not 
contain topic name Partition.getId() == "partition_" + partition
+                        if (!metricPath.startsWith(topic + "/")) {
+                            metricPath = topic + "/" + metricPath;
+                        }
+                        ret.put(metricPath + "/" + "spoutLag", spoutLag);
+                        ret.put(metricPath + "/" + "earliestTimeOffset", 
earliestTimeOffset);
+                        ret.put(metricPath + "/" + "latestTimeOffset", 
latestTimeOffset);
+                        ret.put(metricPath + "/" + "latestEmittedOffset", 
latestEmittedOffset);
+                        ret.put(metricPath + "/" + "latestCompletedOffset", 
latestCompletedOffset);
+
+                        if (!topicMetricsMap.containsKey(partition.topic)) {
+                            topicMetricsMap.put(partition.topic, new 
TopicMetrics());
+                        }
+
+                        TopicMetrics topicMetrics = 
topicMetricsMap.get(partition.topic);
+                        topicMetrics.totalSpoutLag += spoutLag;
+                        topicMetrics.totalEarliestTimeOffset += 
earliestTimeOffset;
+                        topicMetrics.totalLatestTimeOffset += latestTimeOffset;
+                        topicMetrics.totalLatestEmittedOffset += 
latestEmittedOffset;
+                        topicMetrics.totalLatestCompletedOffset += 
latestCompletedOffset;
+                    }
+
+                    for (Map.Entry<String, TopicMetrics> e : 
topicMetricsMap.entrySet()) {
+                        String topic = e.getKey();
+                        TopicMetrics topicMetrics = e.getValue();
+                        ret.put(topic + "/" + "totalSpoutLag", 
topicMetrics.totalSpoutLag);
+                        ret.put(topic + "/" + "totalEarliestTimeOffset", 
topicMetrics.totalEarliestTimeOffset);
+                        ret.put(topic + "/" + "totalLatestTimeOffset", 
topicMetrics.totalLatestTimeOffset);
+                        ret.put(topic + "/" + "totalLatestEmittedOffset", 
topicMetrics.totalLatestEmittedOffset);
+                        ret.put(topic + "/" + "totalLatestCompletedOffset", 
topicMetrics.totalLatestCompletedOffset);
+                    }
+
+                    return ret;
+                } else {
+                    LOG.info("Metrics Tick: Not enough data to calculate spout 
lag.");
+                }
+            } catch (Throwable t) {
+                LOG.warn("Metrics Tick: Exception when computing kafkaOffset 
metric.", t);
+            }
+            return null;
+        }
+
+        public void refreshPartitions(Set<Partition> partitions) {
+            _partitions = partitions;
+            Iterator<Partition> it = _partitionToOffset.keySet().iterator();
+            while (it.hasNext()) {
+                if (!partitions.contains(it.next())) {
+                    it.remove();
+                }
+            }
+        }
+
+        private class TopicMetrics {
+            long totalSpoutLag = 0;
+            long totalEarliestTimeOffset = 0;
+            long totalLatestTimeOffset = 0;
+            long totalLatestEmittedOffset = 0;
+            long totalLatestCompletedOffset = 0;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
index 3f9acc2..6bb1dc5 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
@@ -1,26 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import org.apache.storm.spout.Scheme;
+package org.apache.storm.kafka;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import org.apache.storm.spout.Scheme;
 
 public interface KeyValueScheme extends Scheme {
     List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
index 25053dd..00983cc 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
@@ -1,27 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import org.apache.storm.spout.SchemeAsMultiScheme;
+package org.apache.storm.kafka;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.storm.spout.SchemeAsMultiScheme;
 
 public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme {
 
@@ -30,9 +24,12 @@ public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme {
     }
 
     public Iterable<List<Object>> deserializeKeyAndValue(final ByteBuffer key, 
final ByteBuffer value) {
-        List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, 
value);
-        if(o == null) return null;
-        else return Arrays.asList(o);
+        List<Object> o = ((KeyValueScheme) scheme).deserializeKeyAndValue(key, 
value);
+        if (o == null) {
+            return null;
+        } else {
+            return Arrays.asList(o);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
index d0fc08e..f77f419 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
@@ -1,26 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import org.apache.storm.spout.Scheme;
+package org.apache.storm.kafka;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import org.apache.storm.spout.Scheme;
 
 public interface MessageMetadataScheme extends Scheme {
     List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition 
partition, long offset);

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
index a53fa88..f52a772 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
@@ -1,26 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
-
 import org.apache.storm.spout.SchemeAsMultiScheme;
 
 public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme {

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
index 99bb9d3..9edf28b 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import com.google.common.base.Objects;
@@ -33,16 +28,17 @@ public class Partition implements ISpoutPartition, Serializable {
 
     // for kryo compatibility
     private Partition() {
-       
+
     }
+
     public Partition(Broker host, String topic, int partition) {
         this.topic = topic;
         this.host = host;
         this.partition = partition;
         this.bUseTopicNameForPartitionPathId = false;
     }
-    
-    public Partition(Broker host, String topic, int partition,Boolean 
bUseTopicNameForPartitionPathId) {
+
+    public Partition(Broker host, String topic, int partition, Boolean 
bUseTopicNameForPartitionPathId) {
         this.topic = topic;
         this.host = host;
         this.partition = partition;
@@ -63,22 +59,23 @@ public class Partition implements ISpoutPartition, Serializable {
             return false;
         }
         final Partition other = (Partition) obj;
-        return Objects.equal(this.host, other.host) && 
Objects.equal(this.topic, other.topic) && Objects.equal(this.partition, 
other.partition);
+        return Objects.equal(this.host, other.host) && 
Objects.equal(this.topic, other.topic) &&
+               Objects.equal(this.partition, other.partition);
     }
 
     @Override
     public String toString() {
         return "Partition{" +
-                "host=" + host +
-                ", topic=" + topic +
-                ", partition=" + partition +
-                '}';
+               "host=" + host +
+               ", topic=" + topic +
+               ", partition=" + partition +
+               '}';
     }
 
     @Override
     public String getId() {
         if (bUseTopicNameForPartitionPathId) {
-            return  topic  + "/partition_" + partition;
+            return topic + "/partition_" + partition;
         } else {
             //Keep the Partition Id backward compatible with Old 
implementation of Partition.getId() == "partition_" + partition
             return "partition_" + partition;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
index c9004fa..4dba709 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import java.util.List;

Reply via email to