http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
index 864eaa9..926b5fe 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
@@ -1,27 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
-import org.apache.storm.tuple.Fields;
+import java.util.ArrayList;
+import java.util.List;
 import kafka.javaapi.consumer.SimpleConsumer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
 import org.apache.storm.kafka.trident.TridentKafkaState;
 import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
 import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
@@ -29,9 +22,10 @@ import 
org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
 import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.tuple.TridentTupleView;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.storm.tuple.Fields;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 public class TridentKafkaTest {
     private KafkaTestBroker broker;
@@ -45,8 +39,8 @@ public class TridentKafkaTest {
         TridentTupleToKafkaMapper<Object, Object> mapper = new 
FieldNameBasedTupleToKafkaMapper<Object, Object>("key", "message");
         KafkaTopicSelector topicSelector = new 
DefaultTopicSelector(TestUtils.TOPIC);
         state = new TridentKafkaState()
-                .withKafkaTopicSelector(topicSelector)
-                .withTridentTupleToKafkaMapper(mapper);
+            .withKafkaTopicSelector(topicSelector)
+            .withTridentTupleToKafkaMapper(mapper);
         
state.prepare(TestUtils.getProducerProperties(broker.getBrokerConnectionString()));
     }
 
@@ -60,14 +54,14 @@ public class TridentKafkaTest {
 
         state.updateState(tridentTuples, null);
 
-        for(int i = 0 ; i < batchSize ; i++) {
+        for (int i = 0; i < batchSize; i++) {
             TestUtils.verifyMessage(keyString, valString, broker, 
simpleConsumer);
         }
     }
 
     private List<TridentTuple> generateTupleBatch(String key, String message, 
int batchsize) {
         List<TridentTuple> batch = new ArrayList<>();
-        for(int i =0 ; i < batchsize; i++) {
+        for (int i = 0; i < batchsize; i++) {
             batch.add(TridentTupleView.createFreshTuple(new Fields("key", 
"message"), key, message));
         }
         return batch;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index 31dfffe..93709cf 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -1,34 +1,38 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka;
 
-import org.apache.storm.Config;
-import org.apache.curator.test.TestingServer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.curator.test.TestingServer;
+import org.apache.storm.Config;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
-import java.util.*;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.when;
@@ -60,7 +64,7 @@ public class ZkCoordinatorTest {
         Map<String, Object> conf = buildZookeeperConfig(server);
         state = new ZkState(conf);
         simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 
60000, 1024, "testClient");
-        when(dynamicPartitionConnections.register(any(Broker.class), 
any(String.class) ,anyInt())).thenReturn(simpleConsumer);
+        when(dynamicPartitionConnections.register(any(Broker.class), 
any(String.class), anyInt())).thenReturn(simpleConsumer);
     }
 
     private Map<String, Object> buildZookeeperConfig(TestingServer server) {
@@ -128,7 +132,8 @@ public class ZkCoordinatorTest {
         HashMap<Integer, PartitionManager> managersAfterRefresh = new 
HashMap<Integer, PartitionManager>();
         for (List<PartitionManager> partitionManagersAfter : 
partitionManagersAfterRefresh) {
             for (PartitionManager manager : partitionManagersAfter) {
-                assertFalse("Multiple PartitionManagers for same partition", 
managersAfterRefresh.containsKey(manager.getPartition().partition));
+                assertFalse("Multiple PartitionManagers for same partition",
+                            
managersAfterRefresh.containsKey(manager.getPartition().partition));
                 managersAfterRefresh.put(manager.getPartition().partition, 
manager);
             }
         }
@@ -150,7 +155,8 @@ public class ZkCoordinatorTest {
         assertSame(managerBefore._committedTo, managerAfter._committedTo);
     }
 
-    private void assertPartitionsAreDifferent(List<PartitionManager> 
partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int 
partitionsPerTask) {
+    private void assertPartitionsAreDifferent(List<PartitionManager> 
partitionManagersBefore, List<PartitionManager> partitionManagersAfter,
+                                              int partitionsPerTask) {
         assertEquals(partitionsPerTask, partitionManagersBefore.size());
         assertEquals(partitionManagersBefore.size(), 
partitionManagersAfter.size());
         for (int i = 0; i < partitionsPerTask; i++) {
@@ -174,7 +180,8 @@ public class ZkCoordinatorTest {
     private List<ZkCoordinator> buildCoordinators(int totalTasks) {
         List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>();
         for (int i = 0; i < totalTasks; i++) {
-            ZkCoordinator coordinator = new 
ZkCoordinator(dynamicPartitionConnections, topoConf, spoutConfig, state, i, 
totalTasks, i, "test-id", reader);
+            ZkCoordinator coordinator =
+                new ZkCoordinator(dynamicPartitionConnections, topoConf, 
spoutConfig, state, i, totalTasks, i, "test-id", reader);
             coordinatorList.add(coordinator);
         }
         return coordinatorList;

http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
index ed26157..5c3053c 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -1,37 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.kafka.bolt;
 
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
-import org.apache.storm.task.GeneralTopologyContext;
-import org.apache.storm.task.IOutputCollector;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.TupleImpl;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.TupleUtils;
 import com.google.common.collect.ImmutableList;
-import kafka.api.OffsetRequest;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Future;
 import kafka.api.FetchRequest;
+import kafka.api.OffsetRequest;
 import kafka.javaapi.FetchResponse;
 import kafka.javaapi.OffsetResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
@@ -41,25 +30,44 @@ import kafka.message.MessageAndOffset;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.*;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.kafka.Broker;
+import org.apache.storm.kafka.BrokerHosts;
+import org.apache.storm.kafka.KafkaConfig;
+import org.apache.storm.kafka.KafkaTestBroker;
+import org.apache.storm.kafka.KafkaUtils;
+import org.apache.storm.kafka.Partition;
+import org.apache.storm.kafka.StaticHosts;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.utils.Utils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.apache.storm.kafka.*;
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Properties;
-import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.*;
-
-import java.lang.reflect.Field;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class KafkaBoltTest {
 
@@ -73,6 +81,29 @@ public class KafkaBoltTest {
     @Mock
     private IOutputCollector collector;
 
+    private static ByteBufferMessageSet mockSingleMessage(byte[] key, byte[] 
message) {
+        ByteBufferMessageSet sets = mock(ByteBufferMessageSet.class);
+        MessageAndOffset msg = mock(MessageAndOffset.class);
+        final List<MessageAndOffset> msgs = ImmutableList.of(msg);
+        doReturn(msgs.iterator()).when(sets).iterator();
+        Message kafkaMessage = mock(Message.class);
+        doReturn(ByteBuffer.wrap(key)).when(kafkaMessage).key();
+        doReturn(ByteBuffer.wrap(message)).when(kafkaMessage).payload();
+        doReturn(kafkaMessage).when(msg).message();
+        return sets;
+    }
+
+    private static SimpleConsumer mockSimpleConsumer(ByteBufferMessageSet 
mockMsg) {
+        SimpleConsumer simpleConsumer = mock(SimpleConsumer.class);
+        FetchResponse resp = mock(FetchResponse.class);
+        doReturn(resp).when(simpleConsumer).fetch(any(FetchRequest.class));
+        OffsetResponse mockOffsetResponse = mock(OffsetResponse.class);
+        doReturn(new long[]{}).when(mockOffsetResponse).offsets(anyString(), 
anyInt());
+        
doReturn(mockOffsetResponse).when(simpleConsumer).getOffsetsBefore(any(kafka.javaapi.OffsetRequest.class));
+        doReturn(mockMsg).when(resp).messageSet(anyString(), anyInt());
+        return simpleConsumer;
+    }
+
     @Before
     public void initMocks() {
         MockitoAnnotations.initMocks(this);
@@ -261,7 +292,6 @@ public class KafkaBoltTest {
         verifyMessage(null, message);
     }
 
-
     @Test
     public void executeWithBrokerDown() throws Exception {
         broker.shutdown();
@@ -274,7 +304,9 @@ public class KafkaBoltTest {
     private boolean verifyMessage(String key, String message) {
         long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, 
kafkaConfig.topic, 0, OffsetRequest.LatestTime()) - 1;
         ByteBufferMessageSet messageAndOffsets = 
KafkaUtils.fetchMessages(kafkaConfig, simpleConsumer,
-                new 
Partition(Broker.fromString(broker.getBrokerConnectionString()),kafkaConfig.topic,
 0), lastMessageOffset);
+                                                                          new 
Partition(
+                                                                              
Broker.fromString(broker.getBrokerConnectionString()),
+                                                                              
kafkaConfig.topic, 0), lastMessageOffset);
         MessageAndOffset messageAndOffset = 
messageAndOffsets.iterator().next();
         Message kafkaMessage = messageAndOffset.message();
         ByteBuffer messageKeyBuffer = kafkaMessage.key();
@@ -290,23 +322,25 @@ public class KafkaBoltTest {
 
     private Tuple generateTestTuple(Object key, Object message) {
         TopologyBuilder builder = new TopologyBuilder();
-        GeneralTopologyContext topologyContext = new 
GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap<>(), 
new HashMap<>(), new HashMap<>(), "") {
-            @Override
-            public Fields getComponentOutputFields(String componentId, String 
streamId) {
-                return new Fields("key", "message");
-            }
-        };
+        GeneralTopologyContext topologyContext =
+            new GeneralTopologyContext(builder.createTopology(), new Config(), 
new HashMap<>(), new HashMap<>(), new HashMap<>(), "") {
+                @Override
+                public Fields getComponentOutputFields(String componentId, 
String streamId) {
+                    return new Fields("key", "message");
+                }
+            };
         return new TupleImpl(topologyContext, new Values(key, message), 
topologyContext.getComponentId(1), 1, "");
     }
 
     private Tuple generateTestTuple(Object message) {
         TopologyBuilder builder = new TopologyBuilder();
-        GeneralTopologyContext topologyContext = new 
GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap<>(), 
new HashMap<>(), new HashMap<>(), "") {
-            @Override
-            public Fields getComponentOutputFields(String componentId, String 
streamId) {
-                return new Fields("message");
-            }
-        };
+        GeneralTopologyContext topologyContext =
+            new GeneralTopologyContext(builder.createTopology(), new Config(), 
new HashMap<>(), new HashMap<>(), new HashMap<>(), "") {
+                @Override
+                public Fields getComponentOutputFields(String componentId, 
String streamId) {
+                    return new Fields("message");
+                }
+            };
         return new TupleImpl(topologyContext, new Values(message), 
topologyContext.getComponentId(1), 1, "");
     }
 
@@ -318,27 +352,4 @@ public class KafkaBoltTest {
         assertTrue(TupleUtils.isTick(tuple));
         return tuple;
     }
-
-    private static ByteBufferMessageSet mockSingleMessage(byte[] key, byte[] 
message) {
-        ByteBufferMessageSet sets = mock(ByteBufferMessageSet.class);
-        MessageAndOffset msg = mock(MessageAndOffset.class);
-        final List<MessageAndOffset> msgs = ImmutableList.of(msg);
-        doReturn(msgs.iterator()).when(sets).iterator();
-        Message kafkaMessage = mock(Message.class);
-        doReturn(ByteBuffer.wrap(key)).when(kafkaMessage).key();
-        doReturn(ByteBuffer.wrap(message)).when(kafkaMessage).payload();
-        doReturn(kafkaMessage).when(msg).message();
-        return sets;
-    }
-
-    private static SimpleConsumer mockSimpleConsumer(ByteBufferMessageSet 
mockMsg) {
-        SimpleConsumer simpleConsumer = mock(SimpleConsumer.class);
-        FetchResponse resp = mock(FetchResponse.class);
-        doReturn(resp).when(simpleConsumer).fetch(any(FetchRequest.class));
-        OffsetResponse mockOffsetResponse = mock(OffsetResponse.class);
-        doReturn(new long[] {}).when(mockOffsetResponse).offsets(anyString(), 
anyInt());
-        
doReturn(mockOffsetResponse).when(simpleConsumer).getOffsetsBefore(any(kafka.javaapi.OffsetRequest.class));
-        doReturn(mockMsg).when(resp).messageSet(anyString(), anyInt());
-        return simpleConsumer;
-    }
 }

Reply via email to