http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java 
b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
new file mode 100644
index 0000000..9da6c0a
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import kafka.api.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.utils.Utils;
+
+import com.google.common.collect.ImmutableMap;
+
+public class KafkaUtilsTest {
+    private static final String TEST_TOPIC = "testTopic";
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtilsTest.class);
+    private KafkaTestBroker broker;
+    private SimpleConsumer simpleConsumer;
+    private KafkaConfig config;
+    private BrokerHosts brokerHosts;
+
+    @Before
+    public void setup() {
+        broker = new KafkaTestBroker();
+        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
+        globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
+        brokerHosts = new StaticHosts(globalPartitionInformation);
+        config = new KafkaConfig(brokerHosts, TEST_TOPIC);
+        simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
+    }
+
+    @After
+    public void shutdown() {
+        simpleConsumer.close();
+        broker.shutdown();
+    }
+
+
+    @Test(expected = FailedFetchException.class)
+    public void topicDoesNotExist() throws Exception {
+        KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), 0);
+    }
+
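+    // The deliberately short 100 ms socket timeout below keeps this negative test fast:
+    // the consumer is expected to fail quickly once the broker has been shut down.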
+    @Test(expected = FailedFetchException.class)
+    public void brokerIsDown() throws Exception {
+        int port = broker.getPort();
+        broker.shutdown();
+        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", port, 100, 1024, "testClient");
+        try {
+            KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), OffsetRequest.LatestTime());
+        } finally {
+            simpleConsumer.close();
+        }
+    }
+
+    @Test
+    public void fetchMessage() throws Exception {
+        String value = "test";
+        createTopicAndSendMessage(value);
+        long offset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;
+        ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer,
+                new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), offset);
+        String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
+        assertThat(message, is(equalTo(value)));
+    }
+
+    @Test(expected = FailedFetchException.class)
+    public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() throws Exception {
+        config.useStartOffsetTimeIfOffsetOutOfRange = false;
+        KafkaUtils.fetchMessages(config, simpleConsumer,
+                new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), -99);
+    }
+
+    @Test(expected = TopicOffsetOutOfRangeException.class)
+    public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception {
+        config = new KafkaConfig(brokerHosts, "newTopic");
+        String value = "test";
+        createTopicAndSendMessage(value);
+        KafkaUtils.fetchMessages(config, simpleConsumer,
+                new Partition(Broker.fromString(broker.getBrokerConnectionString()), "newTopic", 0), -99);
+    }
+
+    @Test
+    public void getOffsetFromConfigAndDontForceFromStart() {
+        config.ignoreZkOffsets = false;
+        config.startOffsetTime = OffsetRequest.EarliestTime();
+        createTopicAndSendMessage();
+        long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
+        long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
+        assertThat(earliestOffset, is(equalTo(offsetFromConfig)));
+    }
+
+    @Test
+    public void getOffsetFromConfigAndForceFromStart() {
+        config.ignoreZkOffsets = true;
+        config.startOffsetTime = OffsetRequest.EarliestTime();
+        createTopicAndSendMessage();
+        long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
+        long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
+        assertThat(earliestOffset, is(equalTo(offsetFromConfig)));
+    }
+
+    @Test
+    public void generateTuplesWithoutKeyAndKeyValueScheme() {
+        config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
+        runGetValueOnlyTuplesTest();
+    }
+
+    @Test
+    public void generateTuplesWithKeyAndKeyValueScheme() {
+        config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
+        config.useStartOffsetTimeIfOffsetOutOfRange = false;
+        String value = "value";
+        String key = "key";
+        createTopicAndSendMessage(key, value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        for (MessageAndOffset msg : messageAndOffsets) {
+            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic);
+            assertEquals(ImmutableMap.of(key, value), lists.iterator().next().get(0));
+        }
+    }
+
+    @Test
+    public void generateTuplesWithValueScheme() {
+        config.scheme = new SchemeAsMultiScheme(new StringScheme());
+        runGetValueOnlyTuplesTest();
+    }
+
+    @Test
+    public void generateTuplesWithValueAndStringMultiSchemeWithTopic() {
+        config.scheme = new StringMultiSchemeWithTopic();
+        String value = "value";
+        createTopicAndSendMessage(value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        for (MessageAndOffset msg : messageAndOffsets) {
+            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic);
+            List<Object> list = lists.iterator().next();
+            assertEquals(value, list.get(0));
+            assertEquals(config.topic, list.get(1));
+        }
+    }
+
+    @Test
+    public void generateTuplesWithValueSchemeAndKeyValueMessage() {
+        config.scheme = new SchemeAsMultiScheme(new StringScheme());
+        String value = "value";
+        String key = "key";
+        createTopicAndSendMessage(key, value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        for (MessageAndOffset msg : messageAndOffsets) {
+            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic);
+            assertEquals(value, lists.iterator().next().get(0));
+        }
+    }
+
+    @Test
+    public void generateTuplesWithMessageAndMetadataScheme() {
+        String value = "value";
+        Partition mockPartition = Mockito.mock(Partition.class);
+        mockPartition.partition = 0;
+        long offset = 0L;
+
+        MessageMetadataSchemeAsMultiScheme scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme());
+
+        createTopicAndSendMessage(null, value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        for (MessageAndOffset msg : messageAndOffsets) {
+            Iterable<List<Object>> lists = KafkaUtils.generateTuples(scheme, msg.message(), mockPartition, offset);
+            List<Object> values = lists.iterator().next();
+            assertEquals("Message is incorrect", value, values.get(0));
+            assertEquals("Partition is incorrect", mockPartition.partition, values.get(1));
+            assertEquals("Offset is incorrect", offset, values.get(2));
+        }
+    }
+
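+    // LatestTime() resolves to the offset the *next* message would receive, so the
+    // most recently written message lives at that offset minus one.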
+    private ByteBufferMessageSet getLastMessage() {
+        long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;
+        return KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), offsetOfLastMessage);
+    }
+
+    private void runGetValueOnlyTuplesTest() {
+        String value = "value";
+
+        createTopicAndSendMessage(null, value);
+        ByteBufferMessageSet messageAndOffsets = getLastMessage();
+        for (MessageAndOffset msg : messageAndOffsets) {
+            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic);
+            assertEquals(value, lists.iterator().next().get(0));
+        }
+    }
+
+    private void createTopicAndSendMessage() {
+        createTopicAndSendMessage(null, "someValue");
+    }
+
+    private void createTopicAndSendMessage(String value) {
+        createTopicAndSendMessage(null, value);
+    }
+
+    private void createTopicAndSendMessage(String key, String value) {
+        Properties p = new Properties();
+        p.put("acks", "1");
+        p.put("bootstrap.servers", broker.getBrokerConnectionString());
+        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        p.put("metadata.fetch.timeout.ms", 1000);
+        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);
+        try {
+            producer.send(new ProducerRecord<String, String>(config.topic, key, value)).get();
+        } catch (Exception e) {
+            // Log first: Assert.fail() throws, so anything after it never executes.
+            LOG.error("Failed to do synchronous sending due to " + e, e);
+            Assert.fail(e.getMessage());
+        } finally {
+            producer.close();
+        }
+    }
+
+    @Test
+    public void assignOnePartitionPerTask() {
+        runPartitionToTaskMappingTest(16, 1);
+    }
+
+    @Test
+    public void assignTwoPartitionsPerTask() {
+        runPartitionToTaskMappingTest(16, 2);
+    }
+
+    @Test
+    public void assignAllPartitionsToOneTask() {
+        runPartitionToTaskMappingTest(32, 32);
+    }
+    
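+    // Partitions are expected to be distributed evenly across tasks, so each of the
+    // numPartitions / partitionsPerTask tasks should own exactly partitionsPerTask partitions.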
+    public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTask) {
+        GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(numPartitions);
+        List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
+        partitions.add(globalPartitionInformation);
+        int numTasks = numPartitions / partitionsPerTask;
+        for (int i = 0; i < numTasks; i++) {
+            assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size());
+        }
+    }
+
+    @Test
+    public void moreTasksThanPartitions() {
+        GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(1);
+        List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
+        partitions.add(globalPartitionInformation);
+        int numTasks = 2;
+        assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size());
+        assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void assignInvalidTask() {
+        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
+        List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
+        partitions.add(globalPartitionInformation);
+        KafkaUtils.calculatePartitionsForTask(partitions, 1, 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java
new file mode 100644
index 0000000..7e5ff00
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.tuple.Fields;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StringKeyValueSchemeTest {
+
+    private StringKeyValueScheme scheme = new StringKeyValueScheme();
+
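+    // StringKeyValueScheme emits the bare value when the key is null, and a
+    // single-entry map of key -> value when a key is present, as the tests below assert.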
+    @Test
+    public void testDeserialize() throws Exception {
+        assertEquals(Collections.singletonList("test"), scheme.deserialize(wrapString("test")));
+    }
+
+    @Test
+    public void testGetOutputFields() throws Exception {
+        Fields outputFields = scheme.getOutputFields();
+        assertTrue(outputFields.contains(StringScheme.STRING_SCHEME_KEY));
+        assertEquals(1, outputFields.size());
+    }
+
+    @Test
+    public void testDeserializeWithNullKeyAndValue() throws Exception {
+        assertEquals(Collections.singletonList("test"),
+            scheme.deserializeKeyAndValue(null, wrapString("test")));
+    }
+
+    @Test
+    public void testDeserializeWithKeyAndValue() throws Exception {
+        assertEquals(Collections.singletonList(ImmutableMap.of("key", "test")),
+                scheme.deserializeKeyAndValue(wrapString("key"), wrapString("test")));
+    }
+
+    private static ByteBuffer wrapString(String s) {
+        return ByteBuffer.wrap(s.getBytes(Charset.defaultCharset()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java
new file mode 100644
index 0000000..23944ab
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TestStringScheme.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestStringScheme {
+  @Test
+  public void testDeserializeString() {
+    String s = "foo";
+    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
+    ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length);
+    direct.put(bytes);
+    direct.flip();
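+    // Check both a heap buffer and a direct buffer: direct buffers have no backing
+    // array, so they presumably exercise a different branch in deserializeString.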
+    String s1 = StringScheme.deserializeString(ByteBuffer.wrap(bytes));
+    String s2 = StringScheme.deserializeString(direct);
+    assertEquals(s, s1);
+    assertEquals(s, s2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
new file mode 100644
index 0000000..cc3f2be
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TestUtils.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import kafka.api.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import org.apache.storm.kafka.bolt.KafkaBolt;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestUtils {
+
+    public static final String TOPIC = "test";
+
+    public static GlobalPartitionInformation buildPartitionInfo(int numPartitions) {
+        return buildPartitionInfo(numPartitions, 9092);
+    }
+
+    public static List<GlobalPartitionInformation> buildPartitionInfoList(GlobalPartitionInformation partitionInformation) {
+        List<GlobalPartitionInformation> list = new ArrayList<GlobalPartitionInformation>();
+        list.add(partitionInformation);
+        return list;
+    }
+
+    public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort) {
+        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TOPIC);
+        for (int i = 0; i < numPartitions; i++) {
+            globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + ":" + brokerPort));
+        }
+        return globalPartitionInformation;
+    }
+
+    public static SimpleConsumer getKafkaConsumer(KafkaTestBroker broker) {
+        return new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
+    }
+
+    public static KafkaConfig getKafkaConfig(KafkaTestBroker broker) {
+        return new KafkaConfig(getBrokerHosts(broker), TOPIC);
+    }
+
+    private static BrokerHosts getBrokerHosts(KafkaTestBroker broker) {
+        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TOPIC);
+        globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
+        return new StaticHosts(globalPartitionInformation);
+    }
+
+    public static Properties getProducerProperties(String brokerConnectionString) {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", brokerConnectionString);
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        return props;
+    }
+
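+    // Fetches the most recently stored message on partition 0 and asserts that its
+    // key and payload match; callers are assumed to have produced synchronously first.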
+    public static boolean verifyMessage(String key, String message, KafkaTestBroker broker, SimpleConsumer simpleConsumer) {
+        long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, TestUtils.TOPIC, 0, OffsetRequest.LatestTime()) - 1;
+        ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(TestUtils.getKafkaConfig(broker), simpleConsumer,
+                new Partition(Broker.fromString(broker.getBrokerConnectionString()), TestUtils.TOPIC, 0), lastMessageOffset);
+        MessageAndOffset messageAndOffset = messageAndOffsets.iterator().next();
+        Message kafkaMessage = messageAndOffset.message();
+        ByteBuffer messageKeyBuffer = kafkaMessage.key();
+        String keyString = null;
+        String messageString = new String(Utils.toByteArray(kafkaMessage.payload()));
+        if (messageKeyBuffer != null) {
+            keyString = new String(Utils.toByteArray(messageKeyBuffer));
+        }
+        assertEquals(key, keyString);
+        assertEquals(message, messageString);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
new file mode 100644
index 0000000..7a6073a
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.tuple.Fields;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.storm.kafka.trident.TridentKafkaState;
+import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.tuple.TridentTupleView;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TridentKafkaTest {
+    private KafkaTestBroker broker;
+    private TridentKafkaState state;
+    private SimpleConsumer simpleConsumer;
+
+    @Before
+    public void setup() {
+        broker = new KafkaTestBroker();
+        simpleConsumer = TestUtils.getKafkaConsumer(broker);
+        TridentTupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper("key", "message");
+        KafkaTopicSelector topicSelector = new DefaultTopicSelector(TestUtils.TOPIC);
+        state = new TridentKafkaState()
+                .withKafkaTopicSelector(topicSelector)
+                .withTridentTupleToKafkaMapper(mapper);
+        state.prepare(TestUtils.getProducerProperties(broker.getBrokerConnectionString()));
+    }
+
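+    // Every tuple in the batch carries the same key/value pair, so re-verifying the
+    // last stored message batchSize times is sufficient for this smoke test.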
+    @Test
+    public void testKeyValue() {
+        String keyString = "key-123";
+        String valString = "message-123";
+        int batchSize = 10;
+
+        List<TridentTuple> tridentTuples = generateTupleBatch(keyString, valString, batchSize);
+
+        state.updateState(tridentTuples, null);
+
+        for (int i = 0; i < batchSize; i++) {
+            TestUtils.verifyMessage(keyString, valString, broker, simpleConsumer);
+        }
+    }
+
+    private List<TridentTuple> generateTupleBatch(String key, String message, int batchSize) {
+        List<TridentTuple> batch = new ArrayList<>();
+        for (int i = 0; i < batchSize; i++) {
+            batch.add(TridentTupleView.createFreshTuple(new Fields("key", "message"), key, message));
+        }
+        return batch;
+    }
+
+    @After
+    public void shutdown() {
+        simpleConsumer.close();
+        broker.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java
new file mode 100644
index 0000000..fdc6752
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
+import org.apache.storm.kafka.trident.TridentKafkaUpdater;
+import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+
+import java.util.Properties;
+
+public class TridentKafkaTopology {
+
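+    // Builds a minimal Trident topology: a cycling FixedBatchSpout feeds word/count
+    // pairs into a TridentKafkaState that writes them to the "test" topic.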
+    private static StormTopology buildTopology(String brokerConnectionString) {
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", "1"),
+                new Values("trident", "1"),
+                new Values("needs", "1"),
+                new Values("javadoc", "1")
+        );
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", brokerConnectionString);
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+            .withProducerProperties(props)
+            .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+            .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
+
+        return topology.build();
+    }
+
+    /**
+     * To run this topology, ensure you have a Kafka broker running and pass its
+     * connection string as the only argument.
+     * Create the "test" topic from the command line:
+     * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test
+     *
+     * Then run this program and start the Kafka console consumer:
+     * kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
+     *
+     * You should see the messages flowing through.
+     *
+     * @param args the broker connection string, e.g. localhost:9092
+     * @throws Exception
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1) {
+            System.out.println("Please provide the Kafka broker url, e.g. localhost:9092");
+            return;
+        }
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("wordCounter", new Config(), buildTopology(args[0]));
+        Thread.sleep(60 * 1000);
+        cluster.killTopology("wordCounter");
+
+        cluster.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
new file mode 100644
index 0000000..65bf0b4
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.Config;
+import org.apache.curator.test.TestingServer;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.when;
+
+public class ZkCoordinatorTest {
+
+
+    @Mock
+    private DynamicBrokersReader reader;
+
+    @Mock
+    private DynamicPartitionConnections dynamicPartitionConnections;
+
+    private KafkaTestBroker broker = new KafkaTestBroker();
+    private TestingServer server;
+    private Map stormConf = new HashMap();
+    private SpoutConfig spoutConfig;
+    private ZkState state;
+    private SimpleConsumer simpleConsumer;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        server = new TestingServer();
+        String connectionString = server.getConnectString();
+        ZkHosts hosts = new ZkHosts(connectionString);
+        hosts.refreshFreqSecs = 1;
+        spoutConfig = new SpoutConfig(hosts, "topic", "/test", "id");
+        Map conf = buildZookeeperConfig(server);
+        state = new ZkState(conf);
+        simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
+        when(dynamicPartitionConnections.register(any(Broker.class), any(String.class), anyInt())).thenReturn(simpleConsumer);
+    }
+
+    private Map buildZookeeperConfig(TestingServer server) {
+        Map conf = new HashMap();
+        conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, server.getPort());
+        conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
+        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
+        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);
+        return conf;
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        simpleConsumer.close();
+        broker.shutdown();
+        server.close();
+    }
+
+    @Test
+    public void testOnePartitionPerTask() throws Exception {
+        int totalTasks = 64;
+        int partitionsPerTask = 1;
+        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks)));
+        for (ZkCoordinator coordinator : coordinatorList) {
+            List<PartitionManager> myManagedPartitions = coordinator.getMyManagedPartitions();
+            assertEquals(partitionsPerTask, myManagedPartitions.size());
+            assertEquals(coordinator._taskIndex, myManagedPartitions.get(0).getPartition().partition);
+        }
+    }
+
+
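+    // Simulates a broker change: the mocked reader first reports brokers on port 9092,
+    // then on 9093 after the refresh interval, so every task should end up with
+    // different partition assignments.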
+    @Test
+    public void testPartitionsChange() throws Exception {
+        final int totalTasks = 64;
+        int partitionsPerTask = 2;
+        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
+        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
+        waitForRefresh();
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
+        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
+        assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size());
+        Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
+        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
+            List<PartitionManager> partitionManagersAfter = iterator.next();
+            assertPartitionsAreDifferent(partitionManagersBefore, partitionManagersAfter, partitionsPerTask);
+        }
+    }
+
+    private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int partitionsPerTask) {
+        assertEquals(partitionsPerTask, partitionManagersBefore.size());
+        assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size());
+        for (int i = 0; i < partitionsPerTask; i++) {
+            assertNotEquals(partitionManagersBefore.get(i).getPartition(), partitionManagersAfter.get(i).getPartition());
+        }
+    }
+
+    private List<List<PartitionManager>> getPartitionManagers(List<ZkCoordinator> coordinatorList) {
+        List<List<PartitionManager>> partitions = new ArrayList<List<PartitionManager>>();
+        for (ZkCoordinator coordinator : coordinatorList) {
+            partitions.add(coordinator.getMyManagedPartitions());
+        }
+        return partitions;
+    }
+
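+    // Sleeps just past refreshFreqSecs so the coordinators refresh their broker
+    // info on the next getMyManagedPartitions() call.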
+    private void waitForRefresh() throws InterruptedException {
+        Thread.sleep(((ZkHosts) spoutConfig.hosts).refreshFreqSecs * 1000 + 1);
+    }
+
+    private List<ZkCoordinator> buildCoordinators(int totalTasks) {
+        List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>();
+        for (int i = 0; i < totalTasks; i++) {
+            ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader);
+            coordinatorList.add(coordinator);
+        }
+        return coordinatorList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
new file mode 100644
index 0000000..180828e
--- /dev/null
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.utils.Utils;
+import com.google.common.collect.ImmutableList;
+import kafka.api.OffsetRequest;
+import kafka.api.FetchRequest;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.*;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.apache.storm.kafka.*;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+public class KafkaBoltTest {
+
+    private static final String TEST_TOPIC = "test-topic";
+    private KafkaTestBroker broker;
+    private KafkaBolt bolt;
+    private Config config = new Config();
+    private KafkaConfig kafkaConfig;
+    private SimpleConsumer simpleConsumer;
+
+    @Mock
+    private IOutputCollector collector;
+
+    @Before
+    public void initMocks() {
+        MockitoAnnotations.initMocks(this);
+        broker = new KafkaTestBroker();
+        setupKafkaConsumer();
+        config.put(KafkaBolt.TOPIC, TEST_TOPIC);
+        bolt = generateStringSerializerBolt();
+    }
+
+    @After
+    public void shutdown() {
+        simpleConsumer.close();
+        broker.shutdown();
+        bolt.cleanup();
+    }
+
+    private void setupKafkaConsumer() {
+        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
+        globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
+        BrokerHosts brokerHosts = new StaticHosts(globalPartitionInformation);
+        kafkaConfig = new KafkaConfig(brokerHosts, TEST_TOPIC);
+        simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
+    }
+
+    @Test
+    public void shouldAcknowledgeTickTuples() throws Exception {
+        // Given
+        Tuple tickTuple = mockTickTuple();
+
+        // When
+        bolt.execute(tickTuple);
+
+        // Then
+        verify(collector).ack(tickTuple);
+    }
+
+    @Test
+    public void executeWithKey() throws Exception {
+        String message = "value-123";
+        String key = "key-123";
+        Tuple tuple = generateTestTuple(key, message);
+        bolt.execute(tuple);
+        verify(collector).ack(tuple);
+        verifyMessage(key, message);
+    }
+
+    /* test synchronous sending */
+    @Test
+    public void executeWithByteArrayKeyAndMessageSync() {
+        boolean async = false;
+        boolean fireAndForget = false;
+        bolt = generateDefaultSerializerBolt(async, fireAndForget, null);
+        String keyString = "test-key";
+        String messageString = "test-message";
+        byte[] key = keyString.getBytes();
+        byte[] message = messageString.getBytes();
+        Tuple tuple = generateTestTuple(key, message);
+        bolt.execute(tuple);
+        verify(collector).ack(tuple);
+        verifyMessage(keyString, messageString);
+    }
+
+    /* test asynchronous sending (default) */
+    @Test
+    public void executeWithByteArrayKeyAndMessageAsync() {
+        boolean async = true;
+        boolean fireAndForget = false;
+        String keyString = "test-key";
+        String messageString = "test-message";
+        byte[] key = keyString.getBytes();
+        byte[] message = messageString.getBytes();
+        final Tuple tuple = generateTestTuple(key, message);
+
+        final ByteBufferMessageSet mockMsg = mockSingleMessage(key, message);
+        simpleConsumer.close();
+        simpleConsumer = mockSimpleConsumer(mockMsg);
+        KafkaProducer<?, ?> producer = mock(KafkaProducer.class);
+        // Stub send() to invoke its callback immediately with no exception,
+        // simulating a successful asynchronous send.
+        when(producer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Future>() {
+            @Override
+            public Future answer(InvocationOnMock invocationOnMock) throws Throwable {
+                Callback cb = (Callback) invocationOnMock.getArguments()[1];
+                cb.onCompletion(null, null);
+                return mock(Future.class);
+            }
+        });
+        bolt = generateDefaultSerializerBolt(async, fireAndForget, producer);
+        bolt.execute(tuple);
+        verify(collector).ack(tuple);
+        verifyMessage(keyString, messageString);
+    }
+
+    /* test with fireAndForget option enabled */
+    @Test
+    public void executeWithByteArrayKeyAndMessageFire() {
+        boolean async = true;
+        boolean fireAndForget = true;
+        bolt = generateDefaultSerializerBolt(async, fireAndForget, null);
+        String keyString = "test-key";
+        String messageString = "test-message";
+        byte[] key = keyString.getBytes();
+        byte[] message = messageString.getBytes();
+        Tuple tuple = generateTestTuple(key, message);
+        final ByteBufferMessageSet mockMsg = mockSingleMessage(key, message);
+        simpleConsumer.close();
+        simpleConsumer = mockSimpleConsumer(mockMsg);
+        KafkaProducer<?, ?> producer = mock(KafkaProducer.class);
+        // Do not invoke send()'s callback, in order to test whether the bolt
+        // handles the fireAndForget option properly.
+        doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+        bolt.execute(tuple);
+        verify(collector).ack(tuple);
+        verifyMessage(keyString, messageString);
+    }
+
+    /* test bolt specified properties */
+    @Test
+    public void executeWithBoltSpecifiedProperties() {
+        boolean async = false;
+        boolean fireAndForget = false;
+        bolt = defaultSerializerBoltWithSpecifiedProperties(async, fireAndForget);
+        String keyString = "test-key";
+        String messageString = "test-message";
+        byte[] key = keyString.getBytes();
+        byte[] message = messageString.getBytes();
+        Tuple tuple = generateTestTuple(key, message);
+        bolt.execute(tuple);
+        verify(collector).ack(tuple);
+        verifyMessage(keyString, messageString);
+    }
+
+    private KafkaBolt generateStringSerializerBolt() {
+        Properties props = new Properties();
+        props.put("acks", "1");
+        props.put("bootstrap.servers", broker.getBrokerConnectionString());
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("metadata.fetch.timeout.ms", 1000);
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
+        bolt.prepare(config, null, new OutputCollector(collector));
+        bolt.setAsync(false);
+        return bolt;
+    }
+
+    private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget,
+                                                    KafkaProducer<?, ?> mockProducer) {
+        Properties props = new Properties();
+        props.put("acks", "1");
+        props.put("bootstrap.servers", broker.getBrokerConnectionString());
+        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put("metadata.fetch.timeout.ms", 1000);
+        props.put("linger.ms", 0);
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
+        bolt.prepare(config, null, new OutputCollector(collector));
+        bolt.setAsync(async);
+        bolt.setFireAndForget(fireAndForget);
+        if (mockProducer != null) {
+            Whitebox.setInternalState(bolt, "producer", mockProducer);
+        }
+        return bolt;
+    }
+
+    private KafkaBolt defaultSerializerBoltWithSpecifiedProperties(boolean async, boolean fireAndForget) {
+        Properties props = new Properties();
+        props.put("acks", "1");
+        props.put("bootstrap.servers", broker.getBrokerConnectionString());
+        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put("metadata.fetch.timeout.ms", 1000);
+        props.put("linger.ms", 0);
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
+        bolt.prepare(config, null, new OutputCollector(collector));
+        bolt.setAsync(async);
+        bolt.setFireAndForget(fireAndForget);
+        return bolt;
+    }
+
+    @Test
+    public void executeWithoutKey() throws Exception {
+        String message = "value-234";
+        Tuple tuple = generateTestTuple(message);
+        bolt.execute(tuple);
+        verify(collector).ack(tuple);
+        verifyMessage(null, message);
+    }
+
+
+    @Test
+    public void executeWithBrokerDown() throws Exception {
+        broker.shutdown();
+        String message = "value-234";
+        Tuple tuple = generateTestTuple(message);
+        bolt.execute(tuple);
+        verify(collector).fail(tuple);
+    }
+
+    private boolean verifyMessage(String key, String message) {
+        long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, kafkaConfig.topic, 0, OffsetRequest.LatestTime()) - 1;
+        ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(kafkaConfig, simpleConsumer,
+                new Partition(Broker.fromString(broker.getBrokerConnectionString()), kafkaConfig.topic, 0), lastMessageOffset);
+        MessageAndOffset messageAndOffset = messageAndOffsets.iterator().next();
+        Message kafkaMessage = messageAndOffset.message();
+        ByteBuffer messageKeyBuffer = kafkaMessage.key();
+        String keyString = null;
+        String messageString = new String(Utils.toByteArray(kafkaMessage.payload()));
+        if (messageKeyBuffer != null) {
+            keyString = new String(Utils.toByteArray(messageKeyBuffer));
+        }
+        assertEquals(key, keyString);
+        assertEquals(message, messageString);
+        return true;
+    }
+
+    private Tuple generateTestTuple(Object key, Object message) {
+        TopologyBuilder builder = new TopologyBuilder();
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+            @Override
+            public Fields getComponentOutputFields(String componentId, String streamId) {
+                return new Fields("key", "message");
+            }
+        };
+        return new TupleImpl(topologyContext, new Values(key, message), 1, "");
+    }
+
+    private Tuple generateTestTuple(Object message) {
+        TopologyBuilder builder = new TopologyBuilder();
+        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
+            @Override
+            public Fields getComponentOutputFields(String componentId, String streamId) {
+                return new Fields("message");
+            }
+        };
+        return new TupleImpl(topologyContext, new Values(message), 1, "");
+    }
+
+    private Tuple mockTickTuple() {
+        Tuple tuple = mock(Tuple.class);
+        when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID);
+        when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
+        // Sanity check
+        assertTrue(TupleUtils.isTick(tuple));
+        return tuple;
+    }
+
+    private static ByteBufferMessageSet mockSingleMessage(byte[] key, byte[] message) {
+        ByteBufferMessageSet sets = mock(ByteBufferMessageSet.class);
+        MessageAndOffset msg = mock(MessageAndOffset.class);
+        final List<MessageAndOffset> msgs = ImmutableList.of(msg);
+        doReturn(msgs.iterator()).when(sets).iterator();
+        Message kafkaMessage = mock(Message.class);
+        doReturn(ByteBuffer.wrap(key)).when(kafkaMessage).key();
+        doReturn(ByteBuffer.wrap(message)).when(kafkaMessage).payload();
+        doReturn(kafkaMessage).when(msg).message();
+        return sets;
+    }
+
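+    // Returns a SimpleConsumer whose fetch() always yields the supplied message set,
+    // letting verifyMessage() run without a live broker.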
+    private static SimpleConsumer mockSimpleConsumer(ByteBufferMessageSet mockMsg) {
+        SimpleConsumer simpleConsumer = mock(SimpleConsumer.class);
+        FetchResponse resp = mock(FetchResponse.class);
+        doReturn(resp).when(simpleConsumer).fetch(any(FetchRequest.class));
+        OffsetResponse mockOffsetResponse = mock(OffsetResponse.class);
+        doReturn(new long[] {}).when(mockOffsetResponse).offsets(anyString(), anyInt());
+        doReturn(mockOffsetResponse).when(simpleConsumer).getOffsetsBefore(any(kafka.javaapi.OffsetRequest.class));
+        doReturn(mockMsg).when(resp).messageSet(anyString(), anyInt());
+        return simpleConsumer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/DynamicBrokersReaderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/DynamicBrokersReaderTest.java b/external/storm-kafka/src/test/storm/kafka/DynamicBrokersReaderTest.java
deleted file mode 100644
index d871924..0000000
--- a/external/storm-kafka/src/test/storm/kafka/DynamicBrokersReaderTest.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.test.TestingServer;
-import org.apache.curator.utils.ZKPaths;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import storm.kafka.trident.GlobalPartitionInformation;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Date: 16/05/2013
- * Time: 20:35
- */
-public class DynamicBrokersReaderTest {
-    private DynamicBrokersReader dynamicBrokersReader, wildCardBrokerReader;
-    private String masterPath = "/brokers";
-    private String topic = "testing1";
-    private String secondTopic = "testing2";
-    private String thirdTopic = "testing3";
-
-    private CuratorFramework zookeeper;
-    private TestingServer server;
-
-    @Before
-    public void setUp() throws Exception {
-        server = new TestingServer();
-        String connectionString = server.getConnectString();
-        Map conf = new HashMap();
-        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
-        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
-        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
-        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
-
-        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
-        dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
-
-        Map conf2 = new HashMap();
-        conf2.putAll(conf);
-        conf2.put("kafka.topic.wildcard.match",true);
-
-        wildCardBrokerReader = new DynamicBrokersReader(conf2, connectionString, masterPath, "^test.*$");
-        zookeeper.start();
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        dynamicBrokersReader.close();
-        zookeeper.close();
-        server.close();
-    }
-
-    private void addPartition(int id, String host, int port, String topic) throws Exception {
-        writePartitionId(id, topic);
-        writeLeader(id, 0, topic);
-        writeLeaderDetails(0, host, port);
-    }
-
-    private void addPartition(int id, int leader, String host, int port, String topic) throws Exception {
-        writePartitionId(id, topic);
-        writeLeader(id, leader, topic);
-        writeLeaderDetails(leader, host, port);
-    }
-
-    private void writePartitionId(int id, String topic) throws Exception {
-        String path = dynamicBrokersReader.partitionPath(topic);
-        writeDataToPath(path, ("" + id));
-    }
-
-    private void writeDataToPath(String path, String data) throws Exception {
-        ZKPaths.mkdirs(zookeeper.getZookeeperClient().getZooKeeper(), path);
-        zookeeper.setData().forPath(path, data.getBytes());
-    }
-
-    private void writeLeader(int id, int leaderId, String topic) throws Exception {
-        String path = dynamicBrokersReader.partitionPath(topic) + "/" + id + "/state";
-        String value = " { \"controller_epoch\":4, \"isr\":[ 1, 0 ], \"leader\":" + leaderId + ", \"leader_epoch\":1, \"version\":1 }";
-        writeDataToPath(path, value);
-    }
-
-    private void writeLeaderDetails(int leaderId, String host, int port) throws Exception {
-        String path = dynamicBrokersReader.brokerPath() + "/" + leaderId;
-        String value = "{ \"host\":\"" + host + "\", \"jmx_port\":9999, \"port\":" + port + ", \"version\":1 }";
-        writeDataToPath(path, value);
-    }
-
-
-    private GlobalPartitionInformation getByTopic(List<GlobalPartitionInformation> partitions, String topic) {
-        for (GlobalPartitionInformation partitionInformation : partitions) {
-            if (partitionInformation.topic.equals(topic)) return partitionInformation;
-        }
-        return null;
-    }
-
-    @Test
-    public void testGetBrokerInfo() throws Exception {
-        String host = "localhost";
-        int port = 9092;
-        int partition = 0;
-        addPartition(partition, host, port, topic);
-        List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
-
-        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
-        assertNotNull(brokerInfo);
-        assertEquals(1, brokerInfo.getOrderedPartitions().size());
-        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
-        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
-    }
-
-    @Test
-    public void testGetBrokerInfoWildcardMatch() throws Exception {
-        String host = "localhost";
-        int port = 9092;
-        int partition = 0;
-        addPartition(partition, host, port, topic);
-        addPartition(partition, host, port, secondTopic);
-
-        List<GlobalPartitionInformation> partitions = wildCardBrokerReader.getBrokerInfo();
-
-        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
-        assertNotNull(brokerInfo);
-        assertEquals(1, brokerInfo.getOrderedPartitions().size());
-        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
-        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
-
-        brokerInfo = getByTopic(partitions, secondTopic);
-        assertNotNull(brokerInfo);
-        assertEquals(1, brokerInfo.getOrderedPartitions().size());
-        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
-        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
-
-        addPartition(partition, host, port, thirdTopic);
-        //Discover newly added topic
-        partitions = wildCardBrokerReader.getBrokerInfo();
-        assertNotNull(getByTopic(partitions, topic));
-        assertNotNull(getByTopic(partitions, secondTopic));
-        assertNotNull(getByTopic(partitions, thirdTopic));
-    }
-
-
-    @Test
-    public void testMultiplePartitionsOnDifferentHosts() throws Exception {
-        String host = "localhost";
-        int port = 9092;
-        int secondPort = 9093;
-        int partition = 0;
-        int secondPartition = partition + 1;
-        addPartition(partition, 0, host, port, topic);
-        addPartition(secondPartition, 1, host, secondPort, topic);
-
-        List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
-
-        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
-        assertNotNull(brokerInfo);
-        assertEquals(2, brokerInfo.getOrderedPartitions().size());
-
-        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
-        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
-
-        assertEquals(secondPort, brokerInfo.getBrokerFor(secondPartition).port);
-        assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host);
-    }
-
-
-    @Test
-    public void testMultiplePartitionsOnSameHost() throws Exception {
-        String host = "localhost";
-        int port = 9092;
-        int partition = 0;
-        int secondPartition = partition + 1;
-        addPartition(partition, 0, host, port, topic);
-        addPartition(secondPartition, 0, host, port, topic);
-
-        List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
-
-        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
-        assertNotNull(brokerInfo);
-        assertEquals(2, brokerInfo.getOrderedPartitions().size());
-
-        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
-        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
-
-        assertEquals(port, brokerInfo.getBrokerFor(secondPartition).port);
-        assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host);
-    }
-
-    @Test
-    public void testSwitchHostForPartition() throws Exception {
-        String host = "localhost";
-        int port = 9092;
-        int partition = 0;
-        addPartition(partition, host, port, topic);
-        List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
-
-        GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
-        assertNotNull(brokerInfo);
-        assertEquals(port, brokerInfo.getBrokerFor(partition).port);
-        assertEquals(host, brokerInfo.getBrokerFor(partition).host);
-
-        String newHost = host + "switch";
-        int newPort = port + 1;
-        addPartition(partition, newHost, newPort, topic);
-        partitions = dynamicBrokersReader.getBrokerInfo();
-
-        brokerInfo = getByTopic(partitions, topic);
-        assertNotNull(brokerInfo);
-        assertEquals(newPort, brokerInfo.getBrokerFor(partition).port);
-        assertEquals(newHost, brokerInfo.getBrokerFor(partition).host);
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testErrorLogsWhenConfigIsMissing() throws Exception {
-        String connectionString = server.getConnectString();
-        Map conf = new HashMap();
-        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
-//        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
-        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
-        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
-
-        DynamicBrokersReader dynamicBrokersReader1 = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
-
-    }
-}
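
The write* helpers in the deleted test populate the same ZooKeeper nodes (partition leader state plus broker registration JSON) that DynamicBrokersReader parses. A hedged usage sketch, restricted to the constructor and calls exercised by these tests; `zkConnect` is a hypothetical connect string such as "localhost:2181":

    Map conf = new HashMap();
    conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
    conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
    conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
    conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
    DynamicBrokersReader reader = new DynamicBrokersReader(conf, zkConnect, "/brokers", "testing1");
    // One GlobalPartitionInformation per matched topic, mapping each
    // partition id to its current leader broker.
    List<GlobalPartitionInformation> partitions = reader.getBrokerInfo();
    reader.close();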

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
deleted file mode 100644
index da23718..0000000
--- a/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ /dev/null
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-
-public class ExponentialBackoffMsgRetryManagerTest {
-
-    private static final Long TEST_OFFSET = 101L;
-    private static final Long TEST_OFFSET2 = 102L;
-    private static final Long TEST_OFFSET3 = 105L;
-    private static final Long TEST_NEW_OFFSET = 103L;
-
-    @Test
-    public void testImmediateRetry() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
-        manager.failed(TEST_OFFSET);
-        Long next = manager.nextFailedMessageToRetry();
-        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-        assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
-
-        manager.retryStarted(TEST_OFFSET);
-
-        manager.failed(TEST_OFFSET);
-        next = manager.nextFailedMessageToRetry();
-        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-        assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
-    }
-
-    @Test
-    public void testSingleDelay() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(100, 1d, 1000);
-        manager.failed(TEST_OFFSET);
-        Thread.sleep(5);
-        Long next = manager.nextFailedMessageToRetry();
-        assertNull("expect no message ready for retry yet", next);
-        assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
-
-        Thread.sleep(100);
-        next = manager.nextFailedMessageToRetry();
-        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
-    }
-
-    @Test
-    public void testExponentialBackoff() throws Exception {
-        final long initial = 10;
-        final double mult = 2d;
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, initial * 10);
-
-        long expectedWaitTime = initial;
-        for (long i = 0L; i < 3L; ++i) {
-            manager.failed(TEST_OFFSET);
-
-            Thread.sleep((expectedWaitTime + 1L) / 2L);
-            assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
-
-            Thread.sleep((expectedWaitTime + 1L) / 2L);
-            Long next = manager.nextFailedMessageToRetry();
-            assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-            assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
-
-            manager.retryStarted(TEST_OFFSET);
-            expectedWaitTime *= mult;
-        }
-    }
-
-    @Test
-    public void testRetryOrder() throws Exception {
-        final long initial = 10;
-        final double mult = 2d;
-        final long max = 20;
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
-
-        manager.failed(TEST_OFFSET);
-        Thread.sleep(initial);
-
-        manager.retryStarted(TEST_OFFSET);
-        manager.failed(TEST_OFFSET);
-        manager.failed(TEST_OFFSET2);
-
-        // although TEST_OFFSET failed first, its retry delay is longer because this is its second retry,
-        // so TEST_OFFSET2 should come first
-
-        Thread.sleep(initial * 2);
-        assertTrue("message " + TEST_OFFSET + " should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
-        assertTrue("message " + TEST_OFFSET2 + " should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
-
-        Long next = manager.nextFailedMessageToRetry();
-        assertEquals("expect first message to retry is " + TEST_OFFSET2, TEST_OFFSET2, next);
-
-        Thread.sleep(initial);
-
-        // haven't retried yet, so first should still be TEST_OFFSET2
-        next = manager.nextFailedMessageToRetry();
-        assertEquals("expect first message to retry is " + TEST_OFFSET2, TEST_OFFSET2, next);
-        manager.retryStarted(next);
-
-        // now it should be TEST_OFFSET
-        next = manager.nextFailedMessageToRetry();
-        assertEquals("expect message to retry is now " + TEST_OFFSET, TEST_OFFSET, next);
-        manager.retryStarted(next);
-
-        // now none left
-        next = manager.nextFailedMessageToRetry();
-        assertNull("expect no message to retry now", next);
-    }
-
-    @Test
-    public void testQueriesAfterRetriedAlready() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
-        manager.failed(TEST_OFFSET);
-        Long next = manager.nextFailedMessageToRetry();
-        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-        assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
-
-        manager.retryStarted(TEST_OFFSET);
-        next = manager.nextFailedMessageToRetry();
-        assertNull("expect no message ready after retried", next);
-        assertFalse("message should not be ready after retried", manager.shouldRetryMsg(TEST_OFFSET));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testRetryWithoutFail() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
-        manager.retryStarted(TEST_OFFSET);
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testFailRetryRetry() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
-        manager.failed(TEST_OFFSET);
-        try {
-            manager.retryStarted(TEST_OFFSET);
-        } catch (IllegalStateException ise) {
-            fail("IllegalStateException unexpected here: " + ise);
-        }
-
-        assertFalse("message should not be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
-        manager.retryStarted(TEST_OFFSET);
-    }
-
-    @Test
-    public void testMaxBackoff() throws Exception {
-        final long initial = 100;
-        final double mult = 2d;
-        final long max = 2000;
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
-
-        long expectedWaitTime = initial;
-        for (long i = 0L; i < 4L; ++i) {
-            manager.failed(TEST_OFFSET);
-
-            Thread.sleep((expectedWaitTime + 1L) / 2L);
-            assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
-
-            Thread.sleep((expectedWaitTime + 1L) / 2L);
-            Long next = manager.nextFailedMessageToRetry();
-            assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-            assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
-
-            manager.retryStarted(TEST_OFFSET);
-            expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max);
-        }
-    }
-
-    @Test
-    public void testFailThenAck() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
-        manager.failed(TEST_OFFSET);
-        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
-
-        manager.acked(TEST_OFFSET);
-
-        Long next = manager.nextFailedMessageToRetry();
-        assertNull("expect no message ready after acked", next);
-        assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
-    }
-
-    @Test
-    public void testAckThenFail() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
-        manager.acked(TEST_OFFSET);
-        assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
-
-        manager.failed(TEST_OFFSET);
-
-        Long next = manager.nextFailedMessageToRetry();
-        assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
-    }
-    
-    @Test
-    public void testClearInvalidMessages() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
-        manager.failed(TEST_OFFSET);
-        manager.failed(TEST_OFFSET2);
-        manager.failed(TEST_OFFSET3);
-
-        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
-        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
-        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3));
-
-        manager.clearInvalidMessages(TEST_NEW_OFFSET);
-
-        Long next = manager.nextFailedMessageToRetry();
-        assertEquals("expect test offset next available for retry", TEST_OFFSET3, next);
-        
-        manager.acked(TEST_OFFSET3);
-        next = manager.nextFailedMessageToRetry();
-        assertNull("expect no message ready after acked", next);
-    }
-
-}
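
The schedule these tests pin down is truncated exponential backoff: each consecutive failure of the same offset multiplies the retry delay until it hits the cap. A standalone restatement of the formula implied by the constructor arguments (initial delay, multiplier, max delay); this is an illustrative reimplementation, not the class's actual code:

    // delay(n) = min(initial * mult^n, max) before the (n+1)-th retry attempt.
    static long backoffDelayMs(long initialMs, double mult, long maxMs, int priorRetries) {
        double delay = initialMs * Math.pow(mult, priorRetries);
        return Math.min((long) delay, maxMs);
    }

With testMaxBackoff's parameters (initial = 100, mult = 2, max = 2000) this yields 100, 200, 400, and 800 ms across the loop's four failures, matching the expectedWaitTime bookkeeping above.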

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/KafkaErrorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaErrorTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaErrorTest.java
deleted file mode 100644
index 9f170db..0000000
--- a/external/storm-kafka/src/test/storm/kafka/KafkaErrorTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-/**
- * Date: 12/01/2014
- * Time: 18:09
- */
-public class KafkaErrorTest {
-
-    @Test
-    public void getError() {
-        assertThat(KafkaError.getError(0), is(equalTo(KafkaError.NO_ERROR)));
-    }
-
-    @Test
-    public void offsetMetaDataTooLarge() {
-        assertThat(KafkaError.getError(12), is(equalTo(KafkaError.OFFSET_METADATA_TOO_LARGE)));
-    }
-
-    @Test
-    public void unknownNegative() {
-        assertThat(KafkaError.getError(-1), is(equalTo(KafkaError.UNKNOWN)));
-    }
-
-    @Test
-    public void unknownPositive() {
-        assertThat(KafkaError.getError(75), is(equalTo(KafkaError.UNKNOWN)));
-    }
-
-    @Test
-    public void unknown() {
-        assertThat(KafkaError.getError(13), is(equalTo(KafkaError.UNKNOWN)));
-    }
-}
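
Taken together, the five assertions fix the lookup contract: a known code maps to the enum constant at that position, while anything negative, unmapped, or out of range collapses to UNKNOWN. One plausible shape of getError consistent with those assertions (a sketch; the actual KafkaError body may differ):

    // Assumes codes 0..n mirror the enum declaration order and that UNKNOWN
    // is the final constant, so an in-range but unmapped code like 13 also
    // resolves to it.
    public static KafkaError getError(int errorCode) {
        if (errorCode < 0 || errorCode >= KafkaError.values().length) {
            return KafkaError.UNKNOWN;
        }
        return KafkaError.values()[errorCode];
    }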

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/KafkaTestBroker.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaTestBroker.java b/external/storm-kafka/src/test/storm/kafka/KafkaTestBroker.java
deleted file mode 100644
index 73203d1..0000000
--- a/external/storm-kafka/src/test/storm/kafka/KafkaTestBroker.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.test.InstanceSpec;
-import org.apache.curator.test.TestingServer;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-import org.apache.commons.io.FileUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * Date: 11/01/2014
- * Time: 13:15
- */
-public class KafkaTestBroker {
-
-    private int port;
-    private KafkaServerStartable kafka;
-    private TestingServer server;
-    private CuratorFramework zookeeper;
-    private File logDir;
-
-    public KafkaTestBroker() {
-        try {
-            server = new TestingServer();
-            String zookeeperConnectionString = server.getConnectString();
-            ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
-            zookeeper = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
-            zookeeper.start();
-            port = InstanceSpec.getRandomPort();
-            logDir = new File(System.getProperty("java.io.tmpdir"), "kafka/logs/kafka-test-" + port);
-            KafkaConfig config = buildKafkaConfig(zookeeperConnectionString);
-            kafka = new KafkaServerStartable(config);
-            kafka.startup();
-        } catch (Exception ex) {
-            throw new RuntimeException("Could not start test broker", ex);
-        }
-    }
-
-    private kafka.server.KafkaConfig buildKafkaConfig(String zookeeperConnectionString) {
-        Properties p = new Properties();
-        p.setProperty("zookeeper.connect", zookeeperConnectionString);
-        p.setProperty("broker.id", "0");
-        p.setProperty("port", "" + port);
-        p.setProperty("log.dirs", logDir.getAbsolutePath());
-        return new KafkaConfig(p);
-    }
-
-    public String getBrokerConnectionString() {
-        return "localhost:" + port;
-    }
-
-    public int getPort() {
-        return port;
-    }
-    public void shutdown() {
-        kafka.shutdown();
-        if (zookeeper.getState().equals(CuratorFrameworkState.STARTED)) {
-            zookeeper.close();
-        }
-        try {
-            server.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        FileUtils.deleteQuietly(logDir);
-    }
-}
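
A typical lifecycle for this helper in a test, using only the public API defined above; the try/finally placement is a suggested pattern, not part of the patch:

    // Boots a real single-node Kafka plus an embedded ZooKeeper on a random port.
    KafkaTestBroker broker = new KafkaTestBroker();
    try {
        String connect = broker.getBrokerConnectionString(); // "localhost:<port>"
        // ... point the producer or consumer under test at `connect` ...
    } finally {
        broker.shutdown(); // stops Kafka and ZooKeeper, then deletes the log dir
    }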
