http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java deleted file mode 100644 index eb694bb..0000000 --- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java +++ /dev/null @@ -1,295 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import kafka.api.OffsetRequest; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.MessageAndOffset; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import storm.kafka.trident.GlobalPartitionInformation; -import backtype.storm.spout.SchemeAsMultiScheme; -import backtype.storm.utils.Utils; - -import com.google.common.collect.ImmutableMap; -public class KafkaUtilsTest { - private String TEST_TOPIC = "testTopic"; - private static final Logger LOG = LoggerFactory.getLogger(KafkaUtilsTest.class); - private KafkaTestBroker broker; - private SimpleConsumer simpleConsumer; - private KafkaConfig config; - private BrokerHosts brokerHosts; - - @Before - public void setup() { - broker = new KafkaTestBroker(); - GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC); - globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString())); - brokerHosts = new StaticHosts(globalPartitionInformation); - config = new KafkaConfig(brokerHosts, TEST_TOPIC); - simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient"); - } - - @After - public void shutdown() { - simpleConsumer.close(); - broker.shutdown(); - } - - - @Test(expected = FailedFetchException.class) - public void topicDoesNotExist() throws Exception { - KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), 0); - } - - @Test(expected = FailedFetchException.class) - public void brokerIsDown() throws Exception { - int port = broker.getPort(); - broker.shutdown(); - SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", port, 100, 1024, "testClient"); - try { - KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), OffsetRequest.LatestTime()); - } finally { - simpleConsumer.close(); - } - } - - @Test - public void fetchMessage() throws Exception { - String value = "test"; - createTopicAndSendMessage(value); - long offset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1; - ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer, - new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), offset); - String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload())); - assertThat(message, is(equalTo(value))); - } - - @Test(expected = FailedFetchException.class) - public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() throws Exception { - config.useStartOffsetTimeIfOffsetOutOfRange = false; - KafkaUtils.fetchMessages(config, simpleConsumer, - new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), -99); - } - - @Test(expected = TopicOffsetOutOfRangeException.class) - public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception { - config = new KafkaConfig(brokerHosts, "newTopic"); - String value = "test"; - createTopicAndSendMessage(value); - KafkaUtils.fetchMessages(config, simpleConsumer, - new Partition(Broker.fromString(broker.getBrokerConnectionString()), "newTopic", 0), -99); - } - - @Test - public void getOffsetFromConfigAndDontForceFromStart() { - config.ignoreZkOffsets = false; - config.startOffsetTime = OffsetRequest.EarliestTime(); - createTopicAndSendMessage(); - long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime()); - long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config); - assertThat(latestOffset, is(equalTo(offsetFromConfig))); - } - - @Test - public void getOffsetFromConfigAndFroceFromStart() { - config.ignoreZkOffsets = true; - config.startOffsetTime = OffsetRequest.EarliestTime(); - createTopicAndSendMessage(); - long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime()); - long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config); - assertThat(earliestOffset, is(equalTo(offsetFromConfig))); - } - - @Test - public void generateTuplesWithoutKeyAndKeyValueScheme() { - config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme()); - runGetValueOnlyTuplesTest(); - } - - @Test - public void generateTuplesWithKeyAndKeyValueScheme() { - config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme()); - config.useStartOffsetTimeIfOffsetOutOfRange = false; - String value = "value"; - String key = "key"; - createTopicAndSendMessage(key, value); - ByteBufferMessageSet messageAndOffsets = getLastMessage(); - for (MessageAndOffset msg : messageAndOffsets) { - Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic); - assertEquals(ImmutableMap.of(key, value), lists.iterator().next().get(0)); - } - } - - @Test - public void generateTupelsWithValueScheme() { - config.scheme = new SchemeAsMultiScheme(new StringScheme()); - runGetValueOnlyTuplesTest(); - } - - @Test - public void generateTuplesWithValueAndStringMultiSchemeWithTopic() { - config.scheme = new StringMultiSchemeWithTopic(); - String value = "value"; - createTopicAndSendMessage(value); - ByteBufferMessageSet messageAndOffsets = getLastMessage(); - for (MessageAndOffset msg : messageAndOffsets) { - Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic); - List<Object> list = lists.iterator().next(); - assertEquals(value, list.get(0)); - assertEquals(config.topic, list.get(1)); - } - } - - @Test - public void generateTuplesWithValueSchemeAndKeyValueMessage() { - config.scheme = new SchemeAsMultiScheme(new StringScheme()); - String value = "value"; - String key = "key"; - createTopicAndSendMessage(key, value); - ByteBufferMessageSet messageAndOffsets = getLastMessage(); - for (MessageAndOffset msg : messageAndOffsets) { - Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic); - assertEquals(value, lists.iterator().next().get(0)); - } - } - - @Test - public void generateTuplesWithMessageAndMetadataScheme() { - String value = "value"; - Partition mockPartition = Mockito.mock(Partition.class); - mockPartition.partition = 0; - long offset = 0L; - - MessageMetadataSchemeAsMultiScheme scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme()); - - createTopicAndSendMessage(null, value); - ByteBufferMessageSet messageAndOffsets = getLastMessage(); - for (MessageAndOffset msg : messageAndOffsets) { - Iterable<List<Object>> lists = KafkaUtils.generateTuples(scheme, msg.message(), mockPartition, offset); - List<Object> values = lists.iterator().next(); - assertEquals("Message is incorrect", value, values.get(0)); - assertEquals("Partition is incorrect", mockPartition.partition, values.get(1)); - assertEquals("Offset is incorrect", offset, values.get(2)); - } - } - - private ByteBufferMessageSet getLastMessage() { - long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1; - return KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), offsetOfLastMessage); - } - - private void runGetValueOnlyTuplesTest() { - String value = "value"; - - createTopicAndSendMessage(null, value); - ByteBufferMessageSet messageAndOffsets = getLastMessage(); - for (MessageAndOffset msg : messageAndOffsets) { - Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic); - assertEquals(value, lists.iterator().next().get(0)); - } - } - - private void createTopicAndSendMessage() { - createTopicAndSendMessage(null, "someValue"); - } - - private void createTopicAndSendMessage(String value) { - createTopicAndSendMessage(null, value); - } - - private void createTopicAndSendMessage(String key, String value) { - Properties p = new Properties(); - p.put("acks", "1"); - p.put("bootstrap.servers", broker.getBrokerConnectionString()); - p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - p.put("metadata.fetch.timeout.ms", 1000); - KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p); - try { - producer.send(new ProducerRecord<String, String>(config.topic, key, value)).get(); - } catch (Exception e) { - Assert.fail(e.getMessage()); - LOG.error("Failed to do synchronous sending due to " + e, e); - } finally { - producer.close(); - } - } - - @Test - public void assignOnePartitionPerTask() { - runPartitionToTaskMappingTest(16, 1); - } - - @Test - public void assignTwoPartitionsPerTask() { - runPartitionToTaskMappingTest(16, 2); - } - - @Test - public void assignAllPartitionsToOneTask() { - runPartitionToTaskMappingTest(32, 32); - } - - public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTask) { - GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(numPartitions); - List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); - partitions.add(globalPartitionInformation); - int numTasks = numPartitions / partitionsPerTask; - for (int i = 0 ; i < numTasks ; i++) { - assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size()); - } - } - - @Test - public void moreTasksThanPartitions() { - GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(1); - List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); - partitions.add(globalPartitionInformation); - int numTasks = 2; - assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size()); - assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size()); - } - - @Test (expected = IllegalArgumentException.class ) - public void assignInvalidTask() { - GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC); - List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); - partitions.add(globalPartitionInformation); - KafkaUtils.calculatePartitionsForTask(partitions, 1, 1); - } -}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java b/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java deleted file mode 100644 index eddb900..0000000 --- a/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.tuple.Fields; -import com.google.common.collect.ImmutableMap; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.Collections; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class StringKeyValueSchemeTest { - - private StringKeyValueScheme scheme = new StringKeyValueScheme(); - - @Test - public void testDeserialize() throws Exception { - assertEquals(Collections.singletonList("test"), scheme.deserialize(wrapString("test"))); - } - - @Test - public void testGetOutputFields() throws Exception { - Fields outputFields = scheme.getOutputFields(); - assertTrue(outputFields.contains(StringScheme.STRING_SCHEME_KEY)); - assertEquals(1, outputFields.size()); - } - - @Test - public void testDeserializeWithNullKeyAndValue() throws Exception { - assertEquals(Collections.singletonList("test"), - scheme.deserializeKeyAndValue(null, wrapString("test"))); - } - - @Test - public void testDeserializeWithKeyAndValue() throws Exception { - assertEquals(Collections.singletonList(ImmutableMap.of("key", "test")), - scheme.deserializeKeyAndValue(wrapString("key"), wrapString("test"))); - } - - private static ByteBuffer wrapString(String s) { - return ByteBuffer.wrap(s.getBytes(Charset.defaultCharset())); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java b/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java deleted file mode 100644 index ae36409..0000000 --- a/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; - -import static org.junit.Assert.assertEquals; - -public class TestStringScheme { - @Test - public void testDeserializeString() { - String s = "foo"; - byte[] bytes = s.getBytes(StandardCharsets.UTF_8); - ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length); - direct.put(bytes); - direct.flip(); - String s1 = StringScheme.deserializeString(ByteBuffer.wrap(bytes)); - String s2 = StringScheme.deserializeString(direct); - assertEquals(s, s1); - assertEquals(s, s2); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/TestUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/TestUtils.java b/external/storm-kafka/src/test/storm/kafka/TestUtils.java deleted file mode 100644 index 3e69160..0000000 --- a/external/storm-kafka/src/test/storm/kafka/TestUtils.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.Config; -import backtype.storm.utils.Utils; -import kafka.api.OffsetRequest; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.Message; -import kafka.message.MessageAndOffset; -import storm.kafka.bolt.KafkaBolt; -import storm.kafka.trident.GlobalPartitionInformation; - -import java.nio.ByteBuffer; -import java.util.*; - -import static org.junit.Assert.assertEquals; - -public class TestUtils { - - public static final String TOPIC = "test"; - - public static GlobalPartitionInformation buildPartitionInfo(int numPartitions) { - return buildPartitionInfo(numPartitions, 9092); - } - - public static List<GlobalPartitionInformation> buildPartitionInfoList(GlobalPartitionInformation partitionInformation) { - List<GlobalPartitionInformation> map = new ArrayList<GlobalPartitionInformation>(); - map.add(partitionInformation); - return map; - } - - public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort) { - GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TOPIC); - for (int i = 0; i < numPartitions; i++) { - globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + " :" + brokerPort)); - } - return globalPartitionInformation; - } - - public static SimpleConsumer getKafkaConsumer(KafkaTestBroker broker) { - BrokerHosts brokerHosts = getBrokerHosts(broker); - KafkaConfig kafkaConfig = new KafkaConfig(brokerHosts, TOPIC); - SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient"); - return simpleConsumer; - } - - public static KafkaConfig getKafkaConfig(KafkaTestBroker broker) { - BrokerHosts brokerHosts = getBrokerHosts(broker); - KafkaConfig kafkaConfig = new KafkaConfig(brokerHosts, TOPIC); - return kafkaConfig; - } - - private static BrokerHosts getBrokerHosts(KafkaTestBroker broker) { - GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TOPIC); - globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString())); - return new StaticHosts(globalPartitionInformation); - } - - public static Properties getProducerProperties(String brokerConnectionString) { - Properties props = new Properties(); - props.put("bootstrap.servers", brokerConnectionString); - props.put("acks", "1"); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - return props; - } - - public static boolean verifyMessage(String key, String message, KafkaTestBroker broker, SimpleConsumer simpleConsumer) { - long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, TestUtils.TOPIC, 0, OffsetRequest.LatestTime()) - 1; - ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(TestUtils.getKafkaConfig(broker), simpleConsumer, - new Partition(Broker.fromString(broker.getBrokerConnectionString()),TestUtils.TOPIC, 0), lastMessageOffset); - MessageAndOffset messageAndOffset = messageAndOffsets.iterator().next(); - Message kafkaMessage = messageAndOffset.message(); - ByteBuffer messageKeyBuffer = kafkaMessage.key(); - String keyString = null; - String messageString = new String(Utils.toByteArray(kafkaMessage.payload())); - if (messageKeyBuffer != null) { - keyString = new String(Utils.toByteArray(messageKeyBuffer)); - } - assertEquals(key, keyString); - assertEquals(message, messageString); - return true; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java deleted file mode 100644 index 8213b07..0000000 --- a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.tuple.Fields; -import kafka.javaapi.consumer.SimpleConsumer; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import storm.kafka.trident.TridentKafkaState; -import storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper; -import storm.kafka.trident.mapper.TridentTupleToKafkaMapper; -import storm.kafka.trident.selector.DefaultTopicSelector; -import storm.kafka.trident.selector.KafkaTopicSelector; -import storm.trident.tuple.TridentTuple; -import storm.trident.tuple.TridentTupleView; - -import java.util.ArrayList; -import java.util.List; - -public class TridentKafkaTest { - private KafkaTestBroker broker; - private TridentKafkaState state; - private SimpleConsumer simpleConsumer; - - @Before - public void setup() { - broker = new KafkaTestBroker(); - simpleConsumer = TestUtils.getKafkaConsumer(broker); - TridentTupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper("key", "message"); - KafkaTopicSelector topicSelector = new DefaultTopicSelector(TestUtils.TOPIC); - state = new TridentKafkaState() - .withKafkaTopicSelector(topicSelector) - .withTridentTupleToKafkaMapper(mapper); - state.prepare(TestUtils.getProducerProperties(broker.getBrokerConnectionString())); - } - - @Test - public void testKeyValue() { - String keyString = "key-123"; - String valString = "message-123"; - int batchSize = 10; - - List<TridentTuple> tridentTuples = generateTupleBatch(keyString, valString, batchSize); - - state.updateState(tridentTuples, null); - - for(int i = 0 ; i < batchSize ; i++) { - TestUtils.verifyMessage(keyString, valString, broker, simpleConsumer); - } - } - - private List<TridentTuple> generateTupleBatch(String key, String message, int batchsize) { - List<TridentTuple> batch = new ArrayList<>(); - for(int i =0 ; i < batchsize; i++) { - batch.add(TridentTupleView.createFreshTuple(new Fields("key", "message"), key, message)); - } - return batch; - } - - @After - public void shutdown() { - simpleConsumer.close(); - broker.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java deleted file mode 100644 index b9e25e4..0000000 --- a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.generated.StormTopology; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import com.google.common.collect.ImmutableMap; -import storm.kafka.trident.TridentKafkaStateFactory; -import storm.kafka.trident.TridentKafkaUpdater; -import storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper; -import storm.kafka.trident.selector.DefaultTopicSelector; -import storm.trident.Stream; -import storm.trident.TridentTopology; -import storm.trident.testing.FixedBatchSpout; - -import java.util.Properties; - -public class TridentKafkaTopology { - - private static StormTopology buildTopology(String brokerConnectionString) { - Fields fields = new Fields("word", "count"); - FixedBatchSpout spout = new FixedBatchSpout(fields, 4, - new Values("storm", "1"), - new Values("trident", "1"), - new Values("needs", "1"), - new Values("javadoc", "1") - ); - spout.setCycle(true); - - TridentTopology topology = new TridentTopology(); - Stream stream = topology.newStream("spout1", spout); - - Properties props = new Properties(); - props.put("bootstrap.servers", brokerConnectionString); - props.put("acks", "1"); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - - TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory() - .withProducerProperties(props) - .withKafkaTopicSelector(new DefaultTopicSelector("test")) - .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count")); - stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields()); - - return topology.build(); - } - - /** - * To run this topology ensure you have a kafka broker running and provide connection string to broker as argument. - * Create a topic test with command line, - * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test - * - * run this program and run the kafka consumer: - * kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning - * - * you should see the messages flowing through. - * - * @param args - * @throws Exception - */ - public static void main(String[] args) throws Exception { - if(args.length < 1) { - System.out.println("Please provide kafka broker url ,e.g. localhost:9092"); - } - - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("wordCounter", new Config(), buildTopology(args[0])); - Thread.sleep(60 * 1000); - cluster.killTopology("wordCounter"); - - cluster.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/ZkCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/storm/kafka/ZkCoordinatorTest.java deleted file mode 100644 index 48ca60f..0000000 --- a/external/storm-kafka/src/test/storm/kafka/ZkCoordinatorTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.Config; -import org.apache.curator.test.TestingServer; -import kafka.javaapi.consumer.SimpleConsumer; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import java.util.*; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.when; - -public class ZkCoordinatorTest { - - - @Mock - private DynamicBrokersReader reader; - - @Mock - private DynamicPartitionConnections dynamicPartitionConnections; - - private KafkaTestBroker broker = new KafkaTestBroker(); - private TestingServer server; - private Map stormConf = new HashMap(); - private SpoutConfig spoutConfig; - private ZkState state; - private SimpleConsumer simpleConsumer; - - @Before - public void setUp() throws Exception { - MockitoAnnotations.initMocks(this); - server = new TestingServer(); - String connectionString = server.getConnectString(); - ZkHosts hosts = new ZkHosts(connectionString); - hosts.refreshFreqSecs = 1; - spoutConfig = new SpoutConfig(hosts, "topic", "/test", "id"); - Map conf = buildZookeeperConfig(server); - state = new ZkState(conf); - simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient"); - when(dynamicPartitionConnections.register(any(Broker.class), any(String.class) ,anyInt())).thenReturn(simpleConsumer); - } - - private Map buildZookeeperConfig(TestingServer server) { - Map conf = new HashMap(); - conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, server.getPort()); - conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost")); - conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000); - conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000); - conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3); - conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30); - return conf; - } - - @After - public void shutdown() throws Exception { - simpleConsumer.close(); - broker.shutdown(); - server.close(); - } - - @Test - public void testOnePartitionPerTask() throws Exception { - int totalTasks = 64; - int partitionsPerTask = 1; - List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask); - when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks))); - for (ZkCoordinator coordinator : coordinatorList) { - List<PartitionManager> myManagedPartitions = coordinator.getMyManagedPartitions(); - assertEquals(partitionsPerTask, myManagedPartitions.size()); - assertEquals(coordinator._taskIndex, myManagedPartitions.get(0).getPartition().partition); - } - } - - - @Test - public void testPartitionsChange() throws Exception { - final int totalTasks = 64; - int partitionsPerTask = 2; - List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask); - when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092))); - List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList); - waitForRefresh(); - when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093))); - List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList); - assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size()); - Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator(); - for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) { - List<PartitionManager> partitionManagersAfter = iterator.next(); - assertPartitionsAreDifferent(partitionManagersBefore, partitionManagersAfter, partitionsPerTask); - } - } - - private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int partitionsPerTask) { - assertEquals(partitionsPerTask, partitionManagersBefore.size()); - assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size()); - for (int i = 0; i < partitionsPerTask; i++) { - assertNotEquals(partitionManagersBefore.get(i).getPartition(), partitionManagersAfter.get(i).getPartition()); - } - - } - - private List<List<PartitionManager>> getPartitionManagers(List<ZkCoordinator> coordinatorList) { - List<List<PartitionManager>> partitions = new ArrayList(); - for (ZkCoordinator coordinator : coordinatorList) { - partitions.add(coordinator.getMyManagedPartitions()); - } - return partitions; - } - - private void waitForRefresh() throws InterruptedException { - Thread.sleep(((ZkHosts) spoutConfig.hosts).refreshFreqSecs * 1000 + 1); - } - - private List<ZkCoordinator> buildCoordinators(int totalTasks) { - List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>(); - for (int i = 0; i < totalTasks; i++) { - ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader); - coordinatorList.add(coordinator); - } - return coordinatorList; - } - - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java deleted file mode 100644 index f3aee76..0000000 --- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java +++ /dev/null @@ -1,341 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.bolt; - -import backtype.storm.Config; -import backtype.storm.Constants; -import backtype.storm.task.GeneralTopologyContext; -import backtype.storm.task.IOutputCollector; -import backtype.storm.task.OutputCollector; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.TupleImpl; -import backtype.storm.tuple.Values; -import backtype.storm.utils.TupleUtils; -import backtype.storm.utils.Utils; -import com.google.common.collect.ImmutableList; -import kafka.api.OffsetRequest; -import kafka.api.FetchRequest; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.Message; -import kafka.message.MessageAndOffset; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.junit.*; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.internal.util.reflection.Whitebox; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import storm.kafka.*; -import storm.kafka.trident.GlobalPartitionInformation; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.HashMap; -import java.util.Properties; -import java.util.concurrent.Future; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.*; - -public class KafkaBoltTest { - - private static final String TEST_TOPIC = "test-topic"; - private KafkaTestBroker broker; - private KafkaBolt bolt; - private Config config = new Config(); - private KafkaConfig kafkaConfig; - private SimpleConsumer simpleConsumer; - - @Mock - private IOutputCollector collector; - - @Before - public void initMocks() { - MockitoAnnotations.initMocks(this); - broker = new KafkaTestBroker(); - setupKafkaConsumer(); - config.put(KafkaBolt.TOPIC, TEST_TOPIC); - bolt = generateStringSerializerBolt(); - } - - @After - public void shutdown() { - simpleConsumer.close(); - broker.shutdown(); - bolt.cleanup(); - } - - private void setupKafkaConsumer() { - GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC); - globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString())); - BrokerHosts brokerHosts = new StaticHosts(globalPartitionInformation); - kafkaConfig = new KafkaConfig(brokerHosts, TEST_TOPIC); - simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient"); - } - - @Test - public void shouldAcknowledgeTickTuples() throws Exception { - // Given - Tuple tickTuple = mockTickTuple(); - - // When - bolt.execute(tickTuple); - - // Then - verify(collector).ack(tickTuple); - } - - @Test - public void executeWithKey() throws Exception { - String message = "value-123"; - String key = "key-123"; - Tuple tuple = generateTestTuple(key, message); - bolt.execute(tuple); - verify(collector).ack(tuple); - verifyMessage(key, message); - } - - /* test synchronous sending */ - @Test - public void executeWithByteArrayKeyAndMessageSync() { - boolean async = false; - boolean fireAndForget = false; - bolt = generateDefaultSerializerBolt(async, fireAndForget, null); - String keyString = "test-key"; - String messageString = "test-message"; - byte[] key = keyString.getBytes(); - byte[] message = messageString.getBytes(); - Tuple tuple = generateTestTuple(key, message); - bolt.execute(tuple); - verify(collector).ack(tuple); - verifyMessage(keyString, messageString); - } - - /* test asynchronous sending (default) */ - @Test - public void executeWithByteArrayKeyAndMessageAsync() { - boolean async = true; - boolean fireAndForget = false; - String keyString = "test-key"; - String messageString = "test-message"; - byte[] key = keyString.getBytes(); - byte[] message = messageString.getBytes(); - final Tuple tuple = generateTestTuple(key, message); - - final ByteBufferMessageSet mockMsg = mockSingleMessage(key, message); - simpleConsumer.close(); - simpleConsumer = mockSimpleConsumer(mockMsg); - KafkaProducer<?, ?> producer = mock(KafkaProducer.class); - when(producer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Future>() { - @Override - public Future answer(InvocationOnMock invocationOnMock) throws Throwable { - Callback cb = (Callback) invocationOnMock.getArguments()[1]; - cb.onCompletion(null, null); - return mock(Future.class); - } - }); - bolt = generateDefaultSerializerBolt(async, fireAndForget, producer); - bolt.execute(tuple); - verify(collector).ack(tuple); - verifyMessage(keyString, messageString); - } - - /* test with fireAndForget option enabled */ - @Test - public void executeWithByteArrayKeyAndMessageFire() { - boolean async = true; - boolean fireAndForget = true; - bolt = generateDefaultSerializerBolt(async, fireAndForget, null); - String keyString = "test-key"; - String messageString = "test-message"; - byte[] key = keyString.getBytes(); - byte[] message = messageString.getBytes(); - Tuple tuple = generateTestTuple(key, message); - final ByteBufferMessageSet mockMsg = mockSingleMessage(key, message); - simpleConsumer.close(); - simpleConsumer = mockSimpleConsumer(mockMsg); - KafkaProducer<?, ?> producer = mock(KafkaProducer.class); - // do not invoke the callback of send() in order to test whether the bolt handle the fireAndForget option - // properly. - doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class), any(Callback.class)); - bolt.execute(tuple); - verify(collector).ack(tuple); - verifyMessage(keyString, messageString); - } - - /* test bolt specified properties */ - @Test - public void executeWithBoltSpecifiedProperties() { - boolean async = false; - boolean fireAndForget = false; - bolt = defaultSerializerBoltWithSpecifiedProperties(async, fireAndForget); - String keyString = "test-key"; - String messageString = "test-message"; - byte[] key = keyString.getBytes(); - byte[] message = messageString.getBytes(); - Tuple tuple = generateTestTuple(key, message); - bolt.execute(tuple); - verify(collector).ack(tuple); - verifyMessage(keyString, messageString); - } - - private KafkaBolt generateStringSerializerBolt() { - Properties props = new Properties(); - props.put("acks", "1"); - props.put("bootstrap.servers", broker.getBrokerConnectionString()); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("metadata.fetch.timeout.ms", 1000); - KafkaBolt bolt = new KafkaBolt().withProducerProperties(props); - bolt.prepare(config, null, new OutputCollector(collector)); - bolt.setAsync(false); - return bolt; - } - - private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget, - KafkaProducer<?, ?> mockProducer) { - Properties props = new Properties(); - props.put("acks", "1"); - props.put("bootstrap.servers", broker.getBrokerConnectionString()); - props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - props.put("metadata.fetch.timeout.ms", 1000); - props.put("linger.ms", 0); - KafkaBolt bolt = new KafkaBolt().withProducerProperties(props); - bolt.prepare(config, null, new OutputCollector(collector)); - bolt.setAsync(async); - bolt.setFireAndForget(fireAndForget); - if (mockProducer != null) { - Whitebox.setInternalState(bolt, "producer", mockProducer); - } - return bolt; - } - - private KafkaBolt defaultSerializerBoltWithSpecifiedProperties(boolean async, boolean fireAndForget) { - Properties props = new Properties(); - props.put("acks", "1"); - props.put("bootstrap.servers", broker.getBrokerConnectionString()); - props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - props.put("metadata.fetch.timeout.ms", 1000); - props.put("linger.ms", 0); - KafkaBolt bolt = new KafkaBolt().withProducerProperties(props); - bolt.prepare(config, null, new OutputCollector(collector)); - bolt.setAsync(async); - bolt.setFireAndForget(fireAndForget); - return bolt; - } - - @Test - public void executeWithoutKey() throws Exception { - String message = "value-234"; - Tuple tuple = generateTestTuple(message); - bolt.execute(tuple); - verify(collector).ack(tuple); - verifyMessage(null, message); - } - - - @Test - public void executeWithBrokerDown() throws Exception { - broker.shutdown(); - String message = "value-234"; - Tuple tuple = generateTestTuple(message); - bolt.execute(tuple); - verify(collector).fail(tuple); - } - - private boolean verifyMessage(String key, String message) { - long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, kafkaConfig.topic, 0, OffsetRequest.LatestTime()) - 1; - ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(kafkaConfig, simpleConsumer, - new Partition(Broker.fromString(broker.getBrokerConnectionString()),kafkaConfig.topic, 0), lastMessageOffset); - MessageAndOffset messageAndOffset = messageAndOffsets.iterator().next(); - Message kafkaMessage = messageAndOffset.message(); - ByteBuffer messageKeyBuffer = kafkaMessage.key(); - String keyString = null; - String messageString = new String(Utils.toByteArray(kafkaMessage.payload())); - if (messageKeyBuffer != null) { - keyString = new String(Utils.toByteArray(messageKeyBuffer)); - } - assertEquals(key, keyString); - assertEquals(message, messageString); - return true; - } - - private Tuple generateTestTuple(Object key, Object message) { - TopologyBuilder builder = new TopologyBuilder(); - GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") { - @Override - public Fields getComponentOutputFields(String componentId, String streamId) { - return new Fields("key", "message"); - } - }; - return new TupleImpl(topologyContext, new Values(key, message), 1, ""); - } - - private Tuple generateTestTuple(Object message) { - TopologyBuilder builder = new TopologyBuilder(); - GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") { - @Override - public Fields getComponentOutputFields(String componentId, String streamId) { - return new Fields("message"); - } - }; - return new TupleImpl(topologyContext, new Values(message), 1, ""); - } - - private Tuple mockTickTuple() { - Tuple tuple = mock(Tuple.class); - when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID); - when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID); - // Sanity check - assertTrue(TupleUtils.isTick(tuple)); - return tuple; - } - - private static ByteBufferMessageSet mockSingleMessage(byte[] key, byte[] message) { - ByteBufferMessageSet sets = mock(ByteBufferMessageSet.class); - MessageAndOffset msg = mock(MessageAndOffset.class); - final List<MessageAndOffset> msgs = ImmutableList.of(msg); - doReturn(msgs.iterator()).when(sets).iterator(); - Message kafkaMessage = mock(Message.class); - doReturn(ByteBuffer.wrap(key)).when(kafkaMessage).key(); - doReturn(ByteBuffer.wrap(message)).when(kafkaMessage).payload(); - doReturn(kafkaMessage).when(msg).message(); - return sets; - } - - private static SimpleConsumer mockSimpleConsumer(ByteBufferMessageSet mockMsg) { - SimpleConsumer simpleConsumer = mock(SimpleConsumer.class); - FetchResponse resp = mock(FetchResponse.class); - doReturn(resp).when(simpleConsumer).fetch(any(FetchRequest.class)); - OffsetResponse mockOffsetResponse = mock(OffsetResponse.class); - doReturn(new long[] {}).when(mockOffsetResponse).offsets(anyString(), anyInt()); - doReturn(mockOffsetResponse).when(simpleConsumer).getOffsetsBefore(any(kafka.javaapi.OffsetRequest.class)); - doReturn(mockMsg).when(resp).messageSet(anyString(), anyInt()); - return simpleConsumer; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java ---------------------------------------------------------------------- diff --git a/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java b/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java index 4adc500..7f58a3d 100644 --- a/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java +++ b/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java @@ -17,7 +17,7 @@ */ package org.apache.storm.metrics.hdrhistogram; -import backtype.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetric; import org.HdrHistogram.Histogram; /** http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java ---------------------------------------------------------------------- diff --git a/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java b/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java index a3addc9..7c6d75e 100644 --- a/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java +++ b/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java @@ -20,7 +20,7 @@ package org.apache.storm.metrics.sigar; import org.hyperic.sigar.Sigar; import org.hyperic.sigar.ProcCpu; -import backtype.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetric; import java.util.HashMap; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java index dc2a2d3..0c64f43 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java @@ -17,9 +17,9 @@ */ package org.apache.storm.redis.bolt; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.base.BaseRichBolt; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.redis.common.config.JedisPoolConfig; import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java index 47c98cb..4d6dc4e 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java @@ -17,9 +17,9 @@ */ package org.apache.storm.redis.bolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisLookupMapper; import org.apache.storm.redis.common.config.JedisClusterConfig; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java index be9a328..b74ed1c 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java @@ -17,8 +17,8 @@ */ package org.apache.storm.redis.bolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.redis.common.config.JedisPoolConfig; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java index 727e4ec..fe464f5 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java @@ -17,9 +17,9 @@ */ package org.apache.storm.redis.common.mapper; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.ITuple; -import backtype.storm.tuple.Values; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.ITuple; +import org.apache.storm.tuple.Values; import java.util.List; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java index bcc531e..a2ab48b 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java @@ -17,7 +17,7 @@ */ package org.apache.storm.redis.common.mapper; -import backtype.storm.tuple.ITuple; +import org.apache.storm.tuple.ITuple; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java index 26056d2..f5bd459 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java @@ -19,12 +19,12 @@ package org.apache.storm.redis.trident.state; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import storm.trident.state.JSONNonTransactionalSerializer; -import storm.trident.state.JSONOpaqueSerializer; -import storm.trident.state.JSONTransactionalSerializer; -import storm.trident.state.Serializer; -import storm.trident.state.StateType; -import storm.trident.state.map.IBackingMap; +import org.apache.storm.trident.state.JSONNonTransactionalSerializer; +import org.apache.storm.trident.state.JSONOpaqueSerializer; +import org.apache.storm.trident.state.JSONTransactionalSerializer; +import org.apache.storm.trident.state.Serializer; +import org.apache.storm.trident.state.StateType; +import org.apache.storm.trident.state.map.IBackingMap; import java.util.ArrayList; import java.util.Collections; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java index 5c7335d..3785b84 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java @@ -17,14 +17,14 @@ */ package org.apache.storm.redis.trident.state; -import backtype.storm.tuple.Values; +import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisLookupMapper; -import storm.trident.operation.TridentCollector; -import storm.trident.state.BaseQueryFunction; -import storm.trident.state.State; -import storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.BaseQueryFunction; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; import java.util.List; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java index e9654c7..82b7483 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java @@ -19,10 +19,10 @@ package org.apache.storm.redis.trident.state; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisStoreMapper; -import storm.trident.operation.TridentCollector; -import storm.trident.state.BaseStateUpdater; -import storm.trident.state.State; -import storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.BaseStateUpdater; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; import java.util.HashMap; import java.util.List; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java index f4dbfaa..6ebcb22 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java @@ -18,7 +18,7 @@ package org.apache.storm.redis.trident.state; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; -import storm.trident.state.Serializer; +import org.apache.storm.trident.state.Serializer; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java index cbd37c5..54e9aea 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java @@ -17,24 +17,24 @@ */ package org.apache.storm.redis.trident.state; -import backtype.storm.task.IMetricsContext; -import backtype.storm.tuple.Values; +import org.apache.storm.task.IMetricsContext; +import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import redis.clients.jedis.JedisCluster; -import storm.trident.state.OpaqueValue; -import storm.trident.state.Serializer; -import storm.trident.state.State; -import storm.trident.state.StateFactory; -import storm.trident.state.StateType; -import storm.trident.state.TransactionalValue; -import storm.trident.state.map.CachedMap; -import storm.trident.state.map.MapState; -import storm.trident.state.map.NonTransactionalMap; -import storm.trident.state.map.OpaqueMap; -import storm.trident.state.map.SnapshottableMap; -import storm.trident.state.map.TransactionalMap; +import org.apache.storm.trident.state.OpaqueValue; +import org.apache.storm.trident.state.Serializer; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; +import org.apache.storm.trident.state.StateType; +import org.apache.storm.trident.state.TransactionalValue; +import org.apache.storm.trident.state.map.CachedMap; +import org.apache.storm.trident.state.map.MapState; +import org.apache.storm.trident.state.map.NonTransactionalMap; +import org.apache.storm.trident.state.map.OpaqueMap; +import org.apache.storm.trident.state.map.SnapshottableMap; +import org.apache.storm.trident.state.map.TransactionalMap; import java.util.List; import java.util.Map; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java index 764436d..c773c1a 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java @@ -17,13 +17,13 @@ */ package org.apache.storm.redis.trident.state; -import backtype.storm.task.IMetricsContext; +import org.apache.storm.task.IMetricsContext; import org.apache.storm.redis.common.config.JedisClusterConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisCluster; -import storm.trident.state.State; -import storm.trident.state.StateFactory; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; import java.util.Map; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java index 25e9924..b379fc1 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java @@ -17,25 +17,25 @@ */ package org.apache.storm.redis.trident.state; -import backtype.storm.task.IMetricsContext; -import backtype.storm.tuple.Values; +import org.apache.storm.task.IMetricsContext; +import org.apache.storm.tuple.Values; import org.apache.storm.redis.common.config.JedisPoolConfig; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Pipeline; -import storm.trident.state.OpaqueValue; -import storm.trident.state.Serializer; -import storm.trident.state.State; -import storm.trident.state.StateFactory; -import storm.trident.state.StateType; -import storm.trident.state.TransactionalValue; -import storm.trident.state.map.CachedMap; -import storm.trident.state.map.MapState; -import storm.trident.state.map.NonTransactionalMap; -import storm.trident.state.map.OpaqueMap; -import storm.trident.state.map.SnapshottableMap; -import storm.trident.state.map.TransactionalMap; +import org.apache.storm.trident.state.OpaqueValue; +import org.apache.storm.trident.state.Serializer; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; +import org.apache.storm.trident.state.StateType; +import org.apache.storm.trident.state.TransactionalValue; +import org.apache.storm.trident.state.map.CachedMap; +import org.apache.storm.trident.state.map.MapState; +import org.apache.storm.trident.state.map.NonTransactionalMap; +import org.apache.storm.trident.state.map.OpaqueMap; +import org.apache.storm.trident.state.map.SnapshottableMap; +import org.apache.storm.trident.state.map.TransactionalMap; import java.util.List; import java.util.Map; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java index 85d0e1b..a93b348 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java @@ -17,14 +17,14 @@ */ package org.apache.storm.redis.trident.state; -import backtype.storm.task.IMetricsContext; +import org.apache.storm.task.IMetricsContext; import org.apache.storm.redis.common.config.JedisPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; -import storm.trident.state.State; -import storm.trident.state.StateFactory; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; import java.util.Map; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java index ae053de..f62b7b0 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java @@ -17,18 +17,18 @@ */ package org.apache.storm.redis.topology; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.ITuple; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.ITuple; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; import org.apache.storm.redis.bolt.RedisLookupBolt; import org.apache.storm.redis.common.config.JedisPoolConfig; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java index 77c6ee8..d46bab6 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java @@ -17,14 +17,14 @@ */ package org.apache.storm.redis.topology; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.ITuple; -import backtype.storm.tuple.Tuple; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.ITuple; +import org.apache.storm.tuple.Tuple; import org.apache.storm.redis.bolt.AbstractRedisBolt; import org.apache.storm.redis.bolt.RedisStoreBolt; import org.apache.storm.redis.common.config.JedisClusterConfig; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java index 6f25038..6fa930c 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java @@ -17,18 +17,18 @@ */ package org.apache.storm.redis.topology; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.IBasicBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.IBasicBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import com.google.common.collect.Maps; import java.util.Map; -import static backtype.storm.utils.Utils.tuple; +import static org.apache.storm.utils.Utils.tuple; public class WordCounter implements IBasicBolt { private Map<String, Integer> wordCounter = Maps.newHashMap(); @@ -64,4 +64,4 @@ public class WordCounter implements IBasicBolt { return null; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordSpout.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordSpout.java index bb9c2d7..e2cdfde 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordSpout.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordSpout.java @@ -17,12 +17,12 @@ */ package org.apache.storm.redis.topology; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import java.util.Map; import java.util.Random; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java index 6f465c9..37d3936 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java @@ -19,9 +19,9 @@ package org.apache.storm.redis.trident; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import storm.trident.operation.BaseFunction; -import storm.trident.operation.TridentCollector; -import storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.tuple.TridentTuple; import java.util.Random; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java index a445749..a6ca8c9 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java @@ -17,10 +17,10 @@ */ package org.apache.storm.redis.trident; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.ITuple; -import backtype.storm.tuple.Values; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.ITuple; +import org.apache.storm.tuple.Values; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisLookupMapper; @@ -54,4 +54,4 @@ public class WordCountLookupMapper implements RedisLookupMapper { public String getValueFromTuple(ITuple tuple) { return tuple.getInteger(1).toString(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java index b930998..58df150 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java @@ -17,7 +17,7 @@ */ package org.apache.storm.redis.trident; -import backtype.storm.tuple.ITuple; +import org.apache.storm.tuple.ITuple; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisStoreMapper; @@ -36,4 +36,4 @@ public class WordCountStoreMapper implements RedisStoreMapper { public String getValueFromTuple(ITuple tuple) { return tuple.getInteger(1).toString(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java index 4a4aae0..e3eb0f9 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java @@ -17,22 +17,22 @@ */ package org.apache.storm.redis.trident; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.StormTopology; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import org.apache.storm.redis.common.mapper.RedisLookupMapper; import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.apache.storm.redis.trident.state.RedisState; import org.apache.storm.redis.trident.state.RedisStateQuerier; import org.apache.storm.redis.trident.state.RedisStateUpdater; import org.apache.storm.redis.common.config.JedisPoolConfig; -import storm.trident.Stream; -import storm.trident.TridentState; -import storm.trident.TridentTopology; -import storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentState; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.testing.FixedBatchSpout; public class WordCountTridentRedis { public static StormTopology buildTopology(String redisHost, Integer redisPort){ http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java index 765b339..116a58a 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java @@ -17,22 +17,22 @@ */ package org.apache.storm.redis.trident; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.StormTopology; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import org.apache.storm.redis.common.mapper.RedisLookupMapper; import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.apache.storm.redis.trident.state.RedisClusterState; import org.apache.storm.redis.trident.state.RedisClusterStateQuerier; import org.apache.storm.redis.trident.state.RedisClusterStateUpdater; import org.apache.storm.redis.common.config.JedisClusterConfig; -import storm.trident.Stream; -import storm.trident.TridentState; -import storm.trident.TridentTopology; -import storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentState; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.testing.FixedBatchSpout; import java.net.InetSocketAddress; import java.util.HashSet; http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java ---------------------------------------------------------------------- diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java index beb4b5f..fafb4e0 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java @@ -17,23 +17,23 @@ */ package org.apache.storm.redis.trident; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.StormTopology; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.TupleMapper; import org.apache.storm.redis.trident.state.RedisClusterMapState; import org.apache.storm.redis.common.config.JedisClusterConfig; -import storm.trident.Stream; -import storm.trident.TridentState; -import storm.trident.TridentTopology; -import storm.trident.operation.builtin.MapGet; -import storm.trident.operation.builtin.Sum; -import storm.trident.state.StateFactory; -import storm.trident.testing.FixedBatchSpout; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentState; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.builtin.MapGet; +import org.apache.storm.trident.operation.builtin.Sum; +import org.apache.storm.trident.state.StateFactory; +import org.apache.storm.trident.testing.FixedBatchSpout; import java.net.InetSocketAddress; import java.util.HashSet;
