http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index a9a18fd..5805c21 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -1,26 +1,30 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
-
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
 import org.apache.storm.Config;
 import org.apache.storm.kafka.KafkaSpout.EmitState;
 import org.apache.storm.kafka.trident.MaxMetric;
@@ -32,12 +36,6 @@ import org.apache.storm.spout.SpoutOutputCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-
 public class PartitionManager {
     private static final Logger LOG = 
LoggerFactory.getLogger(PartitionManager.class);
 
@@ -50,11 +48,8 @@ public class PartitionManager {
     // Count of messages which were not retried because failedMsgRetryManager 
didn't consider offset eligible for
     // retry
     private final CountMetric _messageIneligibleForRetryCount;
-    Long _emittedToOffset;
-    // _pending key = Kafka offset, value = time at which the message was 
first submitted to the topology
-    private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
     private final FailedMsgRetryManager _failedMsgRetryManager;
-
+    Long _emittedToOffset;
     // retryRecords key = Kafka offset, value = retry info for the given 
message
     Long _committedTo;
     LinkedList<MessageAndOffset> _waitingToEmit = new 
LinkedList<MessageAndOffset>();
@@ -66,15 +61,16 @@ public class PartitionManager {
     ZkState _state;
     Map _topoConf;
     long numberFailed, numberAcked;
+    // _pending key = Kafka offset, value = time at which the message was 
first submitted to the topology
+    private SortedMap<Long, Long> _pending = new TreeMap<Long, Long>();
 
     public PartitionManager(
-            DynamicPartitionConnections connections,
-            String topologyInstanceId,
-            ZkState state,
-            Map<String, Object> topoConf,
-            SpoutConfig spoutConfig,
-            Partition id)
-    {
+        DynamicPartitionConnections connections,
+        String topologyInstanceId,
+        ZkState state,
+        Map<String, Object> topoConf,
+        SpoutConfig spoutConfig,
+        Partition id) {
         this(connections, topologyInstanceId, state, topoConf, spoutConfig, 
id, null);
     }
 
@@ -82,13 +78,13 @@ public class PartitionManager {
      * @param previousManager previous partition manager if manager for 
partition is being recreated
      */
     public PartitionManager(
-            DynamicPartitionConnections connections,
-            String topologyInstanceId,
-            ZkState state,
-            Map<String, Object> topoConf,
-            SpoutConfig spoutConfig,
-            Partition id,
-            PartitionManager previousManager) {
+        DynamicPartitionConnections connections,
+        String topologyInstanceId,
+        ZkState state,
+        Map<String, Object> topoConf,
+        SpoutConfig spoutConfig,
+        Partition id,
+        PartitionManager previousManager) {
         _partition = id;
         _connections = connections;
         _spoutConfig = spoutConfig;
@@ -105,16 +101,16 @@ public class PartitionManager {
             _waitingToEmit = previousManager._waitingToEmit;
             _pending = previousManager._pending;
             LOG.info("Recreating PartitionManager based on previous manager, 
_waitingToEmit size: {}, _pending size: {}",
-                    _waitingToEmit.size(),
-                    _pending.size());
+                     _waitingToEmit.size(),
+                     _pending.size());
         } else {
             try {
                 _failedMsgRetryManager = (FailedMsgRetryManager) 
Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
                 _failedMsgRetryManager.prepare(spoutConfig, _topoConf);
             } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException e) {
                 throw new IllegalArgumentException(String.format("Failed to 
create an instance of <%s> from: <%s>",
-                        FailedMsgRetryManager.class,
-                        spoutConfig.failedMsgRetryManagerClass), e);
+                                                                 
FailedMsgRetryManager.class,
+                                                                 
spoutConfig.failedMsgRetryManagerClass), e);
             }
 
             String jsonTopologyId = null;
@@ -142,7 +138,8 @@ public class PartitionManager {
                 LOG.info("Topology change detected and ignore zookeeper 
offsets set to true, using configuration to determine offset");
             } else {
                 _committedTo = jsonOffset;
-                LOG.info("Read last commit offset from zookeeper: " + 
_committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " 
+ topologyInstanceId);
+                LOG.info("Read last commit offset from zookeeper: " + 
_committedTo + "; old topology_id: " + jsonTopologyId +
+                         " - new topology_id: " + topologyInstanceId);
             }
 
             if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || 
_committedTo <= 0) {
@@ -150,7 +147,8 @@ public class PartitionManager {
                 Long lastCommittedOffset = _committedTo;
                 _committedTo = currentOffset;
                 LOG.info("Commit offset " + lastCommittedOffset + " is more 
than " +
-                        spoutConfig.maxOffsetBehind + " behind latest offset " 
+ currentOffset + ", resetting to startOffsetTime=" + 
spoutConfig.startOffsetTime);
+                         spoutConfig.maxOffsetBehind + " behind latest offset 
" + currentOffset + ", resetting to startOffsetTime=" +
+                         spoutConfig.startOffsetTime);
             }
 
             LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from 
offset " + _committedTo);
@@ -191,13 +189,14 @@ public class PartitionManager {
 
             Iterable<List<Object>> tups;
             if (_spoutConfig.scheme instanceof 
MessageMetadataSchemeAsMultiScheme) {
-                tups = 
KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) 
_spoutConfig.scheme, toEmit.message(), _partition, toEmit.offset());
+                tups = 
KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) 
_spoutConfig.scheme, toEmit.message(), _partition,
+                                                 toEmit.offset());
             } else {
                 tups = KafkaUtils.generateTuples(_spoutConfig, 
toEmit.message(), _partition.topic);
             }
 
             if ((tups != null) && tups.iterator().hasNext()) {
-               if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
+                if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
                     for (List<Object> tup : tups) {
                         collector.emit(_spoutConfig.outputStreamId, tup, new 
KafkaMessageId(_partition, toEmit.offset()));
                     }
@@ -305,16 +304,17 @@ public class PartitionManager {
     public void fail(Long offset) {
         if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
             LOG.info(
-                    "Skipping failed tuple at offset={}" +
-                        " because it's more than maxOffsetBehind={}" +
-                        " behind _emittedToOffset={} for {}",
+                "Skipping failed tuple at offset={}" +
+                " because it's more than maxOffsetBehind={}" +
+                " behind _emittedToOffset={} for {}",
                 offset,
                 _spoutConfig.maxOffsetBehind,
                 _emittedToOffset,
                 _partition
             );
         } else {
-            LOG.debug("Failing at offset={} with _pending.size()={} pending 
and _emittedToOffset={} for {}", offset, _pending.size(), _emittedToOffset, 
_partition);
+            LOG.debug("Failing at offset={} with _pending.size()={} pending 
and _emittedToOffset={} for {}", offset, _pending.size(),
+                      _emittedToOffset, _partition);
             numberFailed++;
             if (numberAcked == 0 && numberFailed > 
_spoutConfig.maxOffsetBehind) {
                 throw new RuntimeException("Too many tuple failures");
@@ -337,19 +337,22 @@ public class PartitionManager {
     public void commit() {
         long lastCompletedOffset = lastCompletedOffset();
         if (_committedTo != lastCompletedOffset) {
-            LOG.debug("Writing last completed offset ({}) to ZK for {} for 
topology: {}", lastCompletedOffset, _partition, _topologyInstanceId);
+            LOG.debug("Writing last completed offset ({}) to ZK for {} for 
topology: {}", lastCompletedOffset, _partition,
+                      _topologyInstanceId);
             Map<Object, Object> data = (Map<Object, Object>) 
ImmutableMap.builder()
-                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
-                            "name", _topoConf.get(Config.TOPOLOGY_NAME)))
-                    .put("offset", lastCompletedOffset)
-                    .put("partition", _partition.partition)
-                    .put("broker", ImmutableMap.of("host", 
_partition.host.host,
-                            "port", _partition.host.port))
-                    .put("topic", _partition.topic).build();
+                                                                         
.put("topology", ImmutableMap.of("id", _topologyInstanceId,
+                                                                               
                           "name", _topoConf
+                                                                               
                               .get(Config.TOPOLOGY_NAME)))
+                                                                         
.put("offset", lastCompletedOffset)
+                                                                         
.put("partition", _partition.partition)
+                                                                         
.put("broker", ImmutableMap.of("host", _partition.host.host,
+                                                                               
                         "port", _partition.host.port))
+                                                                         
.put("topic", _partition.topic).build();
             _state.writeJSON(committedPath(), data);
 
             _committedTo = lastCompletedOffset;
-            LOG.debug("Wrote last completed offset ({}) to ZK for {} for 
topology: {}", lastCompletedOffset, _partition, _topologyInstanceId);
+            LOG.debug("Wrote last completed offset ({}) to ZK for {} for 
topology: {}", lastCompletedOffset, _partition,
+                      _topologyInstanceId);
         } else {
             LOG.debug("No new offset for {} for topology: {}", _partition, 
_topologyInstanceId);
         }
@@ -377,7 +380,7 @@ public class PartitionManager {
 
     public void close() {
         commit();
-        _connections.unregister(_partition.host, _partition.topic , 
_partition.partition);
+        _connections.unregister(_partition.host, _partition.topic, 
_partition.partition);
     }
 
     static class KafkaMessageId implements Serializable {

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
index 0ad19d7..74a4a3b 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
index cd23ca6..8d12ee1 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
@@ -1,33 +1,30 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.storm.kafka.trident.GlobalPartitionInformation;
 
-import java.util.*;
-
 
 public class StaticCoordinator implements PartitionCoordinator {
     Map<Partition, PartitionManager> _managers = new HashMap<Partition, 
PartitionManager>();
     List<PartitionManager> _allManagers = new ArrayList<>();
 
     public StaticCoordinator(DynamicPartitionConnections connections, 
Map<String, Object> topoConf, SpoutConfig config, ZkState state,
-            int taskIndex, int totalTasks, int taskId, String 
topologyInstanceId) {
+                             int taskIndex, int totalTasks, int taskId, String 
topologyInstanceId) {
         StaticHosts hosts = (StaticHosts) config.hosts;
         List<GlobalPartitionInformation> partitions = new 
ArrayList<GlobalPartitionInformation>();
         partitions.add(hosts.getPartitionInformation());

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
index 33d5c16..1f8f903 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import org.apache.storm.kafka.trident.GlobalPartitionInformation;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
index 77a7211..2c9d4f2 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
@@ -1,26 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import kafka.javaapi.consumer.SimpleConsumer;
+package org.apache.storm.kafka;
 
 import java.util.HashMap;
 import java.util.Map;
+import kafka.javaapi.consumer.SimpleConsumer;
 
 public class StaticPartitionConnections {
     Map<Integer, SimpleConsumer> _kafka = new HashMap<Integer, 
SimpleConsumer>();

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
index 9ef7f74..3d62961 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
@@ -1,33 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
-import org.apache.storm.tuple.Values;
 import com.google.common.collect.ImmutableMap;
-
 import java.nio.ByteBuffer;
 import java.util.List;
+import org.apache.storm.tuple.Values;
 
 public class StringKeyValueScheme extends StringScheme implements 
KeyValueScheme {
 
     @Override
     public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer 
value) {
-        if ( key == null ) {
+        if (key == null) {
             return deserialize(value);
         }
         String keyString = StringScheme.deserializeString(key);

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
index e57738d..ab6e500 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
@@ -1,33 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
+package org.apache.storm.kafka;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 public class StringMessageAndMetadataScheme extends StringScheme implements 
MessageMetadataScheme {
-    private static final long serialVersionUID = -5441841920447947374L;
-
     public static final String STRING_SCHEME_PARTITION_KEY = "partition";
     public static final String STRING_SCHEME_OFFSET = "offset";
+    private static final long serialVersionUID = -5441841920447947374L;
 
     @Override
     public List<Object> deserializeMessageWithMetadata(ByteBuffer message, 
Partition partition, long offset) {

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
index e197318..061b30a 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
@@ -1,32 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import org.apache.storm.spout.MultiScheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
+package org.apache.storm.kafka;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
+import org.apache.storm.spout.MultiScheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 public class StringMultiSchemeWithTopic
-        implements MultiScheme {
+    implements MultiScheme {
     public static final String STRING_SCHEME_KEY = "str";
 
     public static final String TOPIC_KEY = "topic";

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
index b0b8d27..bcbc058 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
@@ -1,39 +1,29 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
+package org.apache.storm.kafka;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
 
 public class StringScheme implements Scheme {
-    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
     public static final String STRING_SCHEME_KEY = "str";
-
-    public List<Object> deserialize(ByteBuffer bytes) {
-        return new Values(deserializeString(bytes));
-    }
+    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
 
     public static String deserializeString(ByteBuffer string) {
         if (string.hasArray()) {
@@ -44,6 +34,10 @@ public class StringScheme implements Scheme {
         }
     }
 
+    public List<Object> deserialize(ByteBuffer bytes) {
+        return new Values(deserializeString(bytes));
+    }
+
     public Fields getOutputFields() {
         return new Fields(STRING_SCHEME_KEY);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
index 8e1c98f..613a62e 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 public class TopicOffsetOutOfRangeException extends RuntimeException {

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
index 136dc51..bc9ebd5 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
@@ -1,27 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-
-import java.util.*;
 
 import static org.apache.storm.kafka.KafkaUtils.taskPrefix;
 
@@ -43,12 +42,13 @@ public class ZkCoordinator implements PartitionCoordinator {
     Map _topoConf;
 
     public ZkCoordinator(DynamicPartitionConnections connections, Map<String, 
Object> topoConf, SpoutConfig spoutConfig, ZkState state,
-            int taskIndex, int totalTasks, int taskId, String 
topologyInstanceId) {
-        this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, 
taskId, topologyInstanceId, buildReader(topoConf, spoutConfig));
+                         int taskIndex, int totalTasks, int taskId, String 
topologyInstanceId) {
+        this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, 
taskId, topologyInstanceId,
+             buildReader(topoConf, spoutConfig));
     }
 
     public ZkCoordinator(DynamicPartitionConnections connections, Map<String, 
Object> topoConf, SpoutConfig spoutConfig, ZkState state,
-            int taskIndex, int totalTasks, int taskId, String 
topologyInstanceId, DynamicBrokersReader reader) {
+                         int taskIndex, int totalTasks, int taskId, String 
topologyInstanceId, DynamicBrokersReader reader) {
         _spoutConfig = spoutConfig;
         _connections = connections;
         _taskIndex = taskIndex;
@@ -103,13 +103,13 @@ public class ZkCoordinator implements 
PartitionCoordinator {
 
             for (Partition id : newPartitions) {
                 PartitionManager man = new PartitionManager(
-                        _connections,
-                        _topologyInstanceId,
-                        _state,
-                        _topoConf,
-                        _spoutConfig,
-                        id,
-                        deletedManagers.get(id.partition));
+                    _connections,
+                    _topologyInstanceId,
+                    _state,
+                    _topoConf,
+                    _spoutConfig,
+                    id,
+                    deletedManagers.get(id.partition));
                 _managers.put(id, man);
             }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
index 2c2a26f..9c6b29d 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
index 7241c60..3d27173 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
@@ -1,53 +1,59 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
-import org.apache.storm.Config;
-import org.apache.storm.utils.ObjectReader;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryNTimes;
+import org.apache.storm.Config;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.zookeeper.CreateMode;
 import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 public class ZkState {
     private static final Logger LOG = LoggerFactory.getLogger(ZkState.class);
     CuratorFramework _curator;
 
+    public ZkState(Map<String, Object> stateConf) {
+        stateConf = new HashMap<>(stateConf);
+
+        try {
+            _curator = newCurator(stateConf);
+            _curator.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private CuratorFramework newCurator(final Map<String, Object> stateConf)
-            throws Exception {
+        throws Exception {
         Integer port = (Integer) 
stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
         String serverPorts = "";
         for (String server : (List<String>) 
stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
             serverPorts = serverPorts + server + ":" + port + ",";
         }
         return CuratorFrameworkFactory.newClient(serverPorts,
-                
ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
-                
ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)),
-                new 
RetryNTimes(ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
-                        
ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+                                                 
ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+                                                 
ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)),
+                                                 new 
RetryNTimes(ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
+                                                                 ObjectReader
+                                                                     
.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
     }
 
     public CuratorFramework getCurator() {
@@ -55,17 +61,6 @@ public class ZkState {
         return _curator;
     }
 
-    public ZkState(Map<String, Object> stateConf) {
-        stateConf = new HashMap<>(stateConf);
-
-        try {
-            _curator = newCurator(stateConf);
-            _curator.start();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     public void writeJSON(String path, Map<Object, Object> data) {
         LOG.debug("Writing {} the data {}", path, data.toString());
         writeBytes(path, 
JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8")));

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
index a4c255a..e0b94f3 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -1,43 +1,36 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.bolt;
 
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.clients.producer.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
 import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
 import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
 import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
-import java.util.concurrent.Future;
-import java.util.concurrent.ExecutionException;
-import java.util.Map;
-import java.util.Properties;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -57,13 +50,11 @@ import java.util.Properties;
 @Deprecated
 public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
 
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
-
     public static final String TOPIC = "topic";
-
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
     private KafkaProducer<K, V> producer;
     private OutputCollector collector;
-    private TupleToKafkaMapper<K,V> mapper;
+    private TupleToKafkaMapper<K, V> mapper;
     private KafkaTopicSelector topicSelector;
     private Properties boltSpecfiedProperties = new Properties();
     /**
@@ -76,18 +67,18 @@ public class KafkaBolt<K, V> extends 
BaseTickTupleAwareRichBolt {
     private boolean async = true;
 
     public KafkaBolt() {}
-    
-    public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> 
mapper) {
+
+    public KafkaBolt<K, V> withTupleToKafkaMapper(TupleToKafkaMapper<K, V> 
mapper) {
         this.mapper = mapper;
         return this;
     }
 
-    public KafkaBolt<K,V> withTopicSelector(KafkaTopicSelector selector) {
+    public KafkaBolt<K, V> withTopicSelector(KafkaTopicSelector selector) {
         this.topicSelector = selector;
         return this;
     }
 
-    public KafkaBolt<K,V> withProducerProperties(Properties 
producerProperties) {
+    public KafkaBolt<K, V> withProducerProperties(Properties 
producerProperties) {
         this.boltSpecfiedProperties = producerProperties;
         return this;
     }
@@ -95,13 +86,13 @@ public class KafkaBolt<K, V> extends 
BaseTickTupleAwareRichBolt {
     @Override
     public void prepare(Map<String, Object> topoConf, TopologyContext context, 
OutputCollector collector) {
         //for backward compatibility.
-        if(mapper == null) {
-            this.mapper = new FieldNameBasedTupleToKafkaMapper<K,V>();
+        if (mapper == null) {
+            this.mapper = new FieldNameBasedTupleToKafkaMapper<K, V>();
         }
 
         //for backward compatibility.
-        if(topicSelector == null) {
-            if(topoConf.containsKey(TOPIC)) {
+        if (topicSelector == null) {
+            if (topoConf.containsKey(TOPIC)) {
                 this.topicSelector = new DefaultTopicSelector((String) 
topoConf.get(TOPIC));
             } else {
                 throw new IllegalArgumentException("topic should be specified 
in bolt's configuration");
@@ -121,7 +112,7 @@ public class KafkaBolt<K, V> extends 
BaseTickTupleAwareRichBolt {
             key = mapper.getKeyFromTuple(input);
             message = mapper.getMessageFromTuple(input);
             topic = topicSelector.getTopic(input);
-            if (topic != null ) {
+            if (topic != null) {
                 Callback callback = null;
 
                 if (!fireAndForget && async) {

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
index 672da8e..7276ef6 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -1,25 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.bolt.mapper;
 
 import org.apache.storm.tuple.Tuple;
 
-public class FieldNameBasedTupleToKafkaMapper<K,V> implements 
TupleToKafkaMapper<K, V> {
+public class FieldNameBasedTupleToKafkaMapper<K, V> implements 
TupleToKafkaMapper<K, V> {
 
     public static final String BOLT_KEY = "key";
     public static final String BOLT_MESSAGE = "message";

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
index 3890413..7012e6b 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
@@ -1,32 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka.bolt.mapper;
 
-import org.apache.storm.tuple.Tuple;
+package org.apache.storm.kafka.bolt.mapper;
 
 import java.io.Serializable;
+import org.apache.storm.tuple.Tuple;
 
 /**
  * as the really verbose name suggests this interface mapps a storm tuple to 
kafka key and message.
  * @param <K> type of key.
  * @param <V> type of value.
  */
-public interface TupleToKafkaMapper<K,V> extends Serializable {
+public interface TupleToKafkaMapper<K, V> extends Serializable {
     K getKeyFromTuple(Tuple tuple);
+
     V getMessageFromTuple(Tuple tuple);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
index 2aafc78..d1784b0 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.bolt.selector;
 
 import org.apache.storm.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
index 7b52403..50c5c1f 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.bolt.selector;
 
 import org.apache.storm.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
index a622e8f..d3c304a 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.bolt.selector;
 
 import org.apache.storm.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
index cb7fb44..4045df7 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
@@ -1,25 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka.bolt.selector;
 
-import org.apache.storm.tuple.Tuple;
+package org.apache.storm.kafka.bolt.selector;
 
 import java.io.Serializable;
+import org.apache.storm.tuple.Tuple;
 
 public interface KafkaTopicSelector extends Serializable {
     String getTopic(Tuple tuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
index 76baf62..0e8dba1 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
@@ -1,30 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
+import java.util.List;
+import java.util.Map;
 import org.apache.storm.kafka.KafkaUtils;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.apache.storm.trident.spout.IPartitionedTridentSpout;
 
-import java.util.List;
-import java.util.Map;
-
-class Coordinator implements 
IPartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>>, 
IOpaquePartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>> {
+class Coordinator implements 
IPartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>>,
+                             
IOpaquePartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>> {
 
     private IBrokerReader reader;
     private TridentKafkaConfig config;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
index 7a7e32c..575e235 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
 public class DefaultCoordinator implements IBatchCoordinator {

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
index e420cb3..b26dc7f 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
@@ -1,35 +1,32 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
 import com.google.common.base.Objects;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import org.apache.storm.kafka.Broker;
 import org.apache.storm.kafka.Partition;
 
-import java.io.Serializable;
-import java.util.*;
-
 
 public class GlobalPartitionInformation implements Iterable<Partition>, 
Serializable {
 
-    private Map<Integer, Broker> partitionMap;
     public String topic;
-
+    private Map<Integer, Broker> partitionMap;
     //Flag to keep the Partition Path Id backward compatible with Old 
implementation of Partition.getId() == "partition_" + partition
     private Boolean bUseTopicNameForPartitionPathId;
 
@@ -49,16 +46,16 @@ public class GlobalPartitionInformation implements 
Iterable<Partition>, Serializ
         partitionMap.put(partitionId, broker);
     }
 
-    public Boolean getbUseTopicNameForPartitionPathId () {
+    public Boolean getbUseTopicNameForPartitionPathId() {
         return bUseTopicNameForPartitionPathId;
     }
 
     @Override
     public String toString() {
         return "GlobalPartitionInformation{" +
-                "topic=" + topic +
-                ", partitionMap=" + partitionMap +
-                '}';
+               "topic=" + topic +
+               ", partitionMap=" + partitionMap +
+               '}';
     }
 
     public Broker getBrokerFor(Integer partitionId) {
@@ -87,7 +84,7 @@ public class GlobalPartitionInformation implements 
Iterable<Partition>, Serializ
             @Override
             public Partition next() {
                 Map.Entry<Integer, Broker> next = iterator.next();
-                return new Partition(next.getValue(), topic , next.getKey(), 
bUseTopicNameForPartitionPathId);
+                return new Partition(next.getValue(), topic, next.getKey(), 
bUseTopicNameForPartitionPathId);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
index 41369ba..4c6c404 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
index 904d8c9..c5cf8b2 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
@@ -1,24 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
 import java.util.List;
-import java.util.Map;
 
 public interface IBrokerReader {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
index 2332205..14324ed 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
 

Reply via email to