http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java new file mode 100644 index 0000000..9da6c0a --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import kafka.api.OffsetRequest; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.MessageAndOffset; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.storm.kafka.trident.GlobalPartitionInformation; +import org.apache.storm.spout.SchemeAsMultiScheme; +import org.apache.storm.utils.Utils; + +import com.google.common.collect.ImmutableMap; +public class KafkaUtilsTest { + private String TEST_TOPIC = "testTopic"; + private static final Logger LOG = LoggerFactory.getLogger(KafkaUtilsTest.class); + private KafkaTestBroker broker; + private SimpleConsumer simpleConsumer; + private KafkaConfig config; + private BrokerHosts brokerHosts; + + @Before + public void setup() { + broker = new KafkaTestBroker(); + GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC); + globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString())); + brokerHosts = new StaticHosts(globalPartitionInformation); + config = new KafkaConfig(brokerHosts, TEST_TOPIC); + simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient"); + } + + @After + public void shutdown() { + simpleConsumer.close(); + broker.shutdown(); + } + + + @Test(expected = FailedFetchException.class) + public void topicDoesNotExist() throws Exception { + KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), 0); + } + + @Test(expected = FailedFetchException.class) + public void brokerIsDown() throws Exception { + int port = broker.getPort(); + broker.shutdown(); + SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", port, 100, 1024, "testClient"); + try { + KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), OffsetRequest.LatestTime()); + } finally { + simpleConsumer.close(); + } + } + + @Test + public void fetchMessage() throws Exception { + String value = "test"; + createTopicAndSendMessage(value); + long offset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1; + ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer, + new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), offset); + String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload())); + assertThat(message, is(equalTo(value))); + } + + @Test(expected = FailedFetchException.class) + public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() throws Exception { + config.useStartOffsetTimeIfOffsetOutOfRange = false; + KafkaUtils.fetchMessages(config, simpleConsumer, + new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), -99); + } + + @Test(expected = TopicOffsetOutOfRangeException.class) + public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception { + config = new KafkaConfig(brokerHosts, "newTopic"); + String value = "test"; + createTopicAndSendMessage(value); + KafkaUtils.fetchMessages(config, simpleConsumer, + new Partition(Broker.fromString(broker.getBrokerConnectionString()), "newTopic", 0), -99); + } + + @Test + public void getOffsetFromConfigAndDontForceFromStart() { + config.ignoreZkOffsets = false; + config.startOffsetTime = OffsetRequest.EarliestTime(); + createTopicAndSendMessage(); + long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime()); + long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config); + assertThat(latestOffset, is(equalTo(offsetFromConfig))); + } + + @Test + public void getOffsetFromConfigAndFroceFromStart() { + config.ignoreZkOffsets = true; + config.startOffsetTime = OffsetRequest.EarliestTime(); + createTopicAndSendMessage(); + long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime()); + long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config); + assertThat(earliestOffset, is(equalTo(offsetFromConfig))); + } + + @Test + public void generateTuplesWithoutKeyAndKeyValueScheme() { + config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme()); + runGetValueOnlyTuplesTest(); + } + + @Test + public void generateTuplesWithKeyAndKeyValueScheme() { + config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme()); + config.useStartOffsetTimeIfOffsetOutOfRange = false; + String value = "value"; + String key = "key"; + createTopicAndSendMessage(key, value); + ByteBufferMessageSet messageAndOffsets = getLastMessage(); + for (MessageAndOffset msg : messageAndOffsets) { + Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic); + assertEquals(ImmutableMap.of(key, value), lists.iterator().next().get(0)); + } + } + + @Test + public void generateTupelsWithValueScheme() { + config.scheme = new SchemeAsMultiScheme(new StringScheme()); + runGetValueOnlyTuplesTest(); + } + + @Test + public void generateTuplesWithValueAndStringMultiSchemeWithTopic() { + config.scheme = new StringMultiSchemeWithTopic(); + String value = "value"; + createTopicAndSendMessage(value); + ByteBufferMessageSet messageAndOffsets = getLastMessage(); + for (MessageAndOffset msg : messageAndOffsets) { + Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic); + List<Object> list = lists.iterator().next(); + assertEquals(value, list.get(0)); + assertEquals(config.topic, list.get(1)); + } + } + + @Test + public void generateTuplesWithValueSchemeAndKeyValueMessage() { + config.scheme = new SchemeAsMultiScheme(new StringScheme()); + String value = "value"; + String key = "key"; + createTopicAndSendMessage(key, value); + ByteBufferMessageSet messageAndOffsets = getLastMessage(); + for (MessageAndOffset msg : messageAndOffsets) { + Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic); + assertEquals(value, lists.iterator().next().get(0)); + } + } + + @Test + public void generateTuplesWithMessageAndMetadataScheme() { + String value = "value"; + Partition mockPartition = Mockito.mock(Partition.class); + mockPartition.partition = 0; + long offset = 0L; + + MessageMetadataSchemeAsMultiScheme scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme()); + + createTopicAndSendMessage(null, value); + ByteBufferMessageSet messageAndOffsets = getLastMessage(); + for (MessageAndOffset msg : messageAndOffsets) { + Iterable<List<Object>> lists = KafkaUtils.generateTuples(scheme, msg.message(), mockPartition, offset); + List<Object> values = lists.iterator().next(); + assertEquals("Message is incorrect", value, values.get(0)); + assertEquals("Partition is incorrect", mockPartition.partition, values.get(1)); + assertEquals("Offset is incorrect", offset, values.get(2)); + } + } + + private ByteBufferMessageSet getLastMessage() { + long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1; + return KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), offsetOfLastMessage); + } + + private void runGetValueOnlyTuplesTest() { + String value = "value"; + + createTopicAndSendMessage(null, value); + ByteBufferMessageSet messageAndOffsets = getLastMessage(); + for (MessageAndOffset msg : messageAndOffsets) { + Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic); + assertEquals(value, lists.iterator().next().get(0)); + } + } + + private void createTopicAndSendMessage() { + createTopicAndSendMessage(null, "someValue"); + } + + private void createTopicAndSendMessage(String value) { + createTopicAndSendMessage(null, value); + } + + private void createTopicAndSendMessage(String key, String value) { + Properties p = new Properties(); + p.put("acks", "1"); + p.put("bootstrap.servers", broker.getBrokerConnectionString()); + p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + p.put("metadata.fetch.timeout.ms", 1000); + KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p); + try { + producer.send(new ProducerRecord<String, String>(config.topic, key, value)).get(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + LOG.error("Failed to do synchronous sending due to " + e, e); + } finally { + producer.close(); + } + } + + @Test + public void assignOnePartitionPerTask() { + runPartitionToTaskMappingTest(16, 1); + } + + @Test + public void assignTwoPartitionsPerTask() { + runPartitionToTaskMappingTest(16, 2); + } + + @Test + public void assignAllPartitionsToOneTask() { + runPartitionToTaskMappingTest(32, 32); + } + + public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTask) { + GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(numPartitions); + List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); + partitions.add(globalPartitionInformation); + int numTasks = numPartitions / partitionsPerTask; + for (int i = 0 ; i < numTasks ; i++) { + assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size()); + } + } + + @Test + public void moreTasksThanPartitions() { + GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(1); + List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); + partitions.add(globalPartitionInformation); + int numTasks = 2; + assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size()); + assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size()); + } + + @Test (expected = IllegalArgumentException.class ) + public void assignInvalidTask() { + GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC); + List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); + partitions.add(globalPartitionInformation); + KafkaUtils.calculatePartitionsForTask(partitions, 1, 1); + } +}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java new file mode 100644 index 0000000..7e5ff00 --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.tuple.Fields; +import com.google.common.collect.ImmutableMap; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StringKeyValueSchemeTest { + + private StringKeyValueScheme scheme = new StringKeyValueScheme(); + + @Test + public void testDeserialize() throws Exception { + assertEquals(Collections.singletonList("test"), scheme.deserialize(wrapString("test"))); + } + + @Test + public void testGetOutputFields() throws Exception { + Fields outputFields = scheme.getOutputFields(); + assertTrue(outputFields.contains(StringScheme.STRING_SCHEME_KEY)); + assertEquals(1, outputFields.size()); + } + + @Test + public void testDeserializeWithNullKeyAndValue() throws Exception { + assertEquals(Collections.singletonList("test"), + scheme.deserializeKeyAndValue(null, wrapString("test"))); + } + + @Test + public void testDeserializeWithKeyAndValue() throws Exception { + assertEquals(Collections.singletonList(ImmutableMap.of("key", "test")), + scheme.deserializeKeyAndValue(wrapString("key"), wrapString("test"))); + } + + private static ByteBuffer wrapString(String s) { + return ByteBuffer.wrap(s.getBytes(Charset.defaultCharset())); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java new file mode 100644 index 0000000..23944ab --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.assertEquals; + +public class TestStringScheme { + @Test + public void testDeserializeString() { + String s = "foo"; + byte[] bytes = s.getBytes(StandardCharsets.UTF_8); + ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length); + direct.put(bytes); + direct.flip(); + String s1 = StringScheme.deserializeString(ByteBuffer.wrap(bytes)); + String s2 = StringScheme.deserializeString(direct); + assertEquals(s, s1); + assertEquals(s, s2); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java new file mode 100644 index 0000000..cc3f2be --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import kafka.api.OffsetRequest; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.Message; +import kafka.message.MessageAndOffset; +import org.apache.storm.kafka.bolt.KafkaBolt; +import org.apache.storm.kafka.trident.GlobalPartitionInformation; + +import java.nio.ByteBuffer; +import java.util.*; + +import static org.junit.Assert.assertEquals; + +public class TestUtils { + + public static final String TOPIC = "test"; + + public static GlobalPartitionInformation buildPartitionInfo(int numPartitions) { + return buildPartitionInfo(numPartitions, 9092); + } + + public static List<GlobalPartitionInformation> buildPartitionInfoList(GlobalPartitionInformation partitionInformation) { + List<GlobalPartitionInformation> map = new ArrayList<GlobalPartitionInformation>(); + map.add(partitionInformation); + return map; + } + + public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort) { + GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TOPIC); + for (int i = 0; i < numPartitions; i++) { + globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + " :" + brokerPort)); + } + return globalPartitionInformation; + } + + public static SimpleConsumer getKafkaConsumer(KafkaTestBroker broker) { + BrokerHosts brokerHosts = getBrokerHosts(broker); + KafkaConfig kafkaConfig = new KafkaConfig(brokerHosts, TOPIC); + SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient"); + return simpleConsumer; + } + + public static KafkaConfig getKafkaConfig(KafkaTestBroker broker) { + BrokerHosts brokerHosts = getBrokerHosts(broker); + KafkaConfig kafkaConfig = new KafkaConfig(brokerHosts, TOPIC); + return kafkaConfig; + } + + private static BrokerHosts getBrokerHosts(KafkaTestBroker broker) { + GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TOPIC); + globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString())); + return new StaticHosts(globalPartitionInformation); + } + + public static Properties getProducerProperties(String brokerConnectionString) { + Properties props = new Properties(); + props.put("bootstrap.servers", brokerConnectionString); + props.put("acks", "1"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + return props; + } + + public static boolean verifyMessage(String key, String message, KafkaTestBroker broker, SimpleConsumer simpleConsumer) { + long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, TestUtils.TOPIC, 0, OffsetRequest.LatestTime()) - 1; + ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(TestUtils.getKafkaConfig(broker), simpleConsumer, + new Partition(Broker.fromString(broker.getBrokerConnectionString()),TestUtils.TOPIC, 0), lastMessageOffset); + MessageAndOffset messageAndOffset = messageAndOffsets.iterator().next(); + Message kafkaMessage = messageAndOffset.message(); + ByteBuffer messageKeyBuffer = kafkaMessage.key(); + String keyString = null; + String messageString = new String(Utils.toByteArray(kafkaMessage.payload())); + if (messageKeyBuffer != null) { + keyString = new String(Utils.toByteArray(messageKeyBuffer)); + } + assertEquals(key, keyString); + assertEquals(message, messageString); + return true; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java new file mode 100644 index 0000000..7a6073a --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.tuple.Fields; +import kafka.javaapi.consumer.SimpleConsumer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.apache.storm.kafka.trident.TridentKafkaState; +import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; +import org.apache.storm.kafka.trident.selector.DefaultTopicSelector; +import org.apache.storm.kafka.trident.selector.KafkaTopicSelector; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.trident.tuple.TridentTupleView; + +import java.util.ArrayList; +import java.util.List; + +public class TridentKafkaTest { + private KafkaTestBroker broker; + private TridentKafkaState state; + private SimpleConsumer simpleConsumer; + + @Before + public void setup() { + broker = new KafkaTestBroker(); + simpleConsumer = TestUtils.getKafkaConsumer(broker); + TridentTupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper("key", "message"); + KafkaTopicSelector topicSelector = new DefaultTopicSelector(TestUtils.TOPIC); + state = new TridentKafkaState() + .withKafkaTopicSelector(topicSelector) + .withTridentTupleToKafkaMapper(mapper); + state.prepare(TestUtils.getProducerProperties(broker.getBrokerConnectionString())); + } + + @Test + public void testKeyValue() { + String keyString = "key-123"; + String valString = "message-123"; + int batchSize = 10; + + List<TridentTuple> tridentTuples = generateTupleBatch(keyString, valString, batchSize); + + state.updateState(tridentTuples, null); + + for(int i = 0 ; i < batchSize ; i++) { + TestUtils.verifyMessage(keyString, valString, broker, simpleConsumer); + } + } + + private List<TridentTuple> generateTupleBatch(String key, String message, int batchsize) { + List<TridentTuple> batch = new ArrayList<>(); + for(int i =0 ; i < batchsize; i++) { + batch.add(TridentTupleView.createFreshTuple(new Fields("key", "message"), key, message)); + } + return batch; + } + + @After + public void shutdown() { + simpleConsumer.close(); + broker.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java new file mode 100644 index 0000000..fdc6752 --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import com.google.common.collect.ImmutableMap; +import org.apache.storm.kafka.trident.TridentKafkaStateFactory; +import org.apache.storm.kafka.trident.TridentKafkaUpdater; +import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.trident.selector.DefaultTopicSelector; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.testing.FixedBatchSpout; + +import java.util.Properties; + +public class TridentKafkaTopology { + + private static StormTopology buildTopology(String brokerConnectionString) { + Fields fields = new Fields("word", "count"); + FixedBatchSpout spout = new FixedBatchSpout(fields, 4, + new Values("storm", "1"), + new Values("trident", "1"), + new Values("needs", "1"), + new Values("javadoc", "1") + ); + spout.setCycle(true); + + TridentTopology topology = new TridentTopology(); + Stream stream = topology.newStream("spout1", spout); + + Properties props = new Properties(); + props.put("bootstrap.servers", brokerConnectionString); + props.put("acks", "1"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory() + .withProducerProperties(props) + .withKafkaTopicSelector(new DefaultTopicSelector("test")) + .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count")); + stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields()); + + return topology.build(); + } + + /** + * To run this topology ensure you have a kafka broker running and provide connection string to broker as argument. + * Create a topic test with command line, + * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test + * + * run this program and run the kafka consumer: + * kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning + * + * you should see the messages flowing through. + * + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + if(args.length < 1) { + System.out.println("Please provide kafka broker url ,e.g. localhost:9092"); + } + + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("wordCounter", new Config(), buildTopology(args[0])); + Thread.sleep(60 * 1000); + cluster.killTopology("wordCounter"); + + cluster.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java new file mode 100644 index 0000000..65bf0b4 --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.Config; +import org.apache.curator.test.TestingServer; +import kafka.javaapi.consumer.SimpleConsumer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.*; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.when; + +public class ZkCoordinatorTest { + + + @Mock + private DynamicBrokersReader reader; + + @Mock + private DynamicPartitionConnections dynamicPartitionConnections; + + private KafkaTestBroker broker = new KafkaTestBroker(); + private TestingServer server; + private Map stormConf = new HashMap(); + private SpoutConfig spoutConfig; + private ZkState state; + private SimpleConsumer simpleConsumer; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + server = new TestingServer(); + String connectionString = server.getConnectString(); + ZkHosts hosts = new ZkHosts(connectionString); + hosts.refreshFreqSecs = 1; + spoutConfig = new SpoutConfig(hosts, "topic", "/test", "id"); + Map conf = buildZookeeperConfig(server); + state = new ZkState(conf); + simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient"); + when(dynamicPartitionConnections.register(any(Broker.class), any(String.class) ,anyInt())).thenReturn(simpleConsumer); + } + + private Map buildZookeeperConfig(TestingServer server) { + Map conf = new HashMap(); + conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, server.getPort()); + conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost")); + conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000); + conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000); + conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3); + conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30); + return conf; + } + + @After + public void shutdown() throws Exception { + simpleConsumer.close(); + broker.shutdown(); + server.close(); + } + + @Test + public void testOnePartitionPerTask() throws Exception { + int totalTasks = 64; + int partitionsPerTask = 1; + List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask); + when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks))); + for (ZkCoordinator coordinator : coordinatorList) { + List<PartitionManager> myManagedPartitions = coordinator.getMyManagedPartitions(); + assertEquals(partitionsPerTask, myManagedPartitions.size()); + assertEquals(coordinator._taskIndex, myManagedPartitions.get(0).getPartition().partition); + } + } + + + @Test + public void testPartitionsChange() throws Exception { + final int totalTasks = 64; + int partitionsPerTask = 2; + List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask); + when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092))); + List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList); + waitForRefresh(); + when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093))); + List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList); + assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size()); + Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator(); + for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) { + List<PartitionManager> partitionManagersAfter = iterator.next(); + assertPartitionsAreDifferent(partitionManagersBefore, partitionManagersAfter, partitionsPerTask); + } + } + + private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int partitionsPerTask) { + assertEquals(partitionsPerTask, partitionManagersBefore.size()); + assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size()); + for (int i = 0; i < partitionsPerTask; i++) { + assertNotEquals(partitionManagersBefore.get(i).getPartition(), partitionManagersAfter.get(i).getPartition()); + } + + } + + private List<List<PartitionManager>> getPartitionManagers(List<ZkCoordinator> coordinatorList) { + List<List<PartitionManager>> partitions = new ArrayList(); + for (ZkCoordinator coordinator : coordinatorList) { + partitions.add(coordinator.getMyManagedPartitions()); + } + return partitions; + } + + private void waitForRefresh() throws InterruptedException { + Thread.sleep(((ZkHosts) spoutConfig.hosts).refreshFreqSecs * 1000 + 1); + } + + private List<ZkCoordinator> buildCoordinators(int totalTasks) { + List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>(); + for (int i = 0; i < totalTasks; i++) { + ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader); + coordinatorList.add(coordinator); + } + return coordinatorList; + } + + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java new file mode 100644 index 0000000..180828e --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.bolt; + +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.task.GeneralTopologyContext; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.TupleUtils; +import org.apache.storm.utils.Utils; +import com.google.common.collect.ImmutableList; +import kafka.api.OffsetRequest; +import kafka.api.FetchRequest; +import kafka.javaapi.FetchResponse; +import kafka.javaapi.OffsetResponse; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.Message; +import kafka.message.MessageAndOffset; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.*; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.internal.util.reflection.Whitebox; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.apache.storm.kafka.*; +import org.apache.storm.kafka.trident.GlobalPartitionInformation; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.HashMap; +import java.util.Properties; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; + +public class KafkaBoltTest { + + private static final String TEST_TOPIC = "test-topic"; + private KafkaTestBroker broker; + private KafkaBolt bolt; + private Config config = new Config(); + private KafkaConfig kafkaConfig; + private SimpleConsumer simpleConsumer; + + @Mock + private IOutputCollector collector; + + @Before + public void initMocks() { + MockitoAnnotations.initMocks(this); + broker = new KafkaTestBroker(); + setupKafkaConsumer(); + config.put(KafkaBolt.TOPIC, TEST_TOPIC); + bolt = generateStringSerializerBolt(); + } + + @After + public void shutdown() { + simpleConsumer.close(); + broker.shutdown(); + bolt.cleanup(); + } + + private void setupKafkaConsumer() { + GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC); + globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString())); + BrokerHosts brokerHosts = new StaticHosts(globalPartitionInformation); + kafkaConfig = new KafkaConfig(brokerHosts, TEST_TOPIC); + simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient"); + } + + @Test + public void shouldAcknowledgeTickTuples() throws Exception { + // Given + Tuple tickTuple = mockTickTuple(); + + // When + bolt.execute(tickTuple); + + // Then + verify(collector).ack(tickTuple); + } + + @Test + public void executeWithKey() throws Exception { + String message = "value-123"; + String key = "key-123"; + Tuple tuple = generateTestTuple(key, message); + bolt.execute(tuple); + verify(collector).ack(tuple); + verifyMessage(key, message); + } + + /* test synchronous sending */ + @Test + public void executeWithByteArrayKeyAndMessageSync() { + boolean async = false; + boolean fireAndForget = false; + bolt = generateDefaultSerializerBolt(async, fireAndForget, null); + String keyString = "test-key"; + String messageString = "test-message"; + byte[] key = keyString.getBytes(); + byte[] message = messageString.getBytes(); + Tuple tuple = generateTestTuple(key, message); + bolt.execute(tuple); + verify(collector).ack(tuple); + verifyMessage(keyString, messageString); + } + + /* test asynchronous sending (default) */ + @Test + public void executeWithByteArrayKeyAndMessageAsync() { + boolean async = true; + boolean fireAndForget = false; + String keyString = "test-key"; + String messageString = "test-message"; + byte[] key = keyString.getBytes(); + byte[] message = messageString.getBytes(); + final Tuple tuple = generateTestTuple(key, message); + + final ByteBufferMessageSet mockMsg = mockSingleMessage(key, message); + simpleConsumer.close(); + simpleConsumer = mockSimpleConsumer(mockMsg); + KafkaProducer<?, ?> producer = mock(KafkaProducer.class); + when(producer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Future>() { + @Override + public Future answer(InvocationOnMock invocationOnMock) throws Throwable { + Callback cb = (Callback) invocationOnMock.getArguments()[1]; + cb.onCompletion(null, null); + return mock(Future.class); + } + }); + bolt = generateDefaultSerializerBolt(async, fireAndForget, producer); + bolt.execute(tuple); + verify(collector).ack(tuple); + verifyMessage(keyString, messageString); + } + + /* test with fireAndForget option enabled */ + @Test + public void executeWithByteArrayKeyAndMessageFire() { + boolean async = true; + boolean fireAndForget = true; + bolt = generateDefaultSerializerBolt(async, fireAndForget, null); + String keyString = "test-key"; + String messageString = "test-message"; + byte[] key = keyString.getBytes(); + byte[] message = messageString.getBytes(); + Tuple tuple = generateTestTuple(key, message); + final ByteBufferMessageSet mockMsg = mockSingleMessage(key, message); + simpleConsumer.close(); + simpleConsumer = mockSimpleConsumer(mockMsg); + KafkaProducer<?, ?> producer = mock(KafkaProducer.class); + // do not invoke the callback of send() in order to test whether the bolt handle the fireAndForget option + // properly. + doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class), any(Callback.class)); + bolt.execute(tuple); + verify(collector).ack(tuple); + verifyMessage(keyString, messageString); + } + + /* test bolt specified properties */ + @Test + public void executeWithBoltSpecifiedProperties() { + boolean async = false; + boolean fireAndForget = false; + bolt = defaultSerializerBoltWithSpecifiedProperties(async, fireAndForget); + String keyString = "test-key"; + String messageString = "test-message"; + byte[] key = keyString.getBytes(); + byte[] message = messageString.getBytes(); + Tuple tuple = generateTestTuple(key, message); + bolt.execute(tuple); + verify(collector).ack(tuple); + verifyMessage(keyString, messageString); + } + + private KafkaBolt generateStringSerializerBolt() { + Properties props = new Properties(); + props.put("acks", "1"); + props.put("bootstrap.servers", broker.getBrokerConnectionString()); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("metadata.fetch.timeout.ms", 1000); + KafkaBolt bolt = new KafkaBolt().withProducerProperties(props); + bolt.prepare(config, null, new OutputCollector(collector)); + bolt.setAsync(false); + return bolt; + } + + private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget, + KafkaProducer<?, ?> mockProducer) { + Properties props = new Properties(); + props.put("acks", "1"); + props.put("bootstrap.servers", broker.getBrokerConnectionString()); + props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("metadata.fetch.timeout.ms", 1000); + props.put("linger.ms", 0); + KafkaBolt bolt = new KafkaBolt().withProducerProperties(props); + bolt.prepare(config, null, new OutputCollector(collector)); + bolt.setAsync(async); + bolt.setFireAndForget(fireAndForget); + if (mockProducer != null) { + Whitebox.setInternalState(bolt, "producer", mockProducer); + } + return bolt; + } + + private KafkaBolt defaultSerializerBoltWithSpecifiedProperties(boolean async, boolean fireAndForget) { + Properties props = new Properties(); + props.put("acks", "1"); + props.put("bootstrap.servers", broker.getBrokerConnectionString()); + props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("metadata.fetch.timeout.ms", 1000); + props.put("linger.ms", 0); + KafkaBolt bolt = new KafkaBolt().withProducerProperties(props); + bolt.prepare(config, null, new OutputCollector(collector)); + bolt.setAsync(async); + bolt.setFireAndForget(fireAndForget); + return bolt; + } + + @Test + public void executeWithoutKey() throws Exception { + String message = "value-234"; + Tuple tuple = generateTestTuple(message); + bolt.execute(tuple); + verify(collector).ack(tuple); + verifyMessage(null, message); + } + + + @Test + public void executeWithBrokerDown() throws Exception { + broker.shutdown(); + String message = "value-234"; + Tuple tuple = generateTestTuple(message); + bolt.execute(tuple); + verify(collector).fail(tuple); + } + + private boolean verifyMessage(String key, String message) { + long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, kafkaConfig.topic, 0, OffsetRequest.LatestTime()) - 1; + ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(kafkaConfig, simpleConsumer, + new Partition(Broker.fromString(broker.getBrokerConnectionString()),kafkaConfig.topic, 0), lastMessageOffset); + MessageAndOffset messageAndOffset = messageAndOffsets.iterator().next(); + Message kafkaMessage = messageAndOffset.message(); + ByteBuffer messageKeyBuffer = kafkaMessage.key(); + String keyString = null; + String messageString = new String(Utils.toByteArray(kafkaMessage.payload())); + if (messageKeyBuffer != null) { + keyString = new String(Utils.toByteArray(messageKeyBuffer)); + } + assertEquals(key, keyString); + assertEquals(message, messageString); + return true; + } + + private Tuple generateTestTuple(Object key, Object message) { + TopologyBuilder builder = new TopologyBuilder(); + GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + @Override + public Fields getComponentOutputFields(String componentId, String streamId) { + return new Fields("key", "message"); + } + }; + return new TupleImpl(topologyContext, new Values(key, message), 1, ""); + } + + private Tuple generateTestTuple(Object message) { + TopologyBuilder builder = new TopologyBuilder(); + GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + @Override + public Fields getComponentOutputFields(String componentId, String streamId) { + return new Fields("message"); + } + }; + return new TupleImpl(topologyContext, new Values(message), 1, ""); + } + + private Tuple mockTickTuple() { + Tuple tuple = mock(Tuple.class); + when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID); + when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID); + // Sanity check + assertTrue(TupleUtils.isTick(tuple)); + return tuple; + } + + private static ByteBufferMessageSet mockSingleMessage(byte[] key, byte[] message) { + ByteBufferMessageSet sets = mock(ByteBufferMessageSet.class); + MessageAndOffset msg = mock(MessageAndOffset.class); + final List<MessageAndOffset> msgs = ImmutableList.of(msg); + doReturn(msgs.iterator()).when(sets).iterator(); + Message kafkaMessage = mock(Message.class); + doReturn(ByteBuffer.wrap(key)).when(kafkaMessage).key(); + doReturn(ByteBuffer.wrap(message)).when(kafkaMessage).payload(); + doReturn(kafkaMessage).when(msg).message(); + return sets; + } + + private static SimpleConsumer mockSimpleConsumer(ByteBufferMessageSet mockMsg) { + SimpleConsumer simpleConsumer = mock(SimpleConsumer.class); + FetchResponse resp = mock(FetchResponse.class); + doReturn(resp).when(simpleConsumer).fetch(any(FetchRequest.class)); + OffsetResponse mockOffsetResponse = mock(OffsetResponse.class); + doReturn(new long[] {}).when(mockOffsetResponse).offsets(anyString(), anyInt()); + doReturn(mockOffsetResponse).when(simpleConsumer).getOffsetsBefore(any(kafka.javaapi.OffsetRequest.class)); + doReturn(mockMsg).when(resp).messageSet(anyString(), anyInt()); + return simpleConsumer; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/DynamicBrokersReaderTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/DynamicBrokersReaderTest.java b/external/storm-kafka/src/test/storm/kafka/DynamicBrokersReaderTest.java deleted file mode 100644 index d871924..0000000 --- a/external/storm-kafka/src/test/storm/kafka/DynamicBrokersReaderTest.java +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.Config; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.curator.test.TestingServer; -import org.apache.curator.utils.ZKPaths; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import storm.kafka.trident.GlobalPartitionInformation; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -/** - * Date: 16/05/2013 - * Time: 20:35 - */ -public class DynamicBrokersReaderTest { - private DynamicBrokersReader dynamicBrokersReader, wildCardBrokerReader; - private String masterPath = "/brokers"; - private String topic = "testing1"; - private String secondTopic = "testing2"; - private String thirdTopic = "testing3"; - - private CuratorFramework zookeeper; - private TestingServer server; - - @Before - public void setUp() throws Exception { - server = new TestingServer(); - String connectionString = server.getConnectString(); - Map conf = new HashMap(); - conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000); - conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000); - conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4); - conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5); - - ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); - zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy); - dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic); - - Map conf2 = new HashMap(); - conf2.putAll(conf); - conf2.put("kafka.topic.wildcard.match",true); - - wildCardBrokerReader = new DynamicBrokersReader(conf2, connectionString, masterPath, "^test.*$"); - zookeeper.start(); - } - - @After - public void tearDown() throws Exception { - dynamicBrokersReader.close(); - zookeeper.close(); - server.close(); - } - - private void addPartition(int id, String host, int port, String topic) throws Exception { - writePartitionId(id, topic); - writeLeader(id, 0, topic); - writeLeaderDetails(0, host, port); - } - - private void addPartition(int id, int leader, String host, int port, String topic) throws Exception { - writePartitionId(id, topic); - writeLeader(id, leader, topic); - writeLeaderDetails(leader, host, port); - } - - private void writePartitionId(int id, String topic) throws Exception { - String path = dynamicBrokersReader.partitionPath(topic); - writeDataToPath(path, ("" + id)); - } - - private void writeDataToPath(String path, String data) throws Exception { - ZKPaths.mkdirs(zookeeper.getZookeeperClient().getZooKeeper(), path); - zookeeper.setData().forPath(path, data.getBytes()); - } - - private void writeLeader(int id, int leaderId, String topic) throws Exception { - String path = dynamicBrokersReader.partitionPath(topic) + "/" + id + "/state"; - String value = " { \"controller_epoch\":4, \"isr\":[ 1, 0 ], \"leader\":" + leaderId + ", \"leader_epoch\":1, \"version\":1 }"; - writeDataToPath(path, value); - } - - private void writeLeaderDetails(int leaderId, String host, int port) throws Exception { - String path = dynamicBrokersReader.brokerPath() + "/" + leaderId; - String value = "{ \"host\":\"" + host + "\", \"jmx_port\":9999, \"port\":" + port + ", \"version\":1 }"; - writeDataToPath(path, value); - } - - - private GlobalPartitionInformation getByTopic(List<GlobalPartitionInformation> partitions, String topic){ - for(GlobalPartitionInformation partitionInformation : partitions) { - if (partitionInformation.topic.equals(topic)) return partitionInformation; - } - return null; - } - - @Test - public void testGetBrokerInfo() throws Exception { - String host = "localhost"; - int port = 9092; - int partition = 0; - addPartition(partition, host, port, topic); - List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo(); - - GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic); - assertNotNull(brokerInfo); - assertEquals(1, brokerInfo.getOrderedPartitions().size()); - assertEquals(port, brokerInfo.getBrokerFor(partition).port); - assertEquals(host, brokerInfo.getBrokerFor(partition).host); - } - - @Test - public void testGetBrokerInfoWildcardMatch() throws Exception { - String host = "localhost"; - int port = 9092; - int partition = 0; - addPartition(partition, host, port, topic); - addPartition(partition, host, port, secondTopic); - - List<GlobalPartitionInformation> partitions = wildCardBrokerReader.getBrokerInfo(); - - GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic); - assertNotNull(brokerInfo); - assertEquals(1, brokerInfo.getOrderedPartitions().size()); - assertEquals(port, brokerInfo.getBrokerFor(partition).port); - assertEquals(host, brokerInfo.getBrokerFor(partition).host); - - brokerInfo = getByTopic(partitions, secondTopic); - assertNotNull(brokerInfo); - assertEquals(1, brokerInfo.getOrderedPartitions().size()); - assertEquals(port, brokerInfo.getBrokerFor(partition).port); - assertEquals(host, brokerInfo.getBrokerFor(partition).host); - - addPartition(partition, host, port, thirdTopic); - //Discover newly added topic - partitions = wildCardBrokerReader.getBrokerInfo(); - assertNotNull(getByTopic(partitions, topic)); - assertNotNull(getByTopic(partitions, secondTopic)); - assertNotNull(getByTopic(partitions, secondTopic)); - } - - - @Test - public void testMultiplePartitionsOnDifferentHosts() throws Exception { - String host = "localhost"; - int port = 9092; - int secondPort = 9093; - int partition = 0; - int secondPartition = partition + 1; - addPartition(partition, 0, host, port, topic); - addPartition(secondPartition, 1, host, secondPort, topic); - - List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo(); - - GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic); - assertNotNull(brokerInfo); - assertEquals(2, brokerInfo.getOrderedPartitions().size()); - - assertEquals(port, brokerInfo.getBrokerFor(partition).port); - assertEquals(host, brokerInfo.getBrokerFor(partition).host); - - assertEquals(secondPort, brokerInfo.getBrokerFor(secondPartition).port); - assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host); - } - - - @Test - public void testMultiplePartitionsOnSameHost() throws Exception { - String host = "localhost"; - int port = 9092; - int partition = 0; - int secondPartition = partition + 1; - addPartition(partition, 0, host, port, topic); - addPartition(secondPartition, 0, host, port, topic); - - List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo(); - - GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic); - assertNotNull(brokerInfo); - assertEquals(2, brokerInfo.getOrderedPartitions().size()); - - assertEquals(port, brokerInfo.getBrokerFor(partition).port); - assertEquals(host, brokerInfo.getBrokerFor(partition).host); - - assertEquals(port, brokerInfo.getBrokerFor(secondPartition).port); - assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host); - } - - @Test - public void testSwitchHostForPartition() throws Exception { - String host = "localhost"; - int port = 9092; - int partition = 0; - addPartition(partition, host, port, topic); - List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo(); - - GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic); - assertNotNull(brokerInfo); - assertEquals(port, brokerInfo.getBrokerFor(partition).port); - assertEquals(host, brokerInfo.getBrokerFor(partition).host); - - String newHost = host + "switch"; - int newPort = port + 1; - addPartition(partition, newHost, newPort, topic); - partitions = dynamicBrokersReader.getBrokerInfo(); - - brokerInfo = getByTopic(partitions, topic); - assertNotNull(brokerInfo); - assertEquals(newPort, brokerInfo.getBrokerFor(partition).port); - assertEquals(newHost, brokerInfo.getBrokerFor(partition).host); - } - - @Test(expected = NullPointerException.class) - public void testErrorLogsWhenConfigIsMissing() throws Exception { - String connectionString = server.getConnectString(); - Map conf = new HashMap(); - conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000); -// conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000); - conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4); - conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5); - - DynamicBrokersReader dynamicBrokersReader1 = new DynamicBrokersReader(conf, connectionString, masterPath, topic); - - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java deleted file mode 100644 index da23718..0000000 --- a/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java +++ /dev/null @@ -1,235 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.junit.Test; - -public class ExponentialBackoffMsgRetryManagerTest { - - private static final Long TEST_OFFSET = 101L; - private static final Long TEST_OFFSET2 = 102L; - private static final Long TEST_OFFSET3 = 105L; - private static final Long TEST_NEW_OFFSET = 103L; - - @Test - public void testImmediateRetry() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); - manager.failed(TEST_OFFSET); - Long next = manager.nextFailedMessageToRetry(); - assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET)); - - manager.retryStarted(TEST_OFFSET); - - manager.failed(TEST_OFFSET); - next = manager.nextFailedMessageToRetry(); - assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET)); - } - - @Test - public void testSingleDelay() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(100, 1d, 1000); - manager.failed(TEST_OFFSET); - Thread.sleep(5); - Long next = manager.nextFailedMessageToRetry(); - assertNull("expect no message ready for retry yet", next); - assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET)); - - Thread.sleep(100); - next = manager.nextFailedMessageToRetry(); - assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); - } - - @Test - public void testExponentialBackoff() throws Exception { - final long initial = 10; - final double mult = 2d; - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, initial * 10); - - long expectedWaitTime = initial; - for (long i = 0L; i < 3L; ++i) { - manager.failed(TEST_OFFSET); - - Thread.sleep((expectedWaitTime + 1L) / 2L); - assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET)); - - Thread.sleep((expectedWaitTime + 1L) / 2L); - Long next = manager.nextFailedMessageToRetry(); - assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); - - manager.retryStarted(TEST_OFFSET); - expectedWaitTime *= mult; - } - } - - @Test - public void testRetryOrder() throws Exception { - final long initial = 10; - final double mult = 2d; - final long max = 20; - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max); - - manager.failed(TEST_OFFSET); - Thread.sleep(initial); - - manager.retryStarted(TEST_OFFSET); - manager.failed(TEST_OFFSET); - manager.failed(TEST_OFFSET2); - - // although TEST_OFFSET failed first, it's retry delay time is longer b/c this is the second retry - // so TEST_OFFSET2 should come first - - Thread.sleep(initial * 2); - assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); - assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2)); - - Long next = manager.nextFailedMessageToRetry(); - assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next); - - Thread.sleep(initial); - - // haven't retried yet, so first should still be TEST_OFFSET2 - next = manager.nextFailedMessageToRetry(); - assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next); - manager.retryStarted(next); - - // now it should be TEST_OFFSET - next = manager.nextFailedMessageToRetry(); - assertEquals("expect message to retry is now "+TEST_OFFSET, TEST_OFFSET, next); - manager.retryStarted(next); - - // now none left - next = manager.nextFailedMessageToRetry(); - assertNull("expect no message to retry now", next); - } - - @Test - public void testQueriesAfterRetriedAlready() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); - manager.failed(TEST_OFFSET); - Long next = manager.nextFailedMessageToRetry(); - assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET)); - - manager.retryStarted(TEST_OFFSET); - next = manager.nextFailedMessageToRetry(); - assertNull("expect no message ready after retried", next); - assertFalse("message should not be ready after retried", manager.shouldRetryMsg(TEST_OFFSET)); - } - - @Test(expected = IllegalStateException.class) - public void testRetryWithoutFail() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); - manager.retryStarted(TEST_OFFSET); - } - - @Test(expected = IllegalStateException.class) - public void testFailRetryRetry() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); - manager.failed(TEST_OFFSET); - try { - manager.retryStarted(TEST_OFFSET); - } catch (IllegalStateException ise) { - fail("IllegalStateException unexpected here: " + ise); - } - - assertFalse("message should not be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); - manager.retryStarted(TEST_OFFSET); - } - - @Test - public void testMaxBackoff() throws Exception { - final long initial = 100; - final double mult = 2d; - final long max = 2000; - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max); - - long expectedWaitTime = initial; - for (long i = 0L; i < 4L; ++i) { - manager.failed(TEST_OFFSET); - - Thread.sleep((expectedWaitTime + 1L) / 2L); - assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET)); - - Thread.sleep((expectedWaitTime + 1L) / 2L); - Long next = manager.nextFailedMessageToRetry(); - assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); - - manager.retryStarted(TEST_OFFSET); - expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max); - } - } - - @Test - public void testFailThenAck() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); - manager.failed(TEST_OFFSET); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); - - manager.acked(TEST_OFFSET); - - Long next = manager.nextFailedMessageToRetry(); - assertNull("expect no message ready after acked", next); - assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET)); - } - - @Test - public void testAckThenFail() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); - manager.acked(TEST_OFFSET); - assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET)); - - manager.failed(TEST_OFFSET); - - Long next = manager.nextFailedMessageToRetry(); - assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); - } - - @Test - public void testClearInvalidMessages() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); - manager.failed(TEST_OFFSET); - manager.failed(TEST_OFFSET2); - manager.failed(TEST_OFFSET3); - - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2)); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3)); - - manager.clearInvalidMessages(TEST_NEW_OFFSET); - - Long next = manager.nextFailedMessageToRetry(); - assertEquals("expect test offset next available for retry", TEST_OFFSET3, next); - - manager.acked(TEST_OFFSET3); - next = manager.nextFailedMessageToRetry(); - assertNull("expect no message ready after acked", next); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/KafkaErrorTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaErrorTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaErrorTest.java deleted file mode 100644 index 9f170db..0000000 --- a/external/storm-kafka/src/test/storm/kafka/KafkaErrorTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import org.junit.Test; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; - -/** - * Date: 12/01/2014 - * Time: 18:09 - */ -public class KafkaErrorTest { - - @Test - public void getError() { - assertThat(KafkaError.getError(0), is(equalTo(KafkaError.NO_ERROR))); - } - - @Test - public void offsetMetaDataTooLarge() { - assertThat(KafkaError.getError(12), is(equalTo(KafkaError.OFFSET_METADATA_TOO_LARGE))); - } - - @Test - public void unknownNegative() { - assertThat(KafkaError.getError(-1), is(equalTo(KafkaError.UNKNOWN))); - } - - @Test - public void unknownPositive() { - assertThat(KafkaError.getError(75), is(equalTo(KafkaError.UNKNOWN))); - } - - @Test - public void unknown() { - assertThat(KafkaError.getError(13), is(equalTo(KafkaError.UNKNOWN))); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/KafkaTestBroker.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaTestBroker.java b/external/storm-kafka/src/test/storm/kafka/KafkaTestBroker.java deleted file mode 100644 index 73203d1..0000000 --- a/external/storm-kafka/src/test/storm/kafka/KafkaTestBroker.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.curator.test.InstanceSpec; -import org.apache.curator.test.TestingServer; - -import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; -import org.apache.commons.io.FileUtils; - -import java.io.File; -import java.io.IOException; -import java.util.Properties; - -/** - * Date: 11/01/2014 - * Time: 13:15 - */ -public class KafkaTestBroker { - - private int port; - private KafkaServerStartable kafka; - private TestingServer server; - private CuratorFramework zookeeper; - private File logDir; - - public KafkaTestBroker() { - try { - server = new TestingServer(); - String zookeeperConnectionString = server.getConnectString(); - ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); - zookeeper = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); - zookeeper.start(); - port = InstanceSpec.getRandomPort(); - logDir = new File(System.getProperty("java.io.tmpdir"), "kafka/logs/kafka-test-" + port); - KafkaConfig config = buildKafkaConfig(zookeeperConnectionString); - kafka = new KafkaServerStartable(config); - kafka.startup(); - } catch (Exception ex) { - throw new RuntimeException("Could not start test broker", ex); - } - } - - private kafka.server.KafkaConfig buildKafkaConfig(String zookeeperConnectionString) { - Properties p = new Properties(); - p.setProperty("zookeeper.connect", zookeeperConnectionString); - p.setProperty("broker.id", "0"); - p.setProperty("port", "" + port); - p.setProperty("log.dirs", logDir.getAbsolutePath()); - return new KafkaConfig(p); - } - - public String getBrokerConnectionString() { - return "localhost:" + port; - } - - public int getPort() { - return port; - } - public void shutdown() { - kafka.shutdown(); - if (zookeeper.getState().equals(CuratorFrameworkState.STARTED)) { - zookeeper.close(); - } - try { - server.close(); - } catch (IOException e) { - e.printStackTrace(); - } - FileUtils.deleteQuietly(logDir); - } -}
