http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
deleted file mode 100644
index eb694bb..0000000
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import kafka.api.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import storm.kafka.trident.GlobalPartitionInformation;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.utils.Utils;
-
-import com.google.common.collect.ImmutableMap;
-public class KafkaUtilsTest {
-    private String TEST_TOPIC = "testTopic";
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtilsTest.class);
-    private KafkaTestBroker broker;
-    private SimpleConsumer simpleConsumer;
-    private KafkaConfig config;
-    private BrokerHosts brokerHosts;
-
-    @Before
-    public void setup() {
-        broker = new KafkaTestBroker();
-        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
-        globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
-        brokerHosts = new StaticHosts(globalPartitionInformation);
-        config = new KafkaConfig(brokerHosts, TEST_TOPIC);
-        simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
-    }
-
-    @After
-    public void shutdown() {
-        simpleConsumer.close();
-        broker.shutdown();
-    }
-
-
-    @Test(expected = FailedFetchException.class)
-    public void topicDoesNotExist() throws Exception {
-        KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), 0);
-    }
-
-    @Test(expected = FailedFetchException.class)
-    public void brokerIsDown() throws Exception {
-        int port = broker.getPort();
-        broker.shutdown();
-        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", port, 100, 1024, "testClient");
-        try {
-            KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), OffsetRequest.LatestTime());
-        } finally {
-            simpleConsumer.close();
-        }
-    }
-
-    @Test
-    public void fetchMessage() throws Exception {
-        String value = "test";
-        createTopicAndSendMessage(value);
-        long offset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;
-        ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer,
-                new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), offset);
-        String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
-        assertThat(message, is(equalTo(value)));
-    }
-
-    @Test(expected = FailedFetchException.class)
-    public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() throws Exception {
-        config.useStartOffsetTimeIfOffsetOutOfRange = false;
-        KafkaUtils.fetchMessages(config, simpleConsumer,
-                new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), -99);
-    }
-
-    @Test(expected = TopicOffsetOutOfRangeException.class)
-    public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception {
-        config = new KafkaConfig(brokerHosts, "newTopic");
-        String value = "test";
-        createTopicAndSendMessage(value);
-        KafkaUtils.fetchMessages(config, simpleConsumer,
-                new Partition(Broker.fromString(broker.getBrokerConnectionString()), "newTopic", 0), -99);
-    }
-
-    @Test
-    public void getOffsetFromConfigAndDontForceFromStart() {
-        config.ignoreZkOffsets = false;
-        config.startOffsetTime = OffsetRequest.EarliestTime();
-        createTopicAndSendMessage();
-        long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
-        long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
-        assertThat(latestOffset, is(equalTo(offsetFromConfig)));
-    }
-
-    @Test
-    public void getOffsetFromConfigAndForceFromStart() {
-        config.ignoreZkOffsets = true;
-        config.startOffsetTime = OffsetRequest.EarliestTime();
-        createTopicAndSendMessage();
-        long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
-        long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
-        assertThat(earliestOffset, is(equalTo(offsetFromConfig)));
-    }
-
-    @Test
-    public void generateTuplesWithoutKeyAndKeyValueScheme() {
-        config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
-        runGetValueOnlyTuplesTest();
-    }
-
-    @Test
-    public void generateTuplesWithKeyAndKeyValueScheme() {
-        config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
-        config.useStartOffsetTimeIfOffsetOutOfRange = false;
-        String value = "value";
-        String key = "key";
-        createTopicAndSendMessage(key, value);
-        ByteBufferMessageSet messageAndOffsets = getLastMessage();
-        for (MessageAndOffset msg : messageAndOffsets) {
-            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic);
-            assertEquals(ImmutableMap.of(key, value), lists.iterator().next().get(0));
-        }
-    }
-
-    @Test
-    public void generateTuplesWithValueScheme() {
-        config.scheme = new SchemeAsMultiScheme(new StringScheme());
-        runGetValueOnlyTuplesTest();
-    }
-
-    @Test
-    public void generateTuplesWithValueAndStringMultiSchemeWithTopic() {
-        config.scheme = new StringMultiSchemeWithTopic();
-        String value = "value";
-        createTopicAndSendMessage(value);
-        ByteBufferMessageSet messageAndOffsets = getLastMessage();
-        for (MessageAndOffset msg : messageAndOffsets) {
-            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic);
-            List<Object> list = lists.iterator().next();
-            assertEquals(value, list.get(0));
-            assertEquals(config.topic, list.get(1));
-        }
-    }
-
-    @Test
-    public void generateTuplesWithValueSchemeAndKeyValueMessage() {
-        config.scheme = new SchemeAsMultiScheme(new StringScheme());
-        String value = "value";
-        String key = "key";
-        createTopicAndSendMessage(key, value);
-        ByteBufferMessageSet messageAndOffsets = getLastMessage();
-        for (MessageAndOffset msg : messageAndOffsets) {
-            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic);
-            assertEquals(value, lists.iterator().next().get(0));
-        }
-    }
-    
-    @Test
-    public void generateTuplesWithMessageAndMetadataScheme() {
-        String value = "value";
-        Partition mockPartition = Mockito.mock(Partition.class);
-        mockPartition.partition = 0;
-        long offset = 0L;
-        
-        MessageMetadataSchemeAsMultiScheme scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme());
-        
-        createTopicAndSendMessage(null, value);
-        ByteBufferMessageSet messageAndOffsets = getLastMessage();
-        for (MessageAndOffset msg : messageAndOffsets) {
-            Iterable<List<Object>> lists = KafkaUtils.generateTuples(scheme, msg.message(), mockPartition, offset);
-            List<Object> values = lists.iterator().next();
-            assertEquals("Message is incorrect", value, values.get(0));
-            assertEquals("Partition is incorrect", mockPartition.partition, values.get(1));
-            assertEquals("Offset is incorrect", offset, values.get(2));
-        }
-    }
-
-    private ByteBufferMessageSet getLastMessage() {
-        long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;
-        return KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), TEST_TOPIC, 0), offsetOfLastMessage);
-    }
-
-    private void runGetValueOnlyTuplesTest() {
-        String value = "value";
-        
-        createTopicAndSendMessage(null, value);
-        ByteBufferMessageSet messageAndOffsets = getLastMessage();
-        for (MessageAndOffset msg : messageAndOffsets) {
-            Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), config.topic);
-            assertEquals(value, lists.iterator().next().get(0));
-        }
-    }
-
-    private void createTopicAndSendMessage() {
-        createTopicAndSendMessage(null, "someValue");
-    }
-
-    private void createTopicAndSendMessage(String value) {
-        createTopicAndSendMessage(null, value);
-    }
-
-    private void createTopicAndSendMessage(String key, String value) {
-        Properties p = new Properties();
-        p.put("acks", "1");
-        p.put("bootstrap.servers", broker.getBrokerConnectionString());
-        p.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        p.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        p.put("metadata.fetch.timeout.ms", 1000);
-        KafkaProducer<String, String> producer = new KafkaProducer<String, 
String>(p);
-        try {
-            producer.send(new ProducerRecord<String, String>(config.topic, 
key, value)).get();
-        } catch (Exception e) {
-            LOG.error("Failed to do synchronous sending due to " + e, e);
-            Assert.fail(e.getMessage());
-        } finally {
-            producer.close();
-        }
-    }
-
-    @Test
-    public void assignOnePartitionPerTask() {
-        runPartitionToTaskMappingTest(16, 1);
-    }
-
-    @Test
-    public void assignTwoPartitionsPerTask() {
-        runPartitionToTaskMappingTest(16, 2);
-    }
-
-    @Test
-    public void assignAllPartitionsToOneTask() {
-        runPartitionToTaskMappingTest(32, 32);
-    }
-    
-    public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTask) {
-        GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(numPartitions);
-        List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
-        partitions.add(globalPartitionInformation);
-        int numTasks = numPartitions / partitionsPerTask;
-        for (int i = 0 ; i < numTasks ; i++) {
-            assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size());
-        }
-    }
-
-    @Test
-    public void moreTasksThanPartitions() {
-        GlobalPartitionInformation globalPartitionInformation = TestUtils.buildPartitionInfo(1);
-        List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
-        partitions.add(globalPartitionInformation);
-        int numTasks = 2;
-        assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size());
-        assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void assignInvalidTask() {
-        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
-        List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
-        partitions.add(globalPartitionInformation);
-        KafkaUtils.calculatePartitionsForTask(partitions, 1, 1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java b/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java
deleted file mode 100644
index eddb900..0000000
--- a/external/storm-kafka/src/test/storm/kafka/StringKeyValueSchemeTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.tuple.Fields;
-import com.google.common.collect.ImmutableMap;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class StringKeyValueSchemeTest {
-
-    private StringKeyValueScheme scheme = new StringKeyValueScheme();
-
-    @Test
-    public void testDeserialize() throws Exception {
-        assertEquals(Collections.singletonList("test"), scheme.deserialize(wrapString("test")));
-    }
-
-    @Test
-    public void testGetOutputFields() throws Exception {
-        Fields outputFields = scheme.getOutputFields();
-        assertTrue(outputFields.contains(StringScheme.STRING_SCHEME_KEY));
-        assertEquals(1, outputFields.size());
-    }
-
-    @Test
-    public void testDeserializeWithNullKeyAndValue() throws Exception {
-        assertEquals(Collections.singletonList("test"),
-            scheme.deserializeKeyAndValue(null, wrapString("test")));
-    }
-
-    @Test
-    public void testDeserializeWithKeyAndValue() throws Exception {
-        assertEquals(Collections.singletonList(ImmutableMap.of("key", "test")),
-                scheme.deserializeKeyAndValue(wrapString("key"), wrapString("test")));
-    }
-
-    private static ByteBuffer wrapString(String s) {
-        return ByteBuffer.wrap(s.getBytes(Charset.defaultCharset()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java b/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java
deleted file mode 100644
index ae36409..0000000
--- a/external/storm-kafka/src/test/storm/kafka/TestStringScheme.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestStringScheme {
-  @Test
-  public void testDeserializeString() {
-    String s = "foo";
-    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
-    ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length);
-    direct.put(bytes);
-    direct.flip();
-    String s1 = StringScheme.deserializeString(ByteBuffer.wrap(bytes));
-    String s2 = StringScheme.deserializeString(direct);
-    assertEquals(s, s1);
-    assertEquals(s, s2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/TestUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TestUtils.java b/external/storm-kafka/src/test/storm/kafka/TestUtils.java
deleted file mode 100644
index 3e69160..0000000
--- a/external/storm-kafka/src/test/storm/kafka/TestUtils.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-import kafka.api.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-import storm.kafka.bolt.KafkaBolt;
-import storm.kafka.trident.GlobalPartitionInformation;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestUtils {
-
-    public static final String TOPIC = "test";
-
-    public static GlobalPartitionInformation buildPartitionInfo(int numPartitions) {
-        return buildPartitionInfo(numPartitions, 9092);
-    }
-
-    public static List<GlobalPartitionInformation> buildPartitionInfoList(GlobalPartitionInformation partitionInformation) {
-        List<GlobalPartitionInformation> map = new ArrayList<GlobalPartitionInformation>();
-        map.add(partitionInformation);
-        return map;
-    }
-
-    public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort) {
-        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TOPIC);
-        for (int i = 0; i < numPartitions; i++) {
-            globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + " :" + brokerPort));
-        }
-        return globalPartitionInformation;
-    }
-
-    public static SimpleConsumer getKafkaConsumer(KafkaTestBroker broker) {
-        BrokerHosts brokerHosts = getBrokerHosts(broker);
-        KafkaConfig kafkaConfig = new KafkaConfig(brokerHosts, TOPIC);
-        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
-        return simpleConsumer;
-    }
-
-    public static KafkaConfig getKafkaConfig(KafkaTestBroker broker) {
-        BrokerHosts brokerHosts = getBrokerHosts(broker);
-        KafkaConfig kafkaConfig = new KafkaConfig(brokerHosts, TOPIC);
-        return kafkaConfig;
-    }
-
-    private static BrokerHosts getBrokerHosts(KafkaTestBroker broker) {
-        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TOPIC);
-        globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
-        return new StaticHosts(globalPartitionInformation);
-    }
-
-    public static Properties getProducerProperties(String brokerConnectionString) {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", brokerConnectionString);
-        props.put("acks", "1");
-        props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        return props;
-    }
-
-    public static boolean verifyMessage(String key, String message, KafkaTestBroker broker, SimpleConsumer simpleConsumer) {
-        long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, TestUtils.TOPIC, 0, OffsetRequest.LatestTime()) - 1;
-        ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(TestUtils.getKafkaConfig(broker), simpleConsumer,
-                new Partition(Broker.fromString(broker.getBrokerConnectionString()), TestUtils.TOPIC, 0), lastMessageOffset);
-        MessageAndOffset messageAndOffset = messageAndOffsets.iterator().next();
-        Message kafkaMessage = messageAndOffset.message();
-        ByteBuffer messageKeyBuffer = kafkaMessage.key();
-        String keyString = null;
-        String messageString = new String(Utils.toByteArray(kafkaMessage.payload()));
-        if (messageKeyBuffer != null) {
-            keyString = new String(Utils.toByteArray(messageKeyBuffer));
-        }
-        assertEquals(key, keyString);
-        assertEquals(message, messageString);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
deleted file mode 100644
index 8213b07..0000000
--- a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.tuple.Fields;
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import storm.kafka.trident.TridentKafkaState;
-import storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
-import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import storm.kafka.trident.selector.DefaultTopicSelector;
-import storm.kafka.trident.selector.KafkaTopicSelector;
-import storm.trident.tuple.TridentTuple;
-import storm.trident.tuple.TridentTupleView;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class TridentKafkaTest {
-    private KafkaTestBroker broker;
-    private TridentKafkaState state;
-    private SimpleConsumer simpleConsumer;
-
-    @Before
-    public void setup() {
-        broker = new KafkaTestBroker();
-        simpleConsumer = TestUtils.getKafkaConsumer(broker);
-        TridentTupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper("key", "message");
-        KafkaTopicSelector topicSelector = new DefaultTopicSelector(TestUtils.TOPIC);
-        state = new TridentKafkaState()
-                .withKafkaTopicSelector(topicSelector)
-                .withTridentTupleToKafkaMapper(mapper);
-        state.prepare(TestUtils.getProducerProperties(broker.getBrokerConnectionString()));
-    }
-
-    @Test
-    public void testKeyValue() {
-        String keyString = "key-123";
-        String valString = "message-123";
-        int batchSize = 10;
-
-        List<TridentTuple> tridentTuples = generateTupleBatch(keyString, valString, batchSize);
-
-        state.updateState(tridentTuples, null);
-
-        for (int i = 0; i < batchSize; i++) {
-            TestUtils.verifyMessage(keyString, valString, broker, simpleConsumer);
-        }
-    }
-
-    private List<TridentTuple> generateTupleBatch(String key, String message, int batchsize) {
-        List<TridentTuple> batch = new ArrayList<>();
-        for (int i = 0; i < batchsize; i++) {
-            batch.add(TridentTupleView.createFreshTuple(new Fields("key", "message"), key, message));
-        }
-        return batch;
-    }
-
-    @After
-    public void shutdown() {
-        simpleConsumer.close();
-        broker.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
deleted file mode 100644
index b9e25e4..0000000
--- a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import com.google.common.collect.ImmutableMap;
-import storm.kafka.trident.TridentKafkaStateFactory;
-import storm.kafka.trident.TridentKafkaUpdater;
-import storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
-import storm.kafka.trident.selector.DefaultTopicSelector;
-import storm.trident.Stream;
-import storm.trident.TridentTopology;
-import storm.trident.testing.FixedBatchSpout;
-
-import java.util.Properties;
-
-public class TridentKafkaTopology {
-
-    private static StormTopology buildTopology(String brokerConnectionString) {
-        Fields fields = new Fields("word", "count");
-        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-                new Values("storm", "1"),
-                new Values("trident", "1"),
-                new Values("needs", "1"),
-                new Values("javadoc", "1")
-        );
-        spout.setCycle(true);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", brokerConnectionString);
-        props.put("acks", "1");
-        props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-
-        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
-            .withProducerProperties(props)
-            .withKafkaTopicSelector(new DefaultTopicSelector("test"))
-            .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
-        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
-
-        return topology.build();
-    }
-
-    /**
-     * To run this topology, ensure you have a Kafka broker running and pass its connection string as an argument.
-     * Create the topic "test" from the command line:
-     * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test
-     *
-     * Run this program, then start the Kafka console consumer:
-     * kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
-     *
-     * You should see the messages flowing through.
-     *
-     * @param args
-     * @throws Exception
-     */
-    public static void main(String[] args) throws Exception {
-        if (args.length < 1) {
-            System.out.println("Please provide kafka broker url, e.g. localhost:9092");
-        }
-
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology("wordCounter", new Config(), 
buildTopology(args[0]));
-        Thread.sleep(60 * 1000);
-        cluster.killTopology("wordCounter");
-
-        cluster.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/storm/kafka/ZkCoordinatorTest.java
deleted file mode 100644
index 48ca60f..0000000
--- a/external/storm-kafka/src/test/storm/kafka/ZkCoordinatorTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import org.apache.curator.test.TestingServer;
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.util.*;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.when;
-
-public class ZkCoordinatorTest {
-
-
-    @Mock
-    private DynamicBrokersReader reader;
-
-    @Mock
-    private DynamicPartitionConnections dynamicPartitionConnections;
-
-    private KafkaTestBroker broker = new KafkaTestBroker();
-    private TestingServer server;
-    private Map stormConf = new HashMap();
-    private SpoutConfig spoutConfig;
-    private ZkState state;
-    private SimpleConsumer simpleConsumer;
-
-    @Before
-    public void setUp() throws Exception {
-        MockitoAnnotations.initMocks(this);
-        server = new TestingServer();
-        String connectionString = server.getConnectString();
-        ZkHosts hosts = new ZkHosts(connectionString);
-        hosts.refreshFreqSecs = 1;
-        spoutConfig = new SpoutConfig(hosts, "topic", "/test", "id");
-        Map conf = buildZookeeperConfig(server);
-        state = new ZkState(conf);
-        simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
-        when(dynamicPartitionConnections.register(any(Broker.class), any(String.class), anyInt())).thenReturn(simpleConsumer);
-    }
-
-    private Map buildZookeeperConfig(TestingServer server) {
-        Map conf = new HashMap();
-        conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, server.getPort());
-        conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
-        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
-        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000);
-        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
-        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);
-        return conf;
-    }
-
-    @After
-    public void shutdown() throws Exception {
-        simpleConsumer.close();
-        broker.shutdown();
-        server.close();
-    }
-
-    @Test
-    public void testOnePartitionPerTask() throws Exception {
-        int totalTasks = 64;
-        int partitionsPerTask = 1;
-        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
-        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks)));
-        for (ZkCoordinator coordinator : coordinatorList) {
-            List<PartitionManager> myManagedPartitions = coordinator.getMyManagedPartitions();
-            assertEquals(partitionsPerTask, myManagedPartitions.size());
-            assertEquals(coordinator._taskIndex, myManagedPartitions.get(0).getPartition().partition);
-        }
-    }
-
-
-    @Test
-    public void testPartitionsChange() throws Exception {
-        final int totalTasks = 64;
-        int partitionsPerTask = 2;
-        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
-        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
-        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
-        waitForRefresh();
-        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
-        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
-        assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size());
-        Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
-        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
-            List<PartitionManager> partitionManagersAfter = iterator.next();
-            assertPartitionsAreDifferent(partitionManagersBefore, partitionManagersAfter, partitionsPerTask);
-        }
-    }
-
-    private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int partitionsPerTask) {
-        assertEquals(partitionsPerTask, partitionManagersBefore.size());
-        assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size());
-        for (int i = 0; i < partitionsPerTask; i++) {
-            assertNotEquals(partitionManagersBefore.get(i).getPartition(), partitionManagersAfter.get(i).getPartition());
-        }
-
-    }
-
-    private List<List<PartitionManager>> getPartitionManagers(List<ZkCoordinator> coordinatorList) {
-        List<List<PartitionManager>> partitions = new ArrayList();
-        for (ZkCoordinator coordinator : coordinatorList) {
-            partitions.add(coordinator.getMyManagedPartitions());
-        }
-        return partitions;
-    }
-
-    private void waitForRefresh() throws InterruptedException {
-        Thread.sleep(((ZkHosts) spoutConfig.hosts).refreshFreqSecs * 1000 + 1);
-    }
-
-    private List<ZkCoordinator> buildCoordinators(int totalTasks) {
-        List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>();
-        for (int i = 0; i < totalTasks; i++) {
-            ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader);
-            coordinatorList.add(coordinator);
-        }
-        return coordinatorList;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
deleted file mode 100644
index f3aee76..0000000
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.bolt;
-
-import backtype.storm.Config;
-import backtype.storm.Constants;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.TupleUtils;
-import backtype.storm.utils.Utils;
-import com.google.common.collect.ImmutableList;
-import kafka.api.OffsetRequest;
-import kafka.api.FetchRequest;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.*;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.internal.util.reflection.Whitebox;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import storm.kafka.*;
-import storm.kafka.trident.GlobalPartitionInformation;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.*;
-
-public class KafkaBoltTest {
-
-    private static final String TEST_TOPIC = "test-topic";
-    private KafkaTestBroker broker;
-    private KafkaBolt bolt;
-    private Config config = new Config();
-    private KafkaConfig kafkaConfig;
-    private SimpleConsumer simpleConsumer;
-
-    @Mock
-    private IOutputCollector collector;
-
-    @Before
-    public void initMocks() {
-        MockitoAnnotations.initMocks(this);
-        broker = new KafkaTestBroker();
-        setupKafkaConsumer();
-        config.put(KafkaBolt.TOPIC, TEST_TOPIC);
-        bolt = generateStringSerializerBolt();
-    }
-
-    @After
-    public void shutdown() {
-        simpleConsumer.close();
-        broker.shutdown();
-        bolt.cleanup();
-    }
-
-    private void setupKafkaConsumer() {
-        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
-        globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
-        BrokerHosts brokerHosts = new StaticHosts(globalPartitionInformation);
-        kafkaConfig = new KafkaConfig(brokerHosts, TEST_TOPIC);
-        simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
-    }
-
-    @Test
-    public void shouldAcknowledgeTickTuples() throws Exception {
-        // Given
-        Tuple tickTuple = mockTickTuple();
-
-        // When
-        bolt.execute(tickTuple);
-
-        // Then
-        verify(collector).ack(tickTuple);
-    }
-
-    @Test
-    public void executeWithKey() throws Exception {
-        String message = "value-123";
-        String key = "key-123";
-        Tuple tuple = generateTestTuple(key, message);
-        bolt.execute(tuple);
-        verify(collector).ack(tuple);
-        verifyMessage(key, message);
-    }
-
-    /* test synchronous sending */
-    @Test
-    public void executeWithByteArrayKeyAndMessageSync() {
-        boolean async = false;
-        boolean fireAndForget = false;
-        bolt = generateDefaultSerializerBolt(async, fireAndForget, null);
-        String keyString = "test-key";
-        String messageString = "test-message";
-        byte[] key = keyString.getBytes();
-        byte[] message = messageString.getBytes();
-        Tuple tuple = generateTestTuple(key, message);
-        bolt.execute(tuple);
-        verify(collector).ack(tuple);
-        verifyMessage(keyString, messageString);
-    }
-
-    /* test asynchronous sending (default) */
-    @Test
-    public void executeWithByteArrayKeyAndMessageAsync() {
-        boolean async = true;
-        boolean fireAndForget = false;
-        String keyString = "test-key";
-        String messageString = "test-message";
-        byte[] key = keyString.getBytes();
-        byte[] message = messageString.getBytes();
-        final Tuple tuple = generateTestTuple(key, message);
-
-        final ByteBufferMessageSet mockMsg = mockSingleMessage(key, message);
-        simpleConsumer.close();
-        simpleConsumer = mockSimpleConsumer(mockMsg);
-        KafkaProducer<?, ?> producer = mock(KafkaProducer.class);
-        when(producer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Future>() {
-            @Override
-            public Future answer(InvocationOnMock invocationOnMock) throws Throwable {
-                Callback cb = (Callback) invocationOnMock.getArguments()[1];
-                cb.onCompletion(null, null);
-                return mock(Future.class);
-            }
-        });
-        bolt = generateDefaultSerializerBolt(async, fireAndForget, producer);
-        bolt.execute(tuple);
-        verify(collector).ack(tuple);
-        verifyMessage(keyString, messageString);
-    }
-
-    /* test with fireAndForget option enabled */
-    @Test
-    public void executeWithByteArrayKeyAndMessageFire() {
-        boolean async = true;
-        boolean fireAndForget = true;
-        bolt = generateDefaultSerializerBolt(async, fireAndForget, null);
-        String keyString = "test-key";
-        String messageString = "test-message";
-        byte[] key = keyString.getBytes();
-        byte[] message = messageString.getBytes();
-        Tuple tuple = generateTestTuple(key, message);
-        final ByteBufferMessageSet mockMsg = mockSingleMessage(key, message);
-        simpleConsumer.close();
-        simpleConsumer = mockSimpleConsumer(mockMsg);
-        KafkaProducer<?, ?> producer = mock(KafkaProducer.class);
-        // do not invoke the callback of send() in order to test whether the bolt handles the fireAndForget option
-        // properly.
-        doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class), any(Callback.class));
-        bolt.execute(tuple);
-        verify(collector).ack(tuple);
-        verifyMessage(keyString, messageString);
-    }
-
-    /* test bolt specified properties */
-    @Test
-    public void executeWithBoltSpecifiedProperties() {
-        boolean async = false;
-        boolean fireAndForget = false;
-        bolt = defaultSerializerBoltWithSpecifiedProperties(async, fireAndForget);
-        String keyString = "test-key";
-        String messageString = "test-message";
-        byte[] key = keyString.getBytes();
-        byte[] message = messageString.getBytes();
-        Tuple tuple = generateTestTuple(key, message);
-        bolt.execute(tuple);
-        verify(collector).ack(tuple);
-        verifyMessage(keyString, messageString);
-    }
-
-    private KafkaBolt generateStringSerializerBolt() {
-        Properties props = new Properties();
-        props.put("acks", "1");
-        props.put("bootstrap.servers", broker.getBrokerConnectionString());
-        props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        props.put("metadata.fetch.timeout.ms", 1000);
-        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
-        bolt.prepare(config, null, new OutputCollector(collector));
-        bolt.setAsync(false);
-        return bolt;
-    }
-
-    private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget,
-                                                    KafkaProducer<?, ?> mockProducer) {
-        Properties props = new Properties();
-        props.put("acks", "1");
-        props.put("bootstrap.servers", broker.getBrokerConnectionString());
-        props.put("key.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        props.put("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        props.put("metadata.fetch.timeout.ms", 1000);
-        props.put("linger.ms", 0);
-        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
-        bolt.prepare(config, null, new OutputCollector(collector));
-        bolt.setAsync(async);
-        bolt.setFireAndForget(fireAndForget);
-        if (mockProducer != null) {
-            Whitebox.setInternalState(bolt, "producer", mockProducer);
-        }
-        return bolt;
-    }
-
-    private KafkaBolt defaultSerializerBoltWithSpecifiedProperties(boolean async, boolean fireAndForget) {
-        Properties props = new Properties();
-        props.put("acks", "1");
-        props.put("bootstrap.servers", broker.getBrokerConnectionString());
-        props.put("key.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        props.put("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        props.put("metadata.fetch.timeout.ms", 1000);
-        props.put("linger.ms", 0);
-        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
-        bolt.prepare(config, null, new OutputCollector(collector));
-        bolt.setAsync(async);
-        bolt.setFireAndForget(fireAndForget);
-        return bolt;
-    }
-
-    @Test
-    public void executeWithoutKey() throws Exception {
-        String message = "value-234";
-        Tuple tuple = generateTestTuple(message);
-        bolt.execute(tuple);
-        verify(collector).ack(tuple);
-        verifyMessage(null, message);
-    }
-
-
-    @Test
-    public void executeWithBrokerDown() throws Exception {
-        broker.shutdown();
-        String message = "value-234";
-        Tuple tuple = generateTestTuple(message);
-        bolt.execute(tuple);
-        verify(collector).fail(tuple);
-    }
-
-    private boolean verifyMessage(String key, String message) {
-        long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, kafkaConfig.topic, 0, OffsetRequest.LatestTime()) - 1;
-        ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(kafkaConfig, simpleConsumer,
-                new Partition(Broker.fromString(broker.getBrokerConnectionString()), kafkaConfig.topic, 0), lastMessageOffset);
-        MessageAndOffset messageAndOffset = messageAndOffsets.iterator().next();
-        Message kafkaMessage = messageAndOffset.message();
-        ByteBuffer messageKeyBuffer = kafkaMessage.key();
-        String keyString = null;
-        String messageString = new String(Utils.toByteArray(kafkaMessage.payload()));
-        if (messageKeyBuffer != null) {
-            keyString = new String(Utils.toByteArray(messageKeyBuffer));
-        }
-        assertEquals(key, keyString);
-        assertEquals(message, messageString);
-        return true;
-    }
-
-    private Tuple generateTestTuple(Object key, Object message) {
-        TopologyBuilder builder = new TopologyBuilder();
-        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
-            @Override
-            public Fields getComponentOutputFields(String componentId, String streamId) {
-                return new Fields("key", "message");
-            }
-        };
-        return new TupleImpl(topologyContext, new Values(key, message), 1, "");
-    }
-
-    private Tuple generateTestTuple(Object message) {
-        TopologyBuilder builder = new TopologyBuilder();
-        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
-            @Override
-            public Fields getComponentOutputFields(String componentId, String streamId) {
-                return new Fields("message");
-            }
-        };
-        return new TupleImpl(topologyContext, new Values(message), 1, "");
-    }
-
-    private Tuple mockTickTuple() {
-        Tuple tuple = mock(Tuple.class);
-        when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID);
-        when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
-        // Sanity check
-        assertTrue(TupleUtils.isTick(tuple));
-        return tuple;
-    }
-
-    private static ByteBufferMessageSet mockSingleMessage(byte[] key, byte[] message) {
-        ByteBufferMessageSet sets = mock(ByteBufferMessageSet.class);
-        MessageAndOffset msg = mock(MessageAndOffset.class);
-        final List<MessageAndOffset> msgs = ImmutableList.of(msg);
-        doReturn(msgs.iterator()).when(sets).iterator();
-        Message kafkaMessage = mock(Message.class);
-        doReturn(ByteBuffer.wrap(key)).when(kafkaMessage).key();
-        doReturn(ByteBuffer.wrap(message)).when(kafkaMessage).payload();
-        doReturn(kafkaMessage).when(msg).message();
-        return sets;
-    }
-
-    private static SimpleConsumer mockSimpleConsumer(ByteBufferMessageSet mockMsg) {
-        SimpleConsumer simpleConsumer = mock(SimpleConsumer.class);
-        FetchResponse resp = mock(FetchResponse.class);
-        doReturn(resp).when(simpleConsumer).fetch(any(FetchRequest.class));
-        OffsetResponse mockOffsetResponse = mock(OffsetResponse.class);
-        doReturn(new long[] {}).when(mockOffsetResponse).offsets(anyString(), anyInt());
-        doReturn(mockOffsetResponse).when(simpleConsumer).getOffsetsBefore(any(kafka.javaapi.OffsetRequest.class));
-        doReturn(mockMsg).when(resp).messageSet(anyString(), anyInt());
-        return simpleConsumer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java b/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java
index 4adc500..7f58a3d 100644
--- a/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java
+++ b/external/storm-metrics/src/main/java/org/apache/storm/metrics/hdrhistogram/HistogramMetric.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.metrics.hdrhistogram;
 
-import backtype.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetric;
 import org.HdrHistogram.Histogram;
 
 /**

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java b/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java
index a3addc9..7c6d75e 100644
--- a/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java
+++ b/external/storm-metrics/src/main/java/org/apache/storm/metrics/sigar/CPUMetric.java
@@ -20,7 +20,7 @@ package org.apache.storm.metrics.sigar;
 import org.hyperic.sigar.Sigar;
 import org.hyperic.sigar.ProcCpu;
 
-import backtype.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetric;
 
 import java.util.HashMap;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
index dc2a2d3..0c64f43 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/AbstractRedisBolt.java
@@ -17,9 +17,9 @@
  */
 package org.apache.storm.redis.bolt;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
index 47c98cb..4d6dc4e 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisLookupBolt.java
@@ -17,9 +17,9 @@
  */
 package org.apache.storm.redis.bolt;
 
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import org.apache.storm.redis.common.config.JedisClusterConfig;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
index be9a328..b74ed1c 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/bolt/RedisStoreBolt.java
@@ -17,8 +17,8 @@
  */
 package org.apache.storm.redis.bolt;
 
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
index 727e4ec..fe464f5 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/RedisLookupMapper.java
@@ -17,9 +17,9 @@
  */
 package org.apache.storm.redis.common.mapper;
 
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.ITuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
index bcc531e..a2ab48b 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/common/mapper/TupleMapper.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.redis.common.mapper;
 
-import backtype.storm.tuple.ITuple;
+import org.apache.storm.tuple.ITuple;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
index 26056d2..f5bd459 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisMapState.java
@@ -19,12 +19,12 @@ package org.apache.storm.redis.trident.state;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import storm.trident.state.JSONNonTransactionalSerializer;
-import storm.trident.state.JSONOpaqueSerializer;
-import storm.trident.state.JSONTransactionalSerializer;
-import storm.trident.state.Serializer;
-import storm.trident.state.StateType;
-import storm.trident.state.map.IBackingMap;
+import org.apache.storm.trident.state.JSONNonTransactionalSerializer;
+import org.apache.storm.trident.state.JSONOpaqueSerializer;
+import org.apache.storm.trident.state.JSONTransactionalSerializer;
+import org.apache.storm.trident.state.Serializer;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.state.map.IBackingMap;
 
 import java.util.ArrayList;
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
index 5c7335d..3785b84 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateQuerier.java
@@ -17,14 +17,14 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import backtype.storm.tuple.Values;
+import org.apache.storm.tuple.Values;
 import com.google.common.collect.Lists;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseQueryFunction;
-import storm.trident.state.State;
-import storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseQueryFunction;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
index e9654c7..82b7483 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/AbstractRedisStateUpdater.java
@@ -19,10 +19,10 @@ package org.apache.storm.redis.trident.state;
 
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.state.State;
-import storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
 
 import java.util.HashMap;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
index f4dbfaa..6ebcb22 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/Options.java
@@ -18,7 +18,7 @@
 package org.apache.storm.redis.trident.state;
 
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
-import storm.trident.state.Serializer;
+import org.apache.storm.trident.state.Serializer;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
index cbd37c5..54e9aea 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterMapState.java
@@ -17,24 +17,24 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import backtype.storm.task.IMetricsContext;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.tuple.Values;
 import com.google.common.collect.Lists;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import redis.clients.jedis.JedisCluster;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.Serializer;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateType;
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.map.CachedMap;
-import storm.trident.state.map.MapState;
-import storm.trident.state.map.NonTransactionalMap;
-import storm.trident.state.map.OpaqueMap;
-import storm.trident.state.map.SnapshottableMap;
-import storm.trident.state.map.TransactionalMap;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.Serializer;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.trident.state.map.CachedMap;
+import org.apache.storm.trident.state.map.MapState;
+import org.apache.storm.trident.state.map.NonTransactionalMap;
+import org.apache.storm.trident.state.map.OpaqueMap;
+import org.apache.storm.trident.state.map.SnapshottableMap;
+import org.apache.storm.trident.state.map.TransactionalMap;
 
 import java.util.List;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
index 764436d..c773c1a 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterState.java
@@ -17,13 +17,13 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import backtype.storm.task.IMetricsContext;
+import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.JedisCluster;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
index 25e9924..b379fc1 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java
@@ -17,25 +17,25 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import backtype.storm.task.IMetricsContext;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.tuple.Values;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
 import redis.clients.jedis.Pipeline;
-import storm.trident.state.OpaqueValue;
-import storm.trident.state.Serializer;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
-import storm.trident.state.StateType;
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.map.CachedMap;
-import storm.trident.state.map.MapState;
-import storm.trident.state.map.NonTransactionalMap;
-import storm.trident.state.map.OpaqueMap;
-import storm.trident.state.map.SnapshottableMap;
-import storm.trident.state.map.TransactionalMap;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.Serializer;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.StateType;
+import org.apache.storm.trident.state.TransactionalValue;
+import org.apache.storm.trident.state.map.CachedMap;
+import org.apache.storm.trident.state.map.MapState;
+import org.apache.storm.trident.state.map.NonTransactionalMap;
+import org.apache.storm.trident.state.map.OpaqueMap;
+import org.apache.storm.trident.state.map.SnapshottableMap;
+import org.apache.storm.trident.state.map.TransactionalMap;
 
 import java.util.List;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
index 85d0e1b..a93b348 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisState.java
@@ -17,14 +17,14 @@
  */
 package org.apache.storm.redis.trident.state;
 
-import backtype.storm.task.IMetricsContext;
+import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
-import storm.trident.state.State;
-import storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
index ae053de..f62b7b0 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/LookupWordCount.java
@@ -17,18 +17,18 @@
  */
 package org.apache.storm.redis.topology;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.ITuple;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import com.google.common.collect.Lists;
 import org.apache.storm.redis.bolt.RedisLookupBolt;
 import org.apache.storm.redis.common.config.JedisPoolConfig;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
index 77c6ee8..d46bab6 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/PersistentWordCount.java
@@ -17,14 +17,14 @@
  */
 package org.apache.storm.redis.topology;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.ITuple;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
 import org.apache.storm.redis.bolt.AbstractRedisBolt;
 import org.apache.storm.redis.bolt.RedisStoreBolt;
 import org.apache.storm.redis.common.config.JedisClusterConfig;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
index 6f25038..6fa930c 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordCounter.java
@@ -17,18 +17,18 @@
  */
 package org.apache.storm.redis.topology;
 
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import com.google.common.collect.Maps;
 
 import java.util.Map;
 
-import static backtype.storm.utils.Utils.tuple;
+import static org.apache.storm.utils.Utils.tuple;
 
 public class WordCounter implements IBasicBolt {
     private Map<String, Integer> wordCounter = Maps.newHashMap();
@@ -64,4 +64,4 @@ public class WordCounter implements IBasicBolt {
         return null;
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordSpout.java b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordSpout.java
index bb9c2d7..e2cdfde 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordSpout.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/topology/WordSpout.java
@@ -17,12 +17,12 @@
  */
 package org.apache.storm.redis.topology;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 import java.util.Map;
 import java.util.Random;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java
index 6f465c9..37d3936 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/PrintFunction.java
@@ -19,9 +19,9 @@ package org.apache.storm.redis.trident;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import storm.trident.operation.BaseFunction;
-import storm.trident.operation.TridentCollector;
-import storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
 
 import java.util.Random;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
index a445749..a6ca8c9 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java
@@ -17,10 +17,10 @@
  */
 package org.apache.storm.redis.trident;
 
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.ITuple;
-import backtype.storm.tuple.Values;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Values;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 
@@ -54,4 +54,4 @@ public class WordCountLookupMapper implements RedisLookupMapper {
     public String getValueFromTuple(ITuple tuple) {
         return tuple.getInteger(1).toString();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
index b930998..58df150 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.redis.trident;
 
-import backtype.storm.tuple.ITuple;
+import org.apache.storm.tuple.ITuple;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 
@@ -36,4 +36,4 @@ public class WordCountStoreMapper implements RedisStoreMapper {
     public String getValueFromTuple(ITuple tuple) {
         return tuple.getInteger(1).toString();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
index 4a4aae0..e3eb0f9 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java
@@ -17,22 +17,22 @@
  */
 package org.apache.storm.redis.trident;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.apache.storm.redis.trident.state.RedisState;
 import org.apache.storm.redis.trident.state.RedisStateQuerier;
 import org.apache.storm.redis.trident.state.RedisStateUpdater;
 import org.apache.storm.redis.common.config.JedisPoolConfig;
-import storm.trident.Stream;
-import storm.trident.TridentState;
-import storm.trident.TridentTopology;
-import storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.testing.FixedBatchSpout;
 
 public class WordCountTridentRedis {
     public static StormTopology buildTopology(String redisHost, Integer redisPort){

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
index 765b339..116a58a 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java
@@ -17,22 +17,22 @@
  */
 package org.apache.storm.redis.trident;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 import org.apache.storm.redis.common.mapper.RedisLookupMapper;
 import org.apache.storm.redis.common.mapper.RedisStoreMapper;
 import org.apache.storm.redis.trident.state.RedisClusterState;
 import org.apache.storm.redis.trident.state.RedisClusterStateQuerier;
 import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
-import storm.trident.Stream;
-import storm.trident.TridentState;
-import storm.trident.TridentTopology;
-import storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.testing.FixedBatchSpout;
 
 import java.net.InetSocketAddress;
 import java.util.HashSet;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
index beb4b5f..fafb4e0 100644
--- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java
@@ -17,23 +17,23 @@
  */
 package org.apache.storm.redis.trident;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
 import org.apache.storm.redis.common.mapper.TupleMapper;
 import org.apache.storm.redis.trident.state.RedisClusterMapState;
 import org.apache.storm.redis.common.config.JedisClusterConfig;
-import storm.trident.Stream;
-import storm.trident.TridentState;
-import storm.trident.TridentTopology;
-import storm.trident.operation.builtin.MapGet;
-import storm.trident.operation.builtin.Sum;
-import storm.trident.state.StateFactory;
-import storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.MapGet;
+import org.apache.storm.trident.operation.builtin.Sum;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.testing.FixedBatchSpout;
 
 import java.net.InetSocketAddress;
 import java.util.HashSet;
