http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
index c17c912..c98be42 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
@@ -1,36 +1,30 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
+import java.util.List;
+import java.util.Map;
 import org.apache.storm.kafka.Partition;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.apache.storm.tuple.Fields;
 
-import java.util.List;
-import java.util.Map;
-
 
 public class OpaqueTridentKafkaSpout
-        implements IOpaquePartitionedTridentSpout<
-                List<GlobalPartitionInformation>,
-                Partition,
-                Map<String, Object>> {
+    implements IOpaquePartitionedTridentSpout<
+    List<GlobalPartitionInformation>,
+    Partition,
+    Map<String, Object>> {
 
 
     TridentKafkaConfig _config;
@@ -41,17 +35,17 @@ public class OpaqueTridentKafkaSpout
 
     @Override
     public Emitter<List<GlobalPartitionInformation>,
-            Partition,
-            Map<String, Object>> getEmitter(Map<String, Object> conf,
-                    TopologyContext context) {
+        Partition,
+        Map<String, Object>> getEmitter(Map<String, Object> conf,
+                                        TopologyContext context) {
         return new TridentKafkaEmitter(conf, context, _config, context
-                .getStormId()).asOpaqueEmitter();
+            .getStormId()).asOpaqueEmitter();
     }
 
     @Override
     public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(
-            Map<String, Object> conf,
-            TopologyContext tc) {
+        Map<String, Object> conf,
+        TopologyContext tc) {
         return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
index ba27651..3c5cc09 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
 import java.util.ArrayList;
@@ -24,7 +19,7 @@ import java.util.TreeMap;
 
 public class StaticBrokerReader implements IBrokerReader {
 
-    private Map<String,GlobalPartitionInformation> brokers = new 
TreeMap<String,GlobalPartitionInformation>();
+    private Map<String, GlobalPartitionInformation> brokers = new 
TreeMap<String, GlobalPartitionInformation>();
 
     public StaticBrokerReader(String topic, GlobalPartitionInformation 
partitionInformation) {
         this.brokers.put(topic, partitionInformation);
@@ -37,7 +32,7 @@ public class StaticBrokerReader implements IBrokerReader {
     }
 
     @Override
-    public List<GlobalPartitionInformation> getAllBrokers () {
+    public List<GlobalPartitionInformation> getAllBrokers() {
         List<GlobalPartitionInformation> list = new 
ArrayList<GlobalPartitionInformation>();
         list.addAll(brokers.values());
         return list;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
index 1042098..7b1d4dd 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
@@ -1,29 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
+import java.util.Map;
 import org.apache.storm.kafka.Partition;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.trident.spout.IPartitionedTridentSpout;
 import org.apache.storm.tuple.Fields;
 
-import java.util.Map;
-
 
 public class TransactionalTridentKafkaSpout implements 
IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
index b225e9a..3dac221 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
 import org.apache.storm.kafka.BrokerHosts;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 3333c2c..cb00579 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -1,24 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
 import com.google.common.collect.ImmutableMap;
-
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
 import org.apache.storm.Config;
 import org.apache.storm.kafka.DynamicPartitionConnections;
 import org.apache.storm.kafka.FailedFetchException;
@@ -38,17 +41,6 @@ import org.apache.storm.trident.topology.TransactionAttempt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-
 public class TridentKafkaEmitter {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TridentKafkaEmitter.class);
@@ -74,10 +66,10 @@ public class TridentKafkaEmitter {
 
 
     private Map<String, Object> failFastEmitNewPartitionBatch(
-            final TransactionAttempt attempt,
-            TridentCollector collector,
-            Partition partition,
-            Map<String, Object> lastMeta) {
+        final TransactionAttempt attempt,
+        TridentCollector collector,
+        Partition partition,
+        Map<String, Object> lastMeta) {
         SimpleConsumer consumer = _connections.register(partition);
         Map<String, Object> ret = doEmitNewPartitionBatch(consumer, partition, 
collector, lastMeta, attempt);
         Long offset = (Long) ret.get("offset");
@@ -86,7 +78,8 @@ public class TridentKafkaEmitter {
         return ret;
     }
 
-    private Map<String, Object> emitNewPartitionBatch(TransactionAttempt 
attempt, TridentCollector collector, Partition partition, Map<String, Object> 
lastMeta) {
+    private Map<String, Object> emitNewPartitionBatch(TransactionAttempt 
attempt, TridentCollector collector, Partition partition,
+                                                      Map<String, Object> 
lastMeta) {
         try {
             return failFastEmitNewPartitionBatch(attempt, collector, 
partition, lastMeta);
         } catch (FailedFetchException e) {
@@ -107,16 +100,16 @@ public class TridentKafkaEmitter {
     }
 
     private Map<String, Object> doEmitNewPartitionBatch(SimpleConsumer 
consumer,
-            Partition partition,
-            TridentCollector collector,
-            Map<String, Object> lastMeta,
-            TransactionAttempt attempt) {
+                                                        Partition partition,
+                                                        TridentCollector 
collector,
+                                                        Map<String, Object> 
lastMeta,
+                                                        TransactionAttempt 
attempt) {
         LOG.debug("Emitting new partition batch - [transaction = {}], 
[lastMeta = {}]", attempt, lastMeta);
         long offset;
         if (lastMeta != null) {
             String lastInstanceId = null;
             Map<String, Object> lastTopoMeta = (Map<String, Object>)
-                    lastMeta.get("topology");
+                lastMeta.get("topology");
             if (lastTopoMeta != null) {
                 lastInstanceId = (String) lastTopoMeta.get("id");
             }
@@ -170,7 +163,8 @@ public class TridentKafkaEmitter {
     /**
      * re-emit the batch described by the meta data provided
      */
-    private void reEmitPartitionBatch(TransactionAttempt attempt, 
TridentCollector collector, Partition partition, Map<String, Object> meta) {
+    private void reEmitPartitionBatch(TransactionAttempt attempt, 
TridentCollector collector, Partition partition,
+                                      Map<String, Object> meta) {
         LOG.info("re-emitting batch, attempt " + attempt);
         String instanceId = (String) meta.get("instanceId");
         if (!_config.ignoreZkOffsets || 
instanceId.equals(_topologyInstanceId)) {
@@ -241,7 +235,8 @@ public class TridentKafkaEmitter {
              * for defining the parameters of the next batch.
              */
             @Override
-            public Map<String, Object> emitPartitionBatch(TransactionAttempt 
transactionAttempt, TridentCollector tridentCollector, Partition partition, 
Map<String, Object> map) {
+            public Map<String, Object> emitPartitionBatch(TransactionAttempt 
transactionAttempt, TridentCollector tridentCollector,
+                                                          Partition partition, 
Map<String, Object> map) {
                 return emitNewPartitionBatch(transactionAttempt, 
tridentCollector, partition, map);
             }
 
@@ -270,7 +265,8 @@ public class TridentKafkaEmitter {
              * Return the metadata that can be used to reconstruct this 
partition/batch in the future.
              */
             @Override
-            public Map<String, Object> 
emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector 
tridentCollector, Partition partition, Map<String, Object> map) {
+            public Map<String, Object> 
emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector 
tridentCollector,
+                                                             Partition 
partition, Map<String, Object> map) {
                 return failFastEmitNewPartitionBatch(transactionAttempt, 
tridentCollector, partition, map);
             }
 
@@ -279,7 +275,8 @@ public class TridentKafkaEmitter {
              * the metadata created when it was first emitted.
              */
             @Override
-            public void emitPartitionBatch(TransactionAttempt 
transactionAttempt, TridentCollector tridentCollector, Partition partition, 
Map<String, Object> map) {
+            public void emitPartitionBatch(TransactionAttempt 
transactionAttempt, TridentCollector tridentCollector, Partition partition,
+                                           Map<String, Object> map) {
                 reEmitPartitionBatch(transactionAttempt, tridentCollector, 
partition, map);
             }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
index eb6737a..71b2cb1 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -1,41 +1,35 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.FailedException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import org.apache.commons.lang.Validate;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
 import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.tuple.TridentTuple;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TridentKafkaState implements State {
     private static final Logger LOG = 
LoggerFactory.getLogger(TridentKafkaState.class);
@@ -79,9 +73,10 @@ public class TridentKafkaState implements State {
             for (TridentTuple tuple : tuples) {
                 topic = topicSelector.getTopic(tuple);
 
-                if(topic != null) {
+                if (topic != null) {
                     Future<RecordMetadata> result = producer.send(new 
ProducerRecord(topic,
-                            mapper.getKeyFromTuple(tuple), 
mapper.getMessageFromTuple(tuple)));
+                                                                               
      mapper.getKeyFromTuple(tuple),
+                                                                               
      mapper.getMessageFromTuple(tuple)));
                     futures.add(result);
                 } else {
                     LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) 
+ ", topic selector returned null.");
@@ -97,9 +92,9 @@ public class TridentKafkaState implements State {
                 }
             }
 
-            if(exceptions.size() > 0){
-                String errorMsg = "Could not retrieve result for messages " + 
tuples + " from topic = " + topic 
-                        + " because of the following exceptions: \n";
+            if (exceptions.size() > 0) {
+                String errorMsg = "Could not retrieve result for messages " + 
tuples + " from topic = " + topic
+                                  + " because of the following exceptions: \n";
                 for (ExecutionException exception : exceptions) {
                     errorMsg = errorMsg + exception.getMessage() + "\n";
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
index 0bf21ab..5b66fd8 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -1,32 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
-import org.apache.storm.task.IMetricsContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.Properties;
 import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
 import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.state.StateFactory;
-
-import java.util.Map;
-import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TridentKafkaStateFactory implements StateFactory {
 
@@ -55,8 +49,8 @@ public class TridentKafkaStateFactory implements StateFactory 
{
     public State makeState(Map<String, Object> conf, IMetricsContext metrics, 
int partitionIndex, int numPartitions) {
         LOG.info("makeState(partitonIndex={}, numpartitions={}", 
partitionIndex, numPartitions);
         TridentKafkaState state = new TridentKafkaState()
-                .withKafkaTopicSelector(this.topicSelector)
-                .withTridentTupleToKafkaMapper(this.mapper);
+            .withKafkaTopicSelector(this.topicSelector)
+            .withTridentTupleToKafkaMapper(this.mapper);
         state.prepare(producerProperties);
         return state;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
index 7a905ab..1100b66 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
@@ -1,28 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident;
 
+import java.util.List;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.BaseStateUpdater;
 import org.apache.storm.trident.tuple.TridentTuple;
 
-import java.util.List;
-
 public class TridentKafkaUpdater extends BaseStateUpdater<TridentKafkaState> {
     @Override
     public void updateState(TridentKafkaState state, List<TridentTuple> 
tuples, TridentCollector collector) {

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
index 00758a6..d40256e 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
@@ -1,84 +1,79 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka.trident;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.storm.kafka.DynamicBrokersReader;
-import org.apache.storm.kafka.ZkHosts;
+package org.apache.storm.kafka.trident;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.storm.kafka.DynamicBrokersReader;
+import org.apache.storm.kafka.ZkHosts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ZkBrokerReader implements IBrokerReader {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(ZkBrokerReader.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZkBrokerReader.class);
 
-       List<GlobalPartitionInformation> cachedBrokers = new 
ArrayList<GlobalPartitionInformation>();
-       DynamicBrokersReader reader;
-       long lastRefreshTimeMs;
+    List<GlobalPartitionInformation> cachedBrokers = new 
ArrayList<GlobalPartitionInformation>();
+    DynamicBrokersReader reader;
+    long lastRefreshTimeMs;
 
 
-       long refreshMillis;
+    long refreshMillis;
 
-       public ZkBrokerReader(Map<String, Object> conf, String topic, ZkHosts 
hosts) {
-               try {
-                       reader = new DynamicBrokersReader(conf, 
hosts.brokerZkStr, hosts.brokerZkPath, topic);
-                       cachedBrokers = reader.getBrokerInfo();
-                       lastRefreshTimeMs = System.currentTimeMillis();
-                       refreshMillis = hosts.refreshFreqSecs * 1000L;
-               } catch (java.net.SocketTimeoutException e) {
-                       LOG.warn("Failed to update brokers", e);
-               }
+    public ZkBrokerReader(Map<String, Object> conf, String topic, ZkHosts 
hosts) {
+        try {
+            reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, 
hosts.brokerZkPath, topic);
+            cachedBrokers = reader.getBrokerInfo();
+            lastRefreshTimeMs = System.currentTimeMillis();
+            refreshMillis = hosts.refreshFreqSecs * 1000L;
+        } catch (java.net.SocketTimeoutException e) {
+            LOG.warn("Failed to update brokers", e);
+        }
+
+    }
 
-       }
+    private void refresh() {
+        long currTime = System.currentTimeMillis();
+        if (currTime > lastRefreshTimeMs + refreshMillis) {
+            try {
+                LOG.info("brokers need refreshing because " + refreshMillis + 
"ms have expired");
+                cachedBrokers = reader.getBrokerInfo();
+                lastRefreshTimeMs = currTime;
+            } catch (java.net.SocketTimeoutException e) {
+                LOG.warn("Failed to update brokers", e);
+            }
+        }
+    }
 
-       private void refresh() {
-               long currTime = System.currentTimeMillis();
-               if (currTime > lastRefreshTimeMs + refreshMillis) {
-                       try {
-                               LOG.info("brokers need refreshing because " + 
refreshMillis + "ms have expired");
-                               cachedBrokers = reader.getBrokerInfo();
-                               lastRefreshTimeMs = currTime;
-                       } catch (java.net.SocketTimeoutException e) {
-                               LOG.warn("Failed to update brokers", e);
-                       }
-               }
-       }
-       @Override
-       public GlobalPartitionInformation getBrokerForTopic(String topic) {
-               refresh();
-        for(GlobalPartitionInformation partitionInformation : cachedBrokers) {
+    @Override
+    public GlobalPartitionInformation getBrokerForTopic(String topic) {
+        refresh();
+        for (GlobalPartitionInformation partitionInformation : cachedBrokers) {
             if (partitionInformation.topic.equals(topic)) return 
partitionInformation;
         }
-               return null;
-       }
+        return null;
+    }
 
-       @Override
-       public List<GlobalPartitionInformation> getAllBrokers() {
-               refresh();
-               return cachedBrokers;
-       }
+    @Override
+    public List<GlobalPartitionInformation> getAllBrokers() {
+        refresh();
+        return cachedBrokers;
+    }
 
-       @Override
-       public void close() {
-               reader.close();
-       }
+    @Override
+    public void close() {
+        reader.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
index 2d04971..01e3eca 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident.mapper;
 
 import org.apache.storm.trident.tuple.TridentTuple;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
index 28c6c89..4a522d6 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
@@ -1,28 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka.trident.mapper;
 
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.trident.tuple.TridentTuple;
+package org.apache.storm.kafka.trident.mapper;
 
 import java.io.Serializable;
+import org.apache.storm.trident.tuple.TridentTuple;
 
-public interface TridentTupleToKafkaMapper<K,V>  extends Serializable {
+public interface TridentTupleToKafkaMapper<K, V> extends Serializable {
     K getKeyFromTuple(TridentTuple tuple);
+
     V getMessageFromTuple(TridentTuple tuple);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
index 7ae49a3..93b5566 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.trident.selector;
 
 import org.apache.storm.trident.tuple.TridentTuple;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
index 012a6c7..6de3921 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
@@ -1,25 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka.trident.selector;
 
-import org.apache.storm.trident.tuple.TridentTuple;
+package org.apache.storm.kafka.trident.selector;
 
 import java.io.Serializable;
+import org.apache.storm.trident.tuple.TridentTuple;
 
 public interface KafkaTopicSelector extends Serializable {
     String getTopic(TridentTuple tuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
 
b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
index 46cc60d..a6bb61c 100644
--- 
a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
+++ 
b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
@@ -1,40 +1,33 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
-import org.apache.storm.Config;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.utils.ZKPaths;
+import org.apache.storm.Config;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Date: 16/05/2013
@@ -66,7 +59,7 @@ public class DynamicBrokersReaderTest {
 
         Map<String, Object> conf2 = new HashMap<>();
         conf2.putAll(conf);
-        conf2.put("kafka.topic.wildcard.match",true);
+        conf2.put("kafka.topic.wildcard.match", true);
 
         wildCardBrokerReader = new DynamicBrokersReader(conf2, 
connectionString, masterPath, "^test.*$");
         zookeeper.start();
@@ -114,8 +107,8 @@ public class DynamicBrokersReaderTest {
     }
 
 
-    private GlobalPartitionInformation 
getByTopic(List<GlobalPartitionInformation> partitions, String topic){
-        for(GlobalPartitionInformation partitionInformation : partitions) {
+    private GlobalPartitionInformation 
getByTopic(List<GlobalPartitionInformation> partitions, String topic) {
+        for (GlobalPartitionInformation partitionInformation : partitions) {
             if (partitionInformation.topic.equals(topic)) return 
partitionInformation;
         }
         return null;
@@ -242,7 +235,7 @@ public class DynamicBrokersReaderTest {
         String connectionString = server.getConnectString();
         Map<String, Object> conf = new HashMap<>();
         conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
-//        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
+        //        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
         conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
         conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
 
b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index 049fce7..5fcee28 100644
--- 
a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ 
b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -1,28 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import org.junit.Test;
+package org.apache.storm.kafka;
 
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -52,8 +47,8 @@ public class ExponentialBackoffMsgRetryManagerTest {
 
     @Test
     public void testImmediateRetry() throws Exception {
-        
-        
+
+
         ExponentialBackoffMsgRetryManager manager = 
buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
         manager.failed(TEST_OFFSET);
         Long next = manager.nextFailedMessageToRetry();
@@ -124,22 +119,22 @@ public class ExponentialBackoffMsgRetryManagerTest {
         // so TEST_OFFSET2 should come first
 
         Time.advanceTime(initial * 2);
-        assertTrue("message "+TEST_OFFSET+"should be ready for retry", 
manager.shouldReEmitMsg(TEST_OFFSET));
-        assertTrue("message "+TEST_OFFSET2+"should be ready for retry", 
manager.shouldReEmitMsg(TEST_OFFSET2));
+        assertTrue("message " + TEST_OFFSET + "should be ready for retry", 
manager.shouldReEmitMsg(TEST_OFFSET));
+        assertTrue("message " + TEST_OFFSET2 + "should be ready for retry", 
manager.shouldReEmitMsg(TEST_OFFSET2));
 
         Long next = manager.nextFailedMessageToRetry();
-        assertEquals("expect first message to retry is "+TEST_OFFSET2, 
TEST_OFFSET2, next);
+        assertEquals("expect first message to retry is " + TEST_OFFSET2, 
TEST_OFFSET2, next);
 
         Time.advanceTime(initial);
 
         // haven't retried yet, so first should still be TEST_OFFSET2
         next = manager.nextFailedMessageToRetry();
-        assertEquals("expect first message to retry is "+TEST_OFFSET2, 
TEST_OFFSET2, next);
+        assertEquals("expect first message to retry is " + TEST_OFFSET2, 
TEST_OFFSET2, next);
         manager.retryStarted(next);
 
         // now it should be TEST_OFFSET
         next = manager.nextFailedMessageToRetry();
-        assertEquals("expect message to retry is now "+TEST_OFFSET, 
TEST_OFFSET, next);
+        assertEquals("expect message to retry is now " + TEST_OFFSET, 
TEST_OFFSET, next);
         manager.retryStarted(next);
 
         // now none left
@@ -230,14 +225,14 @@ public class ExponentialBackoffMsgRetryManagerTest {
         assertEquals("expect test offset next available for retry", 
TEST_OFFSET, next);
         assertTrue("message should be ready for retry", 
manager.shouldReEmitMsg(TEST_OFFSET));
     }
-    
+
     @Test
     public void testClearInvalidMessages() throws Exception {
         ExponentialBackoffMsgRetryManager manager = 
buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
         manager.failed(TEST_OFFSET);
         manager.failed(TEST_OFFSET2);
         manager.failed(TEST_OFFSET3);
-        
+
         assertTrue("message should be ready for retry", 
manager.shouldReEmitMsg(TEST_OFFSET));
         assertTrue("message should be ready for retry", 
manager.shouldReEmitMsg(TEST_OFFSET2));
         assertTrue("message should be ready for retry", 
manager.shouldReEmitMsg(TEST_OFFSET3));
@@ -246,7 +241,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
 
         Long next = manager.nextFailedMessageToRetry();
         assertEquals("expect test offset next available for retry", 
TEST_OFFSET3, next);
-        
+
         manager.acked(TEST_OFFSET3);
         next = manager.nextFailedMessageToRetry();
         assertNull("expect no message ready after acked", next);
@@ -267,8 +262,8 @@ public class ExponentialBackoffMsgRetryManagerTest {
 
         assertFalse(manager.retryFurther(TEST_OFFSET));
     }
-    
-    private ExponentialBackoffMsgRetryManager 
buildExponentialBackoffMsgRetryManager(long retryInitialDelayMs, 
+
+    private ExponentialBackoffMsgRetryManager 
buildExponentialBackoffMsgRetryManager(long retryInitialDelayMs,
                                                                                
      double retryDelayMultiplier,
                                                                                
      long retryDelayMaxMs,
                                                                                
      int retryLimit) {
@@ -276,7 +271,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
         spoutConfig.retryInitialDelayMs = retryInitialDelayMs;
         spoutConfig.retryDelayMultiplier = retryDelayMultiplier;
         spoutConfig.retryDelayMaxMs = retryDelayMaxMs;
-        spoutConfig.retryLimit = retryLimit; 
+        spoutConfig.retryLimit = retryLimit;
         ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = 
new ExponentialBackoffMsgRetryManager();
         exponentialBackoffMsgRetryManager.prepare(spoutConfig, null);
         return exponentialBackoffMsgRetryManager;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java 
b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java
index e38bc1e..ad793ed 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaErrorTest.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java 
b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
index 0952764..f31386a 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java
@@ -1,22 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 import kafka.admin.AdminUtils;
 import kafka.api.PartitionMetadata;
 import kafka.api.TopicMetadata;
@@ -34,11 +33,6 @@ import org.apache.curator.test.InstanceSpec;
 import org.apache.curator.test.TestingServer;
 import scala.collection.JavaConversions;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Date: 11/01/2014
  * Time: 13:15
@@ -161,6 +155,7 @@ public class KafkaTestBroker {
     public int getPort() {
         return port;
     }
+
     public void shutdown() {
         if (kafka != null) {
             kafka.shutdown();

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java 
b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
index 1bd989f..3c2c0d9 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
@@ -1,38 +1,29 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+package org.apache.storm.kafka;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-
 import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
-
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+import org.apache.storm.spout.SchemeAsMultiScheme;
 import org.apache.storm.utils.Utils;
 import org.junit.After;
 import org.junit.Assert;
@@ -42,13 +33,14 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-import org.apache.storm.spout.SchemeAsMultiScheme;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
-import com.google.common.collect.ImmutableMap;
 public class KafkaUtilsTest {
-    private String TEST_TOPIC = "testTopic";
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaUtilsTest.class);
+    private String TEST_TOPIC = "testTopic";
     private KafkaTestBroker broker;
     private SimpleConsumer simpleConsumer;
     private KafkaConfig config;
@@ -73,7 +65,8 @@ public class KafkaUtilsTest {
 
     @Test(expected = FailedFetchException.class)
     public void topicDoesNotExist() throws Exception {
-        KafkaUtils.fetchMessages(config, simpleConsumer, new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 
0), 0);
+        KafkaUtils
+            .fetchMessages(config, simpleConsumer, new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 
0), 0);
     }
 
     @Test(expected = FailedFetchException.class)
@@ -82,7 +75,9 @@ public class KafkaUtilsTest {
         broker.shutdown();
         SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", port, 
100, 1024, "testClient");
         try {
-            KafkaUtils.fetchMessages(config, simpleConsumer, new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 
0), OffsetRequest.LatestTime());
+            KafkaUtils
+                .fetchMessages(config, simpleConsumer, new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0),
+                               OffsetRequest.LatestTime());
         } finally {
             simpleConsumer.close();
         }
@@ -94,7 +89,9 @@ public class KafkaUtilsTest {
         createTopicAndSendMessage(value);
         long offset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, 
OffsetRequest.LatestTime()) - 1;
         ByteBufferMessageSet messageAndOffsets = 
KafkaUtils.fetchMessages(config, simpleConsumer,
-                new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 
0), offset);
+                                                                          new 
Partition(
+                                                                              
Broker.fromString(broker.getBrokerConnectionString()),
+                                                                              
TEST_TOPIC, 0), offset);
         String message = new 
String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
         assertThat(message, is(equalTo(value)));
     }
@@ -103,7 +100,7 @@ public class KafkaUtilsTest {
     public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() 
throws Exception {
         config.useStartOffsetTimeIfOffsetOutOfRange = false;
         KafkaUtils.fetchMessages(config, simpleConsumer,
-                new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 
0), -99);
+                                 new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 
0), -99);
     }
 
     @Test(expected = TopicOffsetOutOfRangeException.class)
@@ -112,7 +109,7 @@ public class KafkaUtilsTest {
         String value = "test";
         createTopicAndSendMessage(value);
         KafkaUtils.fetchMessages(config, simpleConsumer,
-                new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), "newTopic", 
0), -99);
+                                 new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), "newTopic", 
0), -99);
     }
 
     @Test
@@ -187,21 +184,21 @@ public class KafkaUtilsTest {
             assertEquals(value, lists.iterator().next().get(0));
         }
     }
-    
+
     @Test
     public void generateTuplesWithMessageAndMetadataScheme() {
         String value = "value";
         Partition mockPartition = Mockito.mock(Partition.class);
         mockPartition.partition = 0;
         long offset = 0L;
-        
+
         MessageMetadataSchemeAsMultiScheme scheme = new 
MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme());
-        
+
         createTopicAndSendMessage(null, value);
         ByteBufferMessageSet messageAndOffsets = getLastMessage();
         for (MessageAndOffset msg : messageAndOffsets) {
             Iterable<List<Object>> lists = KafkaUtils.generateTuples(scheme, 
msg.message(), mockPartition, offset);
-            List<Object> values = lists.iterator().next(); 
+            List<Object> values = lists.iterator().next();
             assertEquals("Message is incorrect", value, values.get(0));
             assertEquals("Partition is incorrect", mockPartition.partition, 
values.get(1));
             assertEquals("Offset is incorrect", offset, values.get(2));
@@ -210,12 +207,14 @@ public class KafkaUtilsTest {
 
     private ByteBufferMessageSet getLastMessage() {
         long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, 
config.topic, 0, OffsetRequest.LatestTime()) - 1;
-        return KafkaUtils.fetchMessages(config, simpleConsumer, new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 
0), offsetOfLastMessage);
+        return KafkaUtils
+            .fetchMessages(config, simpleConsumer, new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0),
+                           offsetOfLastMessage);
     }
 
     private void runGetValueOnlyTuplesTest() {
         String value = "value";
-        
+
         createTopicAndSendMessage(null, value);
         ByteBufferMessageSet messageAndOffsets = getLastMessage();
         for (MessageAndOffset msg : messageAndOffsets) {
@@ -264,13 +263,13 @@ public class KafkaUtilsTest {
     public void assignAllPartitionsToOneTask() {
         runPartitionToTaskMappingTest(32, 32);
     }
-    
+
     public void runPartitionToTaskMappingTest(int numPartitions, int 
partitionsPerTask) {
         GlobalPartitionInformation globalPartitionInformation = 
TestUtils.buildPartitionInfo(numPartitions);
         List<GlobalPartitionInformation> partitions = new 
ArrayList<GlobalPartitionInformation>();
         partitions.add(globalPartitionInformation);
         int numTasks = numPartitions / partitionsPerTask;
-        for (int i = 0 ; i < numTasks ; i++) {
+        for (int i = 0; i < numTasks; i++) {
             assertEquals(partitionsPerTask, 
KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i, i).size());
         }
     }
@@ -285,7 +284,7 @@ public class KafkaUtilsTest {
         assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, 
numTasks, 1, 1).size());
     }
 
-    @Test (expected = IllegalArgumentException.class )
+    @Test(expected = IllegalArgumentException.class)
     public void assignInvalidTask() {
         GlobalPartitionInformation globalPartitionInformation = new 
GlobalPartitionInformation(TEST_TOPIC);
         List<GlobalPartitionInformation> partitions = new 
ArrayList<GlobalPartitionInformation>();

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
 
b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
index 805913d..a2824fb 100644
--- 
a/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
+++ 
b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java
@@ -1,22 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.storm.Config;
@@ -30,13 +31,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
 public class PartitionManagerTest {
 
     private static final String TOPIC_NAME = "testTopic";
@@ -128,7 +122,7 @@ public class PartitionManagerTest {
 
         PartitionManager partitionManager = partitionManagers.get(0);
 
-        for (int i=0; i < 5; i++) {
+        for (int i = 0; i < 5; i++) {
             sendMessage("message-" + i);
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java
 
b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java
index 7e5ff00..9c749fe 100644
--- 
a/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java
+++ 
b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java
@@ -1,29 +1,23 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
-import org.apache.storm.tuple.Fields;
 import com.google.common.collect.ImmutableMap;
-import org.junit.Test;
-
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.Collections;
+import org.apache.storm.tuple.Fields;
+import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -32,6 +26,10 @@ public class StringKeyValueSchemeTest {
 
     private StringKeyValueScheme scheme = new StringKeyValueScheme();
 
+    private static ByteBuffer wrapString(String s) {
+        return ByteBuffer.wrap(s.getBytes(Charset.defaultCharset()));
+    }
+
     @Test
     public void testDeserialize() throws Exception {
         assertEquals(Collections.singletonList("test"), 
scheme.deserialize(wrapString("test")));
@@ -47,16 +45,12 @@ public class StringKeyValueSchemeTest {
     @Test
     public void testDeserializeWithNullKeyAndValue() throws Exception {
         assertEquals(Collections.singletonList("test"),
-            scheme.deserializeKeyAndValue(null, wrapString("test")));
+                     scheme.deserializeKeyAndValue(null, wrapString("test")));
     }
 
     @Test
     public void testDeserializeWithKeyAndValue() throws Exception {
         assertEquals(Collections.singletonList(ImmutableMap.of("key", "test")),
-                scheme.deserializeKeyAndValue(wrapString("key"), 
wrapString("test")));
-    }
-
-    private static ByteBuffer wrapString(String s) {
-        return ByteBuffer.wrap(s.getBytes(Charset.defaultCharset()));
+                     scheme.deserializeKeyAndValue(wrapString("key"), 
wrapString("test")));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java 
b/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java
index 23944ab..ac94e9d 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java
@@ -15,26 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.kafka;
 
-import org.junit.Test;
+package org.apache.storm.kafka;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
 public class TestStringScheme {
-  @Test
-  public void testDeserializeString() {
-    String s = "foo";
-    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
-    ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length);
-    direct.put(bytes);
-    direct.flip();
-    String s1 = StringScheme.deserializeString(ByteBuffer.wrap(bytes));
-    String s2 = StringScheme.deserializeString(direct);
-    assertEquals(s, s1);
-    assertEquals(s, s2);
-  }
+    @Test
+    public void testDeserializeString() {
+        String s = "foo";
+        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
+        ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length);
+        direct.put(bytes);
+        direct.flip();
+        String s1 = StringScheme.deserializeString(ByteBuffer.wrap(bytes));
+        String s2 = StringScheme.deserializeString(direct);
+        assertEquals(s, s1);
+        assertEquals(s, s2);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java 
b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
index c7ba674..921df3c 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
@@ -1,32 +1,28 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
-import org.apache.storm.utils.Utils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
 import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.Message;
 import kafka.message.MessageAndOffset;
 import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-
-import java.nio.ByteBuffer;
-import java.util.*;
+import org.apache.storm.utils.Utils;
 
 import static org.junit.Assert.assertEquals;
 
@@ -83,7 +79,9 @@ public class TestUtils {
     public static boolean verifyMessage(String key, String message, 
KafkaTestBroker broker, SimpleConsumer simpleConsumer) {
         long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, 
TestUtils.TOPIC, 0, OffsetRequest.LatestTime()) - 1;
         ByteBufferMessageSet messageAndOffsets = 
KafkaUtils.fetchMessages(TestUtils.getKafkaConfig(broker), simpleConsumer,
-                new 
Partition(Broker.fromString(broker.getBrokerConnectionString()),TestUtils.TOPIC,
 0), lastMessageOffset);
+                                                                          new 
Partition(
+                                                                              
Broker.fromString(broker.getBrokerConnectionString()),
+                                                                              
TestUtils.TOPIC, 0), lastMessageOffset);
         MessageAndOffset messageAndOffset = 
messageAndOffsets.iterator().next();
         Message kafkaMessage = messageAndOffset.message();
         ByteBuffer messageKeyBuffer = kafkaMessage.key();

Reply via email to