http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java 
b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
deleted file mode 100644
index 52cdde8..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.utils.Utils;
-import com.google.common.base.Preconditions;
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.trident.GlobalPartitionInformation;
-import storm.kafka.trident.IBrokerReader;
-import storm.kafka.trident.StaticBrokerReader;
-import storm.kafka.trident.ZkBrokerReader;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.*;
-
-
/**
 * Static helper routines shared by the Kafka spout and the Trident Kafka spout:
 * broker discovery, offset lookup via the SimpleConsumer API, message fetching
 * with error classification, tuple deserialization, and task-to-partition
 * assignment.
 */
public class KafkaUtils {

    public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
    // Sentinel returned by getOffset() when the broker reports no offsets
    // for the requested topic/partition.
    private static final int NO_OFFSET = -5;


    /**
     * Builds the broker reader matching the configured host-discovery mode:
     * a fixed broker list ({@code StaticHosts}) or brokers registered in
     * ZooKeeper ({@code ZkHosts}).
     */
    public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
        if (conf.hosts instanceof StaticHosts) {
            return new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation());
        } else {
            // NOTE: anything that is not StaticHosts is assumed to be ZkHosts;
            // a third KafkaConfig.hosts implementation would fail this cast.
            return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
        }
    }


    /**
     * Convenience overload: looks up the partition offset using the
     * start-offset time taken from {@code config.startOffsetTime}.
     */
    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
        long startOffsetTime = config.startOffsetTime;
        return getOffset(consumer, topic, partition, startOffsetTime);
    }

    /**
     * Issues an OffsetRequest to the broker and returns the single offset
     * closest to {@code startOffsetTime} (e.g. EarliestTime/LatestTime), or
     * {@code NO_OFFSET} when the broker returned none.
     */
    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // maxNumOffsets == 1: only the single offset before startOffsetTime is wanted.
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
        OffsetRequest request = new OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());

        long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
        if (offsets.length > 0) {
            return offsets[0];
        } else {
            return NO_OFFSET;
        }
    }

    /**
     * Storm metric reporting, per partition and aggregated per topic: spout
     * lag, earliest/latest broker offsets and the latest emitted offset.
     */
    public static class KafkaOffsetMetric implements IMetric {
        // Latest offset emitted into the topology, per partition.
        Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
        Set<Partition> _partitions;
        DynamicPartitionConnections _connections;

        public KafkaOffsetMetric(DynamicPartitionConnections connections) {
            _connections = connections;
        }

        public void setLatestEmittedOffset(Partition partition, long offset) {
            _partitionToOffset.put(partition, offset);
        }

        // Per-topic aggregation accumulators, summed across partitions.
        private class TopicMetrics {
            long totalSpoutLag = 0;
            long totalEarliestTimeOffset = 0;
            long totalLatestTimeOffset = 0;
            long totalLatestEmittedOffset = 0;
        }

        /**
         * Returns a map of metric-path -> value, or null when data is
         * incomplete or any broker query fails. Despite the name, nothing is
         * actually reset here; offsets are recomputed on each call.
         */
        @Override
        public Object getValueAndReset() {
            try {
                HashMap ret = new HashMap();
                // Only report once every tracked partition has an emitted
                // offset; otherwise the lag numbers would be partial.
                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
                    Map<String,TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>();
                    for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
                        Partition partition = e.getKey();
                        SimpleConsumer consumer = _connections.getConnection(partition);
                        if (consumer == null) {
                            LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
                            return null;
                        }
                        long latestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
                        long earliestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
                        if (latestTimeOffset == KafkaUtils.NO_OFFSET) {
                            LOG.warn("No data found in Kafka Partition " + partition.getId());
                            return null;
                        }
                        long latestEmittedOffset = e.getValue();
                        long spoutLag = latestTimeOffset - latestEmittedOffset;
                        String topic = partition.topic;
                        String metricPath = partition.getId();
                        //Handle the case where Partition Path Id does not contain topic name Partition.getId() == "partition_" + partition
                        if (!metricPath.startsWith(topic + "/")) {
                            metricPath = topic + "/" + metricPath;
                        }
                        ret.put(metricPath + "/" + "spoutLag", spoutLag);
                        ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
                        ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
                        ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);

                        if (!topicMetricsMap.containsKey(partition.topic)) {
                            topicMetricsMap.put(partition.topic,new TopicMetrics());
                        }

                        TopicMetrics topicMetrics = topicMetricsMap.get(partition.topic);
                        topicMetrics.totalSpoutLag += spoutLag;
                        topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
                        topicMetrics.totalLatestTimeOffset += latestTimeOffset;
                        topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
                    }

                    // Emit the per-topic aggregates alongside the per-partition values.
                    for(Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
                        String topic = e.getKey();
                        TopicMetrics topicMetrics = e.getValue();
                        ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
                        ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
                        ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
                        ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
                    }

                    return ret;
                } else {
                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
                }
            } catch (Throwable t) {
                // Metrics must never take the spout down; log and skip this tick.
                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
            }
            return null;
        }

        /**
         * Replaces the tracked partition set and drops emitted-offset entries
         * for partitions no longer assigned to this task.
         */
        public void refreshPartitions(Set<Partition> partitions) {
            _partitions = partitions;
            Iterator<Partition> it = _partitionToOffset.keySet().iterator();
            while (it.hasNext()) {
                if (!partitions.contains(it.next())) {
                    it.remove();
                }
            }
        }
    }

    /**
     * Fetches a batch of messages from the given partition starting at
     * {@code offset}.
     *
     * @throws TopicOffsetOutOfRangeException when the broker reports
     *         OFFSET_OUT_OF_RANGE and the config allows falling back to the
     *         configured start-offset time
     * @throws FailedFetchException on network errors or other broker-side
     *         fetch errors (treated as retriable by callers)
     * @throws RuntimeException on any other unexpected fetch failure
     */
    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset)
            throws TopicOffsetOutOfRangeException, FailedFetchException,RuntimeException {
        ByteBufferMessageSet msgs = null;
        String topic = partition.topic;
        int partitionId = partition.partition;
        FetchRequestBuilder builder = new FetchRequestBuilder();
        FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
                clientId(config.clientId).maxWait(config.fetchMaxWait).build();
        FetchResponse fetchResponse;
        try {
            fetchResponse = consumer.fetch(fetchRequest);
        } catch (Exception e) {
            // Network-level failures are wrapped as FailedFetchException so the
            // spout can reconnect and retry; everything else is fatal.
            if (e instanceof ConnectException ||
                    e instanceof SocketTimeoutException ||
                    e instanceof IOException ||
                    e instanceof UnresolvedAddressException
                    ) {
                LOG.warn("Network error when fetching messages:", e);
                throw new FailedFetchException(e);
            } else {
                throw new RuntimeException(e);
            }
        }
        if (fetchResponse.hasError()) {
            KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
            if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
                String msg = partition + " Got fetch request with offset out of range: [" + offset + "]";
                LOG.warn(msg);
                throw new TopicOffsetOutOfRangeException(msg);
            } else {
                String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                LOG.error(message);
                throw new FailedFetchException(message);
            }
        } else {
            msgs = fetchResponse.messageSet(topic, partitionId);
        }
        return msgs;
    }


    /**
     * Deserializes a Kafka message into zero or more tuples using the scheme
     * configured on {@code kafkaConfig}. Returns null for messages with a null
     * payload. Key-aware and topic-aware schemes are dispatched on their type.
     */
    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, String topic) {
        Iterable<List<Object>> tups;
        ByteBuffer payload = msg.payload();
        if (payload == null) {
            return null;
        }
        ByteBuffer key = msg.key();
        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(key, payload);
        } else {
            if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) {
                tups = ((StringMultiSchemeWithTopic)kafkaConfig.scheme).deserializeWithTopic(topic, payload);
            } else {
                tups = kafkaConfig.scheme.deserialize(payload);
            }
        }
        return tups;
    }

    /**
     * Deserializes a message through a metadata-aware scheme, passing the
     * source partition and offset along. Returns null for null payloads.
     */
    public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) {
        ByteBuffer payload = msg.payload();
        if (payload == null) {
            return null;
        }
        return scheme.deserializeMessageWithMetadata(payload, partition, offset);
    }


    /**
     * Assigns partitions to this task round-robin: task {@code taskIndex} out
     * of {@code totalTasks} gets every totalTasks-th partition starting at its
     * own index. Tasks beyond the partition count get none (and a warning is
     * logged).
     */
    public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
        Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
        List<Partition> taskPartitions = new ArrayList<Partition>();
        List<Partition> partitions = new ArrayList<Partition>();
        for(GlobalPartitionInformation partitionInformation : partitons) {
            partitions.addAll(partitionInformation.getOrderedPartitions());
        }
        int numPartitions = partitions.size();
        if (numPartitions < totalTasks) {
            LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
        }
        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
            Partition taskPartition = partitions.get(i);
            taskPartitions.add(taskPartition);
        }
        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
        return taskPartitions;
    }

    // Logs this task's final partition assignment (warns when idle).
    private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
        String taskPrefix = taskId(taskIndex, totalTasks);
        if (taskPartitions.isEmpty()) {
            LOG.warn(taskPrefix + "no partitions assigned");
        } else {
            LOG.info(taskPrefix + "assigned " + taskPartitions);
        }
    }

    /** Human-readable 1-based task label, e.g. "Task [2/4] ". */
    public static String taskId(int taskIndex, int totalTasks) {
        return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";
    }
}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java 
b/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
deleted file mode 100644
index 7c0dc6c..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.spout.Scheme;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
/**
 * A {@link Scheme} that deserializes a Kafka message from both its key and its
 * value, rather than from the value alone.
 */
public interface KeyValueScheme extends Scheme {
    /**
     * Deserializes one Kafka message into a single tuple.
     *
     * @param key   the message key buffer (non-null when dispatched by KafkaUtils)
     * @param value the message payload buffer
     * @return the tuple's field values, or null to emit nothing for this message
     */
    List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);
}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java 
b/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
deleted file mode 100644
index d27ae7e..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.spout.SchemeAsMultiScheme;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme {
-
-    public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) {
-        super(scheme);
-    }
-
-    public Iterable<List<Object>> deserializeKeyAndValue(final ByteBuffer key, 
final ByteBuffer value) {
-        List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, 
value);
-        if(o == null) return null;
-        else return Arrays.asList(o);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java 
b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
deleted file mode 100644
index 62f652f..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.spout.Scheme;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
/**
 * A {@link Scheme} that receives, in addition to the message payload, the
 * source partition and offset of the message so they can be embedded in the
 * emitted tuple.
 */
public interface MessageMetadataScheme extends Scheme {
    /**
     * Deserializes one Kafka message together with its origin metadata.
     *
     * @param message   the message payload buffer
     * @param partition the partition the message was read from
     * @param offset    the message's offset within that partition
     * @return the tuple's field values, or null to emit nothing for this message
     */
    List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset);
}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
 
b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
deleted file mode 100644
index f23a101..0000000
--- 
a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import backtype.storm.spout.SchemeAsMultiScheme;
-
-public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme {
-    private static final long serialVersionUID = -7172403703813625116L;
-
-    public MessageMetadataSchemeAsMultiScheme(MessageMetadataScheme scheme) {
-        super(scheme);
-    }
-
-    public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer 
message, Partition partition, long offset) {
-        List<Object> o = ((MessageMetadataScheme) 
scheme).deserializeMessageWithMetadata(message, partition, offset);
-        if (o == null) {
-            return null;
-        } else {
-            return Arrays.asList(o);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/Partition.java 
b/external/storm-kafka/src/jvm/storm/kafka/Partition.java
deleted file mode 100644
index 5f683ef..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/Partition.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import com.google.common.base.Objects;
-import storm.trident.spout.ISpoutPartition;
-
-
-public class Partition implements ISpoutPartition {
-
-    public Broker host;
-    public int partition;
-    public String topic;
-
-    //Flag to keep the Partition Path Id backward compatible with Old 
implementation of Partition.getId() == "partition_" + partition
-    private Boolean bUseTopicNameForPartitionPathId;
-
-    // for kryo compatibility
-    private Partition() {
-       
-    }
-    public Partition(Broker host, String topic, int partition) {
-        this.topic = topic;
-        this.host = host;
-        this.partition = partition;
-        this.bUseTopicNameForPartitionPathId = false;
-    }
-    
-    public Partition(Broker host, String topic, int partition,Boolean 
bUseTopicNameForPartitionPathId) {
-        this.topic = topic;
-        this.host = host;
-        this.partition = partition;
-        this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(host, topic, partition);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final Partition other = (Partition) obj;
-        return Objects.equal(this.host, other.host) && 
Objects.equal(this.topic, other.topic) && Objects.equal(this.partition, 
other.partition);
-    }
-
-    @Override
-    public String toString() {
-        return "Partition{" +
-                "host=" + host +
-                ", topic=" + topic +
-                ", partition=" + partition +
-                '}';
-    }
-
-    @Override
-    public String getId() {
-        if (bUseTopicNameForPartitionPathId) {
-            return  topic  + "/partition_" + partition;
-        } else {
-            //Keep the Partition Id backward compatible with Old 
implementation of Partition.getId() == "partition_" + partition
-            return "partition_" + partition;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java 
b/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
deleted file mode 100644
index 9cfed60..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import java.util.List;
-
/**
 * Tracks which Kafka partitions the current spout task owns and hands out the
 * {@link PartitionManager} responsible for each.
 */
public interface PartitionCoordinator {
    /** Returns the managers for every partition currently assigned to this task. */
    List<PartitionManager> getMyManagedPartitions();

    /** Returns the manager for the given partition, as maintained by this coordinator. */
    PartitionManager getManager(Partition partition);

    /** Re-reads partition assignment and updates the managed set accordingly. */
    void refresh();
}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java 
b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
deleted file mode 100644
index ff02e22..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.CountMetric;
-import backtype.storm.metric.api.MeanReducer;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.spout.SpoutOutputCollector;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import storm.kafka.KafkaSpout.EmitState;
-import storm.kafka.trident.MaxMetric;
-
-import java.util.*;
-
-public class PartitionManager {
-    public static final Logger LOG = 
LoggerFactory.getLogger(PartitionManager.class);
-
-    private final CombinedMetric _fetchAPILatencyMax;
-    private final ReducedMetric _fetchAPILatencyMean;
-    private final CountMetric _fetchAPICallCount;
-    private final CountMetric _fetchAPIMessageCount;
-    Long _emittedToOffset;
-    // _pending key = Kafka offset, value = time at which the message was 
first submitted to the topology
-    private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
-    private final FailedMsgRetryManager _failedMsgRetryManager;
-
-    // retryRecords key = Kafka offset, value = retry info for the given 
message
-    Long _committedTo;
-    LinkedList<MessageAndOffset> _waitingToEmit = new 
LinkedList<MessageAndOffset>();
-    Partition _partition;
-    SpoutConfig _spoutConfig;
-    String _topologyInstanceId;
-    SimpleConsumer _consumer;
-    DynamicPartitionConnections _connections;
-    ZkState _state;
-    Map _stormConf;
-    long numberFailed, numberAcked;
    /**
     * Creates a manager for one Kafka partition: registers a consumer
     * connection, recovers the last committed offset from ZooKeeper (if any),
     * and decides the offset to start emitting from.
     *
     * Recovery rules, in order:
     *  - no/unparseable ZK state -> start from the offset implied by the config;
     *  - topology changed and ignoreZkOffsets is set -> start from startOffsetTime;
     *  - otherwise -> resume from the ZK-committed offset;
     * finally, if the committed offset is more than maxOffsetBehind behind the
     * broker's current offset (or not positive), jump forward to the current offset.
     */
    public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.topic, id.partition);
        _state = state;
        _stormConf = stormConf;
        numberAcked = numberFailed = 0;

        _failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs,
                _spoutConfig.retryDelayMultiplier,
                _spoutConfig.retryDelayMaxMs);

        // Try to recover { topology-id, offset } previously committed to ZooKeeper.
        String jsonTopologyId = null;
        Long jsonOffset = null;
        String path = committedPath();
        try {
            Map<Object, Object> json = _state.readJSON(path);
            LOG.info("Read partition information from: " + path +  "  --> " + json );
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                jsonOffset = (Long) json.get("offset");
            }
        } catch (Throwable e) {
            // Missing/corrupt ZK state is not fatal: fall back to config-driven offset.
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
        }

        String topic = _partition.topic;
        // Offset implied by the spout configuration (startOffsetTime).
        Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig);

        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
            _committedTo = currentOffset;
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
            _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
        }

        // Too far behind (or a non-positive committed offset): skip ahead
        // rather than replay an unbounded backlog.
        if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
            LOG.info("Last commit offset from zookeeper: " + _committedTo);
            Long lastCommittedOffset = _committedTo;
            _committedTo = currentOffset;
            LOG.info("Commit offset " + lastCommittedOffset + " is more than " +
                    spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
        }

        LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
        _emittedToOffset = _committedTo;

        _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
        _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
        _fetchAPICallCount = new CountMetric();
        _fetchAPIMessageCount = new CountMetric();
    }
-
-    public Map getMetricsDataMap() {
-        Map ret = new HashMap();
-        ret.put(_partition + "/fetchAPILatencyMax", 
_fetchAPILatencyMax.getValueAndReset());
-        ret.put(_partition + "/fetchAPILatencyMean", 
_fetchAPILatencyMean.getValueAndReset());
-        ret.put(_partition + "/fetchAPICallCount", 
_fetchAPICallCount.getValueAndReset());
-        ret.put(_partition + "/fetchAPIMessageCount", 
_fetchAPIMessageCount.getValueAndReset());
-        return ret;
-    }
-
-    //returns false if it's reached the end of current batch
-    public EmitState next(SpoutOutputCollector collector) {
-        if (_waitingToEmit.isEmpty()) {
-            fill();
-        }
-        while (true) {
-            MessageAndOffset toEmit = _waitingToEmit.pollFirst();
-            if (toEmit == null) {
-                return EmitState.NO_EMITTED;
-            }
-
-            Iterable<List<Object>> tups;
-            if (_spoutConfig.scheme instanceof 
MessageMetadataSchemeAsMultiScheme) {
-                tups = 
KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) 
_spoutConfig.scheme, toEmit.message(), _partition, toEmit.offset());
-            } else {
-                tups = KafkaUtils.generateTuples(_spoutConfig, 
toEmit.message(), _partition.topic);
-            }
-            
-            if ((tups != null) && tups.iterator().hasNext()) {
-               if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
-                    for (List<Object> tup : tups) {
-                        collector.emit(_spoutConfig.topic, tup, new 
KafkaMessageId(_partition, toEmit.offset()));
-                    }
-                } else {
-                    for (List<Object> tup : tups) {
-                        collector.emit(tup, new KafkaMessageId(_partition, 
toEmit.offset()));
-                    }
-                }
-                break;
-            } else {
-                ack(toEmit.offset());
-            }
-        }
-        if (!_waitingToEmit.isEmpty()) {
-            return EmitState.EMITTED_MORE_LEFT;
-        } else {
-            return EmitState.EMITTED_END;
-        }
-    }
-
-
-    private void fill() {
-        long start = System.nanoTime();
-        Long offset;
-
-        // Are there failed tuples? If so, fetch those first.
-        offset = this._failedMsgRetryManager.nextFailedMessageToRetry();
-        final boolean processingNewTuples = (offset == null);
-        if (processingNewTuples) {
-            offset = _emittedToOffset;
-        }
-
-        ByteBufferMessageSet msgs = null;
-        try {
-            msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, 
_partition, offset);
-        } catch (TopicOffsetOutOfRangeException e) {
-            _emittedToOffset = KafkaUtils.getOffset(_consumer, 
_partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
-            LOG.warn("{} Using new offset: {}", _partition.partition, 
_emittedToOffset);
-            // fetch failed, so don't update the metrics
-            
-            //fix bug [STORM-643] : remove outdated failed offsets
-            if (!processingNewTuples) {
-                // For the case of EarliestTime it would be better to discard
-                // all the failed offsets, that are earlier than actual 
EarliestTime
-                // offset, since they are anyway not there.
-                // These calls to broker API will be then saved.
-                Set<Long> omitted = 
this._failedMsgRetryManager.clearInvalidMessages(_emittedToOffset);
-                
-                LOG.warn("Removing the failed offsets that are out of range: 
{}", omitted);
-            }
-            
-            return;
-        }
-        long end = System.nanoTime();
-        long millis = (end - start) / 1000000;
-        _fetchAPILatencyMax.update(millis);
-        _fetchAPILatencyMean.update(millis);
-        _fetchAPICallCount.incr();
-        if (msgs != null) {
-            int numMessages = 0;
-
-            for (MessageAndOffset msg : msgs) {
-                final Long cur_offset = msg.offset();
-                if (cur_offset < offset) {
-                    // Skip any old offsets.
-                    continue;
-                }
-                if (processingNewTuples || 
this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
-                    numMessages += 1;
-                    if (!_pending.containsKey(cur_offset)) {
-                        _pending.put(cur_offset, System.currentTimeMillis());
-                    }
-                    _waitingToEmit.add(msg);
-                    _emittedToOffset = Math.max(msg.nextOffset(), 
_emittedToOffset);
-                    if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
-                        this._failedMsgRetryManager.retryStarted(cur_offset);
-                    }
-                }
-            }
-            _fetchAPIMessageCount.incrBy(numMessages);
-        }
-    }
-
-    public void ack(Long offset) {
-        if (!_pending.isEmpty() && _pending.firstKey() < offset - 
_spoutConfig.maxOffsetBehind) {
-            // Too many things pending!
-            _pending.headMap(offset - _spoutConfig.maxOffsetBehind).clear();
-        }
-        _pending.remove(offset);
-        this._failedMsgRetryManager.acked(offset);
-        numberAcked++;
-    }
-
-    public void fail(Long offset) {
-        if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
-            LOG.info(
-                    "Skipping failed tuple at offset=" + offset +
-                            " because it's more than maxOffsetBehind=" + 
_spoutConfig.maxOffsetBehind +
-                            " behind _emittedToOffset=" + _emittedToOffset
-            );
-        } else {
-            LOG.debug("failing at offset={} with _pending.size()={} pending 
and _emittedToOffset={}", offset, _pending.size(), _emittedToOffset);
-            numberFailed++;
-            if (numberAcked == 0 && numberFailed > 
_spoutConfig.maxOffsetBehind) {
-                throw new RuntimeException("Too many tuple failures");
-            }
-
-            this._failedMsgRetryManager.failed(offset);
-        }
-    }
-
-    public void commit() {
-        long lastCompletedOffset = lastCompletedOffset();
-        if (_committedTo != lastCompletedOffset) {
-            LOG.debug("Writing last completed offset ({}) to ZK for {} for 
topology: {}", lastCompletedOffset, _partition, _topologyInstanceId);
-            Map<Object, Object> data = (Map<Object, Object>) 
ImmutableMap.builder()
-                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
-                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
-                    .put("offset", lastCompletedOffset)
-                    .put("partition", _partition.partition)
-                    .put("broker", ImmutableMap.of("host", 
_partition.host.host,
-                            "port", _partition.host.port))
-                    .put("topic", _partition.topic).build();
-            _state.writeJSON(committedPath(), data);
-
-            _committedTo = lastCompletedOffset;
-            LOG.debug("Wrote last completed offset ({}) to ZK for {} for 
topology: {}", lastCompletedOffset, _partition, _topologyInstanceId);
-        } else {
-            LOG.debug("No new offset for {} for topology: {}", _partition, 
_topologyInstanceId);
-        }
-    }
-
-    private String committedPath() {
-        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + 
_partition.getId();
-    }
-
-    public long lastCompletedOffset() {
-        if (_pending.isEmpty()) {
-            return _emittedToOffset;
-        } else {
-            return _pending.firstKey();
-        }
-    }
-
-    public Partition getPartition() {
-        return _partition;
-    }
-
-    public void close() {
-        commit();
-        _connections.unregister(_partition.host, _partition.topic , 
_partition.partition);
-    }
-
-    static class KafkaMessageId {
-        public Partition partition;
-        public long offset;
-
-
-        public KafkaMessageId(Partition partition, long offset) {
-            this.partition = partition;
-            this.offset = offset;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java 
b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
deleted file mode 100644
index d125ebb..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import java.io.Serializable;
-import java.util.List;
-
-
-public class SpoutConfig extends KafkaConfig implements Serializable {
-    public List<String> zkServers = null;
-    public Integer zkPort = null;
-    public String zkRoot = null;
-    public String id = null;
-
-    public String outputStreamId;
-
-    // setting for how often to save the current kafka offset to ZooKeeper
-    public long stateUpdateIntervalMs = 2000;
-
-    // Exponential back-off retry settings.  These are used when retrying 
messages after a bolt
-    // calls OutputCollector.fail().
-    public long retryInitialDelayMs = 0;
-    public double retryDelayMultiplier = 1.0;
-    public long retryDelayMaxMs = 60 * 1000;
-
-    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String 
id) {
-        super(hosts, topic);
-        this.zkRoot = zkRoot;
-        this.id = id;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java 
b/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
deleted file mode 100644
index 4b20d84..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import storm.kafka.trident.GlobalPartitionInformation;
-
-import java.util.*;
-
-
-public class StaticCoordinator implements PartitionCoordinator {
-    Map<Partition, PartitionManager> _managers = new HashMap<Partition, 
PartitionManager>();
-    List<PartitionManager> _allManagers = new ArrayList();
-
-    public StaticCoordinator(DynamicPartitionConnections connections, Map 
stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, 
String topologyInstanceId) {
-        StaticHosts hosts = (StaticHosts) config.hosts;
-        List<GlobalPartitionInformation> partitions = new 
ArrayList<GlobalPartitionInformation>();
-        partitions.add(hosts.getPartitionInformation());
-        List<Partition> myPartitions = 
KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex);
-        for (Partition myPartition : myPartitions) {
-            _managers.put(myPartition, new PartitionManager(connections, 
topologyInstanceId, state, stormConf, config, myPartition));
-        }
-        _allManagers = new ArrayList(_managers.values());
-    }
-
-    @Override
-    public List<PartitionManager> getMyManagedPartitions() {
-        return _allManagers;
-    }
-
-    public PartitionManager getManager(Partition partition) {
-        return _managers.get(partition);
-    }
-
-    @Override
-    public void refresh() { return; }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java 
b/external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java
deleted file mode 100644
index bee7118..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import storm.kafka.trident.GlobalPartitionInformation;
-
-/**
- * Date: 11/05/2013
- * Time: 14:43
- */
-public class StaticHosts implements BrokerHosts {
-
-
-    private GlobalPartitionInformation partitionInformation;
-
-    public StaticHosts(GlobalPartitionInformation partitionInformation) {
-        this.partitionInformation = partitionInformation;
-    }
-
-    public GlobalPartitionInformation getPartitionInformation() {
-        return partitionInformation;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java 
b/external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java
deleted file mode 100644
index 1353b6c..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import kafka.javaapi.consumer.SimpleConsumer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class StaticPartitionConnections {
-    Map<Integer, SimpleConsumer> _kafka = new HashMap<Integer, 
SimpleConsumer>();
-    KafkaConfig _config;
-    StaticHosts hosts;
-
-    public StaticPartitionConnections(KafkaConfig conf) {
-        _config = conf;
-        if (!(conf.hosts instanceof StaticHosts)) {
-            throw new RuntimeException("Must configure with static hosts");
-        }
-        this.hosts = (StaticHosts) conf.hosts;
-    }
-
-    public SimpleConsumer getConsumer(int partition) {
-        if (!_kafka.containsKey(partition)) {
-            Broker hp = 
hosts.getPartitionInformation().getBrokerFor(partition);
-            _kafka.put(partition, new SimpleConsumer(hp.host, hp.port, 
_config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId));
-
-        }
-        return _kafka.get(partition);
-    }
-
-    public void close() {
-        for (SimpleConsumer consumer : _kafka.values()) {
-            consumer.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java 
b/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
deleted file mode 100644
index 6f6d339..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.tuple.Values;
-import com.google.common.collect.ImmutableMap;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class StringKeyValueScheme extends StringScheme implements 
KeyValueScheme {
-
-    @Override
-    public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer 
value) {
-        if ( key == null ) {
-            return deserialize(value);
-        }
-        String keyString = StringScheme.deserializeString(key);
-        String valueString = StringScheme.deserializeString(value);
-        return new Values(ImmutableMap.of(keyString, valueString));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java 
b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
deleted file mode 100644
index 1708b97..0000000
--- 
a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class StringMessageAndMetadataScheme extends StringScheme implements 
MessageMetadataScheme {
-    private static final long serialVersionUID = -5441841920447947374L;
-
-    public static final String STRING_SCHEME_PARTITION_KEY = "partition";
-    public static final String STRING_SCHEME_OFFSET = "offset";
-
-    @Override
-    public List<Object> deserializeMessageWithMetadata(ByteBuffer message, 
Partition partition, long offset) {
-        String stringMessage = StringScheme.deserializeString(message);
-        return new Values(stringMessage, partition.partition, offset);
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return new Fields(STRING_SCHEME_KEY, STRING_SCHEME_PARTITION_KEY, 
STRING_SCHEME_OFFSET);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java 
b/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java
deleted file mode 100644
index 1e7f216..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.spout.MultiScheme;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-public class StringMultiSchemeWithTopic
-        implements MultiScheme {
-    public static final String STRING_SCHEME_KEY = "str";
-
-    public static final String TOPIC_KEY = "topic";
-
-    @Override
-    public Iterable<List<Object>> deserialize(ByteBuffer bytes) {
-        throw new NotImplementedException();
-    }
-
-    public Iterable<List<Object>> deserializeWithTopic(String topic, 
ByteBuffer bytes) {
-        List<Object> items = new Values(StringScheme.deserializeString(bytes), 
topic);
-        return Collections.singletonList(items);
-    }
-
-    public Fields getOutputFields() {
-        return new Fields(STRING_SCHEME_KEY, TOPIC_KEY);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java 
b/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
deleted file mode 100644
index 1071e60..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-public class StringScheme implements Scheme {
-    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
-    public static final String STRING_SCHEME_KEY = "str";
-
-    public List<Object> deserialize(ByteBuffer bytes) {
-        return new Values(deserializeString(bytes));
-    }
-
-    public static String deserializeString(ByteBuffer string) {
-        if (string.hasArray()) {
-            int base = string.arrayOffset();
-            return new String(string.array(), base + string.position(), 
string.remaining());
-        } else {
-            return new String(Utils.toByteArray(string), UTF8_CHARSET);
-        }
-    }
-
-    public Fields getOutputFields() {
-        return new Fields(STRING_SCHEME_KEY);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java 
b/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
deleted file mode 100644
index 5101a3e..0000000
--- 
a/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-public class TopicOffsetOutOfRangeException extends RuntimeException {
-
-    public TopicOffsetOutOfRangeException(String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java 
b/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
deleted file mode 100644
index 8650e6f..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.trident.GlobalPartitionInformation;
-
-import java.util.*;
-
-import static storm.kafka.KafkaUtils.taskId;
-
-public class ZkCoordinator implements PartitionCoordinator {
-    public static final Logger LOG = 
LoggerFactory.getLogger(ZkCoordinator.class);
-
-    SpoutConfig _spoutConfig;
-    int _taskIndex;
-    int _totalTasks;
-    String _topologyInstanceId;
-    Map<Partition, PartitionManager> _managers = new HashMap();
-    List<PartitionManager> _cachedList = new ArrayList<PartitionManager>();
-    Long _lastRefreshTime = null;
-    int _refreshFreqMs;
-    DynamicPartitionConnections _connections;
-    DynamicBrokersReader _reader;
-    ZkState _state;
-    Map _stormConf;
-
-    public ZkCoordinator(DynamicPartitionConnections connections, Map 
stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int 
totalTasks, String topologyInstanceId) {
-        this(connections, stormConf, spoutConfig, state, taskIndex, 
totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig));
-    }
-
-    public ZkCoordinator(DynamicPartitionConnections connections, Map 
stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int 
totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
-        _spoutConfig = spoutConfig;
-        _connections = connections;
-        _taskIndex = taskIndex;
-        _totalTasks = totalTasks;
-        _topologyInstanceId = topologyInstanceId;
-        _stormConf = stormConf;
-        _state = state;
-        ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
-        _refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
-        _reader = reader;
-    }
-
-    private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig 
spoutConfig) {
-        ZkHosts hosts = (ZkHosts) spoutConfig.hosts;
-        return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, 
hosts.brokerZkPath, spoutConfig.topic);
-    }
-
-    @Override
-    public List<PartitionManager> getMyManagedPartitions() {
-        if (_lastRefreshTime == null || (System.currentTimeMillis() - 
_lastRefreshTime) > _refreshFreqMs) {
-            refresh();
-            _lastRefreshTime = System.currentTimeMillis();
-        }
-        return _cachedList;
-    }
-
-    @Override
-    public void refresh() {
-        try {
-            LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition 
manager connections");
-            List<GlobalPartitionInformation> brokerInfo = 
_reader.getBrokerInfo();
-            List<Partition> mine = 
KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
-
-            Set<Partition> curr = _managers.keySet();
-            Set<Partition> newPartitions = new HashSet<Partition>(mine);
-            newPartitions.removeAll(curr);
-
-            Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
-            deletedPartitions.removeAll(mine);
-
-            LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition 
managers: " + deletedPartitions.toString());
-
-            for (Partition id : deletedPartitions) {
-                PartitionManager man = _managers.remove(id);
-                man.close();
-            }
-            LOG.info(taskId(_taskIndex, _totalTasks) + "New partition 
managers: " + newPartitions.toString());
-
-            for (Partition id : newPartitions) {
-                PartitionManager man = new PartitionManager(_connections, 
_topologyInstanceId, _state, _stormConf, _spoutConfig, id);
-                _managers.put(id, man);
-            }
-
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        _cachedList = new ArrayList<PartitionManager>(_managers.values());
-        LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
-    }
-
-    @Override
-    public PartitionManager getManager(Partition partition) {
-        return _managers.get(partition);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java 
b/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
deleted file mode 100644
index 4e4327d..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-
/**
 * BrokerHosts variant whose Kafka broker list is read from ZooKeeper.
 * Plain configuration holder: fields are public and mutable by design,
 * presumably consumed by the ZK-based broker reader — confirm against callers.
 */
public class ZkHosts implements BrokerHosts {
    private static final String DEFAULT_ZK_PATH = "/brokers";

    // ZooKeeper connect string for the Kafka cluster, e.g. "zk1:2181,zk2:2181".
    public String brokerZkStr = null;
    // Root ZK path under which Kafka registers its brokers.
    public String brokerZkPath = null; // e.g., /kafka/brokers
    // Interval, in seconds, between re-reads of the broker list.
    public int refreshFreqSecs = 60;

    public ZkHosts(String brokerZkStr, String brokerZkPath) {
        this.brokerZkStr = brokerZkStr;
        this.brokerZkPath = brokerZkPath;
    }

    /** Convenience constructor using the default broker path "/brokers". */
    public ZkHosts(String brokerZkStr) {
        this(brokerZkStr, DEFAULT_ZK_PATH);
    }
}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ZkState.java 
b/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
deleted file mode 100644
index e5e67e5..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.zookeeper.CreateMode;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ZkState {
-    public static final Logger LOG = LoggerFactory.getLogger(ZkState.class);
-    CuratorFramework _curator;
-
-    private CuratorFramework newCurator(Map stateConf) throws Exception {
-        Integer port = (Integer) 
stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
-        String serverPorts = "";
-        for (String server : (List<String>) 
stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
-            serverPorts = serverPorts + server + ":" + port + ",";
-        }
-        return CuratorFrameworkFactory.newClient(serverPorts,
-                
Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
-                
Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)),
-                new 
RetryNTimes(Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
-                        
Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
-    }
-
-    public CuratorFramework getCurator() {
-        assert _curator != null;
-        return _curator;
-    }
-
-    public ZkState(Map stateConf) {
-        stateConf = new HashMap(stateConf);
-
-        try {
-            _curator = newCurator(stateConf);
-            _curator.start();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void writeJSON(String path, Map<Object, Object> data) {
-        LOG.debug("Writing {} the data {}", path, data.toString());
-        writeBytes(path, 
JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8")));
-    }
-
-    public void writeBytes(String path, byte[] bytes) {
-        try {
-            if (_curator.checkExists().forPath(path) == null) {
-                _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path, bytes);
-            } else {
-                _curator.setData().forPath(path, bytes);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Map<Object, Object> readJSON(String path) {
-        try {
-            byte[] b = readBytes(path);
-            if (b == null) {
-                return null;
-            }
-            return (Map<Object, Object>) JSONValue.parse(new String(b, 
"UTF-8"));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public byte[] readBytes(String path) {
-        try {
-            if (_curator.checkExists().forPath(path) != null) {
-                return _curator.getData().forPath(path);
-            } else {
-                return null;
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void close() {
-        _curator.close();
-        _curator = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java 
b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
deleted file mode 100644
index 1ebe142..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.bolt;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TupleUtils;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.clients.producer.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
-import storm.kafka.bolt.mapper.TupleToKafkaMapper;
-import storm.kafka.bolt.selector.DefaultTopicSelector;
-import storm.kafka.bolt.selector.KafkaTopicSelector;
-import java.util.concurrent.Future;
-import java.util.concurrent.ExecutionException;
-import java.util.Map;
-import java.util.Properties;
-
-
-/**
- * Bolt implementation that can send Tuple data to Kafka
- * <p/>
- * It expects the producer configuration and topic in storm config under
- * <p/>
- * 'kafka.broker.properties' and 'topic'
- * <p/>
- * respectively.
- * <p/>
- * This bolt uses 0.8.2 Kafka Producer API.
- * <p/>
- * It works for sending tuples to older Kafka version (0.8.1).
- */
public class KafkaBolt<K, V> extends BaseRichBolt {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);

    // Storm-config key under which the default output topic is looked up.
    public static final String TOPIC = "topic";

    private KafkaProducer<K, V> producer;
    private OutputCollector collector;
    private TupleToKafkaMapper<K,V> mapper;
    private KafkaTopicSelector topicSelector;
    // NOTE: field name typo ("Specfied") kept as-is; it is private state only.
    private Properties boltSpecfiedProperties = new Properties();
    /**
     * With default setting for fireAndForget and async, the callback is called when the sending succeeds.
     * By setting fireAndForget true, the send will not wait at all for kafka to ack.
     * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set.
     * By setting async false, synchronous sending is used.
     * Note (from execute()): when async is false, the blocking-get path runs
     * regardless of fireAndForget, so fireAndForget only matters when async is true.
     */
    private boolean fireAndForget = false;
    private boolean async = true;

    public KafkaBolt() {}

    /** Sets how tuples are mapped to Kafka key/message pairs (fluent). */
    public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) {
        this.mapper = mapper;
        return this;
    }

    /** Sets how the destination topic is chosen per tuple (fluent). */
    public KafkaBolt<K,V> withTopicSelector(KafkaTopicSelector selector) {
        this.topicSelector = selector;
        return this;
    }

    /** Sets the Kafka producer configuration (fluent). */
    public KafkaBolt<K,V> withProducerProperties(Properties producerProperties) {
        this.boltSpecfiedProperties = producerProperties;
        return this;
    }

    /**
     * Fills in default mapper/topic-selector when none were configured and
     * creates the Kafka producer.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        //for backward compatibility.
        if(mapper == null) {
            this.mapper = new FieldNameBasedTupleToKafkaMapper<K,V>();
        }

        //for backward compatibility.
        if(topicSelector == null) {
            this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
        }

        producer = new KafkaProducer<>(boltSpecfiedProperties);
        this.collector = collector;
    }

    /**
     * Sends one tuple to Kafka. Ack/fail routing depends on the mode:
     * - async && !fireAndForget: acked/failed from the producer callback;
     * - !async: blocks on the send future, then acks (or fails on error);
     * - async && fireAndForget: acked immediately after handing off the send.
     * Tick tuples are acked and skipped; a null topic is acked with a warning.
     */
    @Override
    public void execute(final Tuple input) {
        if (TupleUtils.isTick(input)) {
          collector.ack(input);
          return; // Do not try to send ticks to Kafka
        }
        K key = null;
        V message = null;
        String topic = null;
        try {
            key = mapper.getKeyFromTuple(input);
            message = mapper.getMessageFromTuple(input);
            topic = topicSelector.getTopic(input);
            if (topic != null ) {
                Callback callback = null;

                if (!fireAndForget && async) {
                    callback = new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata ignored, Exception e) {
                            // Callbacks run on the producer's I/O thread;
                            // synchronize so collector calls don't race execute().
                            synchronized (collector) {
                                if (e != null) {
                                    collector.reportError(e);
                                    collector.fail(input);
                                } else {
                                    collector.ack(input);
                                }
                            }
                        }
                    };
                }
                Future<RecordMetadata> result = producer.send(new ProducerRecord<K, V>(topic, key, message), callback);
                if (!async) {
                    try {
                        // Synchronous mode: wait for the broker response.
                        result.get();
                        collector.ack(input);
                    } catch (ExecutionException err) {
                        collector.reportError(err);
                        collector.fail(input);
                    }
                } else if (fireAndForget) {
                    // No delivery guarantee requested: ack without waiting.
                    collector.ack(input);
                }
            } else {
                LOG.warn("skipping key = " + key + ", topic selector returned null.");
                collector.ack(input);
            }
        } catch (Exception ex) {
            collector.reportError(ex);
            collector.fail(input);
        }
    }

    /** This bolt is a sink; it emits nothing downstream. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public void cleanup() {
        producer.close();
    }

    public void setFireAndForget(boolean fireAndForget) {
        this.fireAndForget = fireAndForget;
    }

    public void setAsync(boolean async) {
        this.async = async;
    }
}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
 
b/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
deleted file mode 100644
index 936b7e5..0000000
--- 
a/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.bolt.mapper;
-
-import backtype.storm.tuple.Tuple;
-
-public class FieldNameBasedTupleToKafkaMapper<K,V> implements 
TupleToKafkaMapper<K, V> {
-
-    public static final String BOLT_KEY = "key";
-    public static final String BOLT_MESSAGE = "message";
-    public String boltKeyField;
-    public String boltMessageField;
-
-    public FieldNameBasedTupleToKafkaMapper() {
-        this(BOLT_KEY, BOLT_MESSAGE);
-    }
-
-    public FieldNameBasedTupleToKafkaMapper(String boltKeyField, String 
boltMessageField) {
-        this.boltKeyField = boltKeyField;
-        this.boltMessageField = boltMessageField;
-    }
-
-    @Override
-    public K getKeyFromTuple(Tuple tuple) {
-        //for backward compatibility, we return null when key is not present.
-        return tuple.contains(boltKeyField) ? (K) 
tuple.getValueByField(boltKeyField) : null;
-    }
-
-    @Override
-    public V getMessageFromTuple(Tuple tuple) {
-        return (V) tuple.getValueByField(boltMessageField);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/TupleToKafkaMapper.java 
b/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
deleted file mode 100644
index d92de7b..0000000
--- 
a/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.bolt.mapper;
-
-import backtype.storm.tuple.Tuple;
-
-import java.io.Serializable;
-
/**
 * Maps a storm tuple to a Kafka key and message.
 *
 * @param <K> type of key.
 * @param <V> type of value.
 */
public interface TupleToKafkaMapper<K,V> extends Serializable {
    /** Extracts the Kafka record key from the tuple. */
    K getKeyFromTuple(Tuple tuple);

    /** Extracts the Kafka record value from the tuple. */
    V getMessageFromTuple(Tuple tuple);
}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/DefaultTopicSelector.java
 
b/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/DefaultTopicSelector.java
deleted file mode 100644
index 9c87658..0000000
--- 
a/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/DefaultTopicSelector.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.bolt.selector;
-
-import backtype.storm.tuple.Tuple;
-
/**
 * KafkaTopicSelector that routes every tuple to one fixed topic, chosen at
 * construction time.
 */
public class DefaultTopicSelector implements KafkaTopicSelector {

    private final String topicName;

    public DefaultTopicSelector(final String topicName) {
        this.topicName = topicName;
    }

    /** Returns the configured topic regardless of tuple contents. */
    @Override
    public String getTopic(Tuple tuple) {
        return topicName;
    }
}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/KafkaTopicSelector.java
 
b/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/KafkaTopicSelector.java
deleted file mode 100644
index f77fc47..0000000
--- 
a/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/KafkaTopicSelector.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.bolt.selector;
-
-import backtype.storm.tuple.Tuple;
-
-import java.io.Serializable;
-
/** Chooses the destination Kafka topic for a given tuple. */
public interface KafkaTopicSelector extends Serializable {
    /** Returns the topic for this tuple; callers may treat null as "skip". */
    String getTopic(Tuple tuple);
}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java 
b/external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java
deleted file mode 100644
index bd786b3..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka.trident;
-
-import storm.kafka.KafkaUtils;
-import storm.trident.spout.IOpaquePartitionedTridentSpout;
-import storm.trident.spout.IPartitionedTridentSpout;
-
-import java.util.List;
-import java.util.Map;
-
/**
 * Trident spout coordinator that delegates readiness/close decisions to the
 * user-supplied coordinator in the TridentKafkaConfig, and supplies the
 * current broker/partition layout for each batch via an IBrokerReader.
 */
class Coordinator implements IPartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>>, IOpaquePartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>> {

    private IBrokerReader reader;
    private TridentKafkaConfig config;

    public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) {
        config = tridentKafkaConfig;
        reader = KafkaUtils.makeBrokerReader(conf, config);
    }

    @Override
    public void close() {
        // Delegates to the configured batch coordinator.
        config.coordinator.close();
    }

    @Override
    public boolean isReady(long txid) {
        // Delegates to the configured batch coordinator.
        return config.coordinator.isReady(txid);
    }

    /** Returns the broker/partition metadata used to plan the next batch. */
    @Override
    public List<GlobalPartitionInformation> getPartitionsForBatch() {
        return reader.getAllBrokers();
    }
}

Reply via email to