http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java index 864eaa9..926b5fe 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java @@ -1,27 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; -import org.apache.storm.tuple.Fields; +import java.util.ArrayList; +import java.util.List; import kafka.javaapi.consumer.SimpleConsumer; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; import org.apache.storm.kafka.trident.TridentKafkaState; import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper; import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; @@ -29,9 +22,10 @@ import org.apache.storm.kafka.trident.selector.DefaultTopicSelector; import org.apache.storm.kafka.trident.selector.KafkaTopicSelector; import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.trident.tuple.TridentTupleView; - -import java.util.ArrayList; -import java.util.List; +import org.apache.storm.tuple.Fields; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; public class TridentKafkaTest { private KafkaTestBroker broker; @@ -45,8 +39,8 @@ public class TridentKafkaTest { TridentTupleToKafkaMapper<Object, Object> mapper = new FieldNameBasedTupleToKafkaMapper<Object, Object>("key", "message"); KafkaTopicSelector topicSelector = new DefaultTopicSelector(TestUtils.TOPIC); state = new TridentKafkaState() - .withKafkaTopicSelector(topicSelector) - .withTridentTupleToKafkaMapper(mapper); + .withKafkaTopicSelector(topicSelector) + .withTridentTupleToKafkaMapper(mapper); state.prepare(TestUtils.getProducerProperties(broker.getBrokerConnectionString())); } @@ -60,14 +54,14 @@ public class TridentKafkaTest { state.updateState(tridentTuples, null); - for(int i = 0 ; i < batchSize ; i++) { + for (int i = 0; i < batchSize; i++) { TestUtils.verifyMessage(keyString, valString, broker, simpleConsumer); } } private List<TridentTuple> generateTupleBatch(String key, String message, int batchsize) { List<TridentTuple> batch = new ArrayList<>(); - for(int i =0 ; i < batchsize; i++) { + for (int i = 0; i < batchsize; i++) { batch.add(TridentTupleView.createFreshTuple(new Fields("key", "message"), key, message)); } return batch;
http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java index 31dfffe..93709cf 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java @@ -1,34 +1,38 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka; -import org.apache.storm.Config; -import org.apache.curator.test.TestingServer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import kafka.javaapi.consumer.SimpleConsumer; +import org.apache.curator.test.TestingServer; +import org.apache.storm.Config; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import java.util.*; - -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.when; @@ -60,7 +64,7 @@ public class ZkCoordinatorTest { Map<String, Object> conf = buildZookeeperConfig(server); state = new ZkState(conf); simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient"); - when(dynamicPartitionConnections.register(any(Broker.class), any(String.class) ,anyInt())).thenReturn(simpleConsumer); + when(dynamicPartitionConnections.register(any(Broker.class), any(String.class), anyInt())).thenReturn(simpleConsumer); } private Map<String, Object> buildZookeeperConfig(TestingServer server) { @@ -128,7 +132,8 @@ public class ZkCoordinatorTest { HashMap<Integer, PartitionManager> managersAfterRefresh = new HashMap<Integer, PartitionManager>(); for (List<PartitionManager> partitionManagersAfter : partitionManagersAfterRefresh) { for (PartitionManager manager : partitionManagersAfter) { - assertFalse("Multiple PartitionManagers for same partition", managersAfterRefresh.containsKey(manager.getPartition().partition)); + assertFalse("Multiple PartitionManagers for same partition", + managersAfterRefresh.containsKey(manager.getPartition().partition)); managersAfterRefresh.put(manager.getPartition().partition, manager); } } @@ -150,7 +155,8 @@ public class ZkCoordinatorTest { assertSame(managerBefore._committedTo, managerAfter._committedTo); } - private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int partitionsPerTask) { + private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, + int partitionsPerTask) { assertEquals(partitionsPerTask, partitionManagersBefore.size()); assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size()); for (int i = 0; i < partitionsPerTask; i++) { @@ -174,7 +180,8 @@ public class ZkCoordinatorTest { private List<ZkCoordinator> buildCoordinators(int totalTasks) { List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>(); for (int i = 0; i < totalTasks; i++) { - ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, topoConf, spoutConfig, state, i, totalTasks, i, "test-id", reader); + ZkCoordinator coordinator = + new ZkCoordinator(dynamicPartitionConnections, topoConf, spoutConfig, state, i, totalTasks, i, "test-id", reader); coordinatorList.add(coordinator); } return coordinatorList; http://git-wip-us.apache.org/repos/asf/storm/blob/4fe4f04b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java index ed26157..5c3053c 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java @@ -1,37 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.kafka.bolt; -import org.apache.storm.Config; -import org.apache.storm.Constants; -import org.apache.storm.task.GeneralTopologyContext; -import org.apache.storm.task.IOutputCollector; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.TupleImpl; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; -import org.apache.storm.utils.TupleUtils; import com.google.common.collect.ImmutableList; -import kafka.api.OffsetRequest; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Future; import kafka.api.FetchRequest; +import kafka.api.OffsetRequest; import kafka.javaapi.FetchResponse; import kafka.javaapi.OffsetResponse; import kafka.javaapi.consumer.SimpleConsumer; @@ -41,25 +30,44 @@ import kafka.message.MessageAndOffset; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.junit.*; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.kafka.Broker; +import org.apache.storm.kafka.BrokerHosts; +import org.apache.storm.kafka.KafkaConfig; +import org.apache.storm.kafka.KafkaTestBroker; +import org.apache.storm.kafka.KafkaUtils; +import org.apache.storm.kafka.Partition; +import org.apache.storm.kafka.StaticHosts; +import org.apache.storm.kafka.trident.GlobalPartitionInformation; +import org.apache.storm.task.GeneralTopologyContext; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.TupleUtils; +import org.apache.storm.utils.Utils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.apache.storm.kafka.*; -import org.apache.storm.kafka.trident.GlobalPartitionInformation; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.HashMap; -import java.util.Properties; -import java.util.concurrent.Future; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.*; - -import java.lang.reflect.Field; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class KafkaBoltTest { @@ -73,6 +81,29 @@ public class KafkaBoltTest { @Mock private IOutputCollector collector; + private static ByteBufferMessageSet mockSingleMessage(byte[] key, byte[] message) { + ByteBufferMessageSet sets = mock(ByteBufferMessageSet.class); + MessageAndOffset msg = mock(MessageAndOffset.class); + final List<MessageAndOffset> msgs = ImmutableList.of(msg); + doReturn(msgs.iterator()).when(sets).iterator(); + Message kafkaMessage = mock(Message.class); + doReturn(ByteBuffer.wrap(key)).when(kafkaMessage).key(); + doReturn(ByteBuffer.wrap(message)).when(kafkaMessage).payload(); + doReturn(kafkaMessage).when(msg).message(); + return sets; + } + + private static SimpleConsumer mockSimpleConsumer(ByteBufferMessageSet mockMsg) { + SimpleConsumer simpleConsumer = mock(SimpleConsumer.class); + FetchResponse resp = mock(FetchResponse.class); + doReturn(resp).when(simpleConsumer).fetch(any(FetchRequest.class)); + OffsetResponse mockOffsetResponse = mock(OffsetResponse.class); + doReturn(new long[]{}).when(mockOffsetResponse).offsets(anyString(), anyInt()); + doReturn(mockOffsetResponse).when(simpleConsumer).getOffsetsBefore(any(kafka.javaapi.OffsetRequest.class)); + doReturn(mockMsg).when(resp).messageSet(anyString(), anyInt()); + return simpleConsumer; + } + @Before public void initMocks() { MockitoAnnotations.initMocks(this); @@ -261,7 +292,6 @@ public class KafkaBoltTest { verifyMessage(null, message); } - @Test public void executeWithBrokerDown() throws Exception { broker.shutdown(); @@ -274,7 +304,9 @@ public class KafkaBoltTest { private boolean verifyMessage(String key, String message) { long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, kafkaConfig.topic, 0, OffsetRequest.LatestTime()) - 1; ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(kafkaConfig, simpleConsumer, - new Partition(Broker.fromString(broker.getBrokerConnectionString()),kafkaConfig.topic, 0), lastMessageOffset); + new Partition( + Broker.fromString(broker.getBrokerConnectionString()), + kafkaConfig.topic, 0), lastMessageOffset); MessageAndOffset messageAndOffset = messageAndOffsets.iterator().next(); Message kafkaMessage = messageAndOffset.message(); ByteBuffer messageKeyBuffer = kafkaMessage.key(); @@ -290,23 +322,25 @@ public class KafkaBoltTest { private Tuple generateTestTuple(Object key, Object message) { TopologyBuilder builder = new TopologyBuilder(); - GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), "") { - @Override - public Fields getComponentOutputFields(String componentId, String streamId) { - return new Fields("key", "message"); - } - }; + GeneralTopologyContext topologyContext = + new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), "") { + @Override + public Fields getComponentOutputFields(String componentId, String streamId) { + return new Fields("key", "message"); + } + }; return new TupleImpl(topologyContext, new Values(key, message), topologyContext.getComponentId(1), 1, ""); } private Tuple generateTestTuple(Object message) { TopologyBuilder builder = new TopologyBuilder(); - GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), "") { - @Override - public Fields getComponentOutputFields(String componentId, String streamId) { - return new Fields("message"); - } - }; + GeneralTopologyContext topologyContext = + new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap<>(), new HashMap<>(), new HashMap<>(), "") { + @Override + public Fields getComponentOutputFields(String componentId, String streamId) { + return new Fields("message"); + } + }; return new TupleImpl(topologyContext, new Values(message), topologyContext.getComponentId(1), 1, ""); } @@ -318,27 +352,4 @@ public class KafkaBoltTest { assertTrue(TupleUtils.isTick(tuple)); return tuple; } - - private static ByteBufferMessageSet mockSingleMessage(byte[] key, byte[] message) { - ByteBufferMessageSet sets = mock(ByteBufferMessageSet.class); - MessageAndOffset msg = mock(MessageAndOffset.class); - final List<MessageAndOffset> msgs = ImmutableList.of(msg); - doReturn(msgs.iterator()).when(sets).iterator(); - Message kafkaMessage = mock(Message.class); - doReturn(ByteBuffer.wrap(key)).when(kafkaMessage).key(); - doReturn(ByteBuffer.wrap(message)).when(kafkaMessage).payload(); - doReturn(kafkaMessage).when(msg).message(); - return sets; - } - - private static SimpleConsumer mockSimpleConsumer(ByteBufferMessageSet mockMsg) { - SimpleConsumer simpleConsumer = mock(SimpleConsumer.class); - FetchResponse resp = mock(FetchResponse.class); - doReturn(resp).when(simpleConsumer).fetch(any(FetchRequest.class)); - OffsetResponse mockOffsetResponse = mock(OffsetResponse.class); - doReturn(new long[] {}).when(mockOffsetResponse).offsets(anyString(), anyInt()); - doReturn(mockOffsetResponse).when(simpleConsumer).getOffsetsBefore(any(kafka.javaapi.OffsetRequest.class)); - doReturn(mockMsg).when(resp).messageSet(anyString(), anyInt()); - return simpleConsumer; - } }
