http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
new file mode 100644
index 0000000..0f5aaa0
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.CombinedMetric;
+import org.apache.storm.metric.api.CountMetric;
+import org.apache.storm.metric.api.MeanReducer;
+import org.apache.storm.metric.api.ReducedMetric;
+import org.apache.storm.spout.SpoutOutputCollector;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.kafka.KafkaSpout.EmitState;
+import org.apache.storm.kafka.trident.MaxMetric;
+
+import java.util.*;
+
+/**
+ * Handles a single Kafka partition for the Kafka spout: fetches messages through a
+ * {@link SimpleConsumer}, emits them as tuples, tracks which offsets are still pending or have
+ * failed, and writes the last completed offset to ZooKeeper via {@link ZkState}.
+ */
+public class PartitionManager {
+    public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);
+
+    private final CombinedMetric _fetchAPILatencyMax;
+    private final ReducedMetric _fetchAPILatencyMean;
+    private final CountMetric _fetchAPICallCount;
+    private final CountMetric _fetchAPIMessageCount;
+    // Offset from which new (non-retry) messages are fetched next; advanced in fill().
+    Long _emittedToOffset;
+    // _pending key = Kafka offset, value = time at which the message was first submitted to the topology
+    private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
+    // Keeps track of failed offsets and decides which of them should be retried and when.
+    private final FailedMsgRetryManager _failedMsgRetryManager;
+
+    // Starting offset chosen in the constructor, afterwards the last offset written by commit().
+    Long _committedTo;
+    // Messages fetched by fill() that have not been handed to the collector yet.
+    LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<MessageAndOffset>();
+    Partition _partition;
+    SpoutConfig _spoutConfig;
+    String _topologyInstanceId;
+    SimpleConsumer _consumer;
+    DynamicPartitionConnections _connections;
+    ZkState _state;
+    Map _stormConf;
+    long numberFailed, numberAcked;
+
+    /**
+     * Registers a consumer for the partition and determines the offset to start reading from.
+     * The committed offset is read from ZooKeeper (path given by {@link #committedPath()}); if it is
+     * missing or unreadable, or if the topology instance changed and {@code ignoreZkOffsets} is set,
+     * the offset is obtained from {@link KafkaUtils#getOffset} instead. A committed offset that is
+     * {@code <= 0} or more than {@code maxOffsetBehind} behind the current offset is discarded.
+     *
+     * @param connections        connection registry used to obtain (and later release) the consumer
+     * @param topologyInstanceId id of the running topology instance, compared with the stored one
+     * @param state              ZooKeeper state used to read and write the committed offset
+     * @param stormConf          topology configuration (only {@code Config.TOPOLOGY_NAME} is read, in commit())
+     * @param spoutConfig        spout settings (retry delays, maxOffsetBehind, zkRoot, id, ...)
+     * @param id                 the partition this manager is responsible for
+     */
+    public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state,
+                            Map stormConf, SpoutConfig spoutConfig, Partition id) {
+        _partition = id;
+        _connections = connections;
+        _spoutConfig = spoutConfig;
+        _topologyInstanceId = topologyInstanceId;
+        _consumer = connections.register(id.host, id.topic, id.partition);
+        _state = state;
+        _stormConf = stormConf;
+        numberAcked = numberFailed = 0;
+
+        _failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs,
+                                                                           _spoutConfig.retryDelayMultiplier,
+                                                                           _spoutConfig.retryDelayMaxMs);
+
+        String jsonTopologyId = null;
+        Long jsonOffset = null;
+        String path = committedPath();
+        try {
+            Map<Object, Object> json = _state.readJSON(path);
+            LOG.info("Read partition information from: " + path +  "  --> " + json );
+            if (json != null) {
+                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
+                jsonOffset = (Long) json.get("offset");
+            }
+        } catch (Throwable e) {
+            // Best effort: any read/parse problem is treated like "nothing committed yet" below.
+            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
+        }
+
+        String topic = _partition.topic;
+        Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig);
+
+        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
+            _committedTo = currentOffset;
+            LOG.info("No partition information found, using configuration to determine offset");
+        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
+            _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime);
+            LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
+        } else {
+            _committedTo = jsonOffset;
+            LOG.info("Read last commit offset from zookeeper: " + _committedTo
+                    + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
+        }
+
+        // Give up on a committed offset that is non-positive or too far behind the current one.
+        if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
+            LOG.info("Last commit offset from zookeeper: " + _committedTo);
+            Long lastCommittedOffset = _committedTo;
+            _committedTo = currentOffset;
+            LOG.info("Commit offset " + lastCommittedOffset + " is more than " +
+                    spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset
+                    + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
+        }
+
+        LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
+        _emittedToOffset = _committedTo;
+
+        _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
+        _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
+        _fetchAPICallCount = new CountMetric();
+        _fetchAPIMessageCount = new CountMetric();
+    }
+
+    /**
+     * Returns the current fetch metrics keyed by {@code <partition>/<metricName>} and resets them.
+     */
+    public Map getMetricsDataMap() {
+        Map ret = new HashMap();
+        ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
+        ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
+        ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
+        ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
+        return ret;
+    }
+
+    /**
+     * Emits the tuples of the next buffered message, fetching a new batch first if the buffer is empty.
+     * Messages that deserialize to no tuples are acked immediately and skipped.
+     *
+     * @param collector collector the tuples are emitted to, anchored with a {@link KafkaMessageId}
+     * @return {@code NO_EMITTED} if no message produced a tuple, {@code EMITTED_MORE_LEFT} if tuples
+     *         were emitted and more messages are buffered, {@code EMITTED_END} if tuples were emitted
+     *         and the buffer is now empty
+     */
+    public EmitState next(SpoutOutputCollector collector) {
+        if (_waitingToEmit.isEmpty()) {
+            fill();
+        }
+        while (true) {
+            MessageAndOffset toEmit = _waitingToEmit.pollFirst();
+            if (toEmit == null) {
+                return EmitState.NO_EMITTED;
+            }
+
+            Iterable<List<Object>> tups;
+            if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+                tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme,
+                        toEmit.message(), _partition, toEmit.offset());
+            } else {
+                tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic);
+            }
+
+            if ((tups != null) && tups.iterator().hasNext()) {
+                if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
+                    // A stream id is configured: emit on that stream. Using the topic name here would
+                    // ignore the setting that was just checked.
+                    // NOTE(review): assumes the spout declares its stream under outputStreamId - confirm.
+                    for (List<Object> tup : tups) {
+                        collector.emit(_spoutConfig.outputStreamId, tup,
+                                new KafkaMessageId(_partition, toEmit.offset()));
+                    }
+                } else {
+                    for (List<Object> tup : tups) {
+                        collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset()));
+                    }
+                }
+                break;
+            } else {
+                // Nothing to emit for this message, so mark it as done right away.
+                ack(toEmit.offset());
+            }
+        }
+        if (!_waitingToEmit.isEmpty()) {
+            return EmitState.EMITTED_MORE_LEFT;
+        } else {
+            return EmitState.EMITTED_END;
+        }
+    }
+
+
+    /**
+     * Fetches one batch of messages into {@code _waitingToEmit}. Failed offsets reported by the retry
+     * manager take precedence over new messages. If the requested offset is out of range, the emit
+     * position is moved to the earliest available offset and nothing is buffered.
+     */
+    private void fill() {
+        long start = System.nanoTime();
+        Long offset;
+
+        // Are there failed tuples? If so, fetch those first.
+        offset = this._failedMsgRetryManager.nextFailedMessageToRetry();
+        final boolean processingNewTuples = (offset == null);
+        if (processingNewTuples) {
+            offset = _emittedToOffset;
+        }
+
+        ByteBufferMessageSet msgs = null;
+        try {
+            msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
+        } catch (TopicOffsetOutOfRangeException e) {
+            _emittedToOffset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition,
+                    kafka.api.OffsetRequest.EarliestTime());
+            LOG.warn("{} Using new offset: {}", _partition.partition, _emittedToOffset);
+            // fetch failed, so don't update the metrics
+
+            //fix bug [STORM-643] : remove outdated failed offsets
+            if (!processingNewTuples) {
+                // For the case of EarliestTime it would be better to discard
+                // all the failed offsets, that are earlier than actual EarliestTime
+                // offset, since they are anyway not there.
+                // These calls to broker API will be then saved.
+                Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(_emittedToOffset);
+
+                LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
+            }
+
+            return;
+        }
+        long end = System.nanoTime();
+        long millis = (end - start) / 1000000;
+        _fetchAPILatencyMax.update(millis);
+        _fetchAPILatencyMean.update(millis);
+        _fetchAPICallCount.incr();
+        if (msgs != null) {
+            int numMessages = 0;
+
+            for (MessageAndOffset msg : msgs) {
+                final Long cur_offset = msg.offset();
+                if (cur_offset < offset) {
+                    // Skip any old offsets.
+                    continue;
+                }
+                // When retrying, only buffer the messages the retry manager wants to see again.
+                if (processingNewTuples || this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+                    numMessages += 1;
+                    if (!_pending.containsKey(cur_offset)) {
+                        _pending.put(cur_offset, System.currentTimeMillis());
+                    }
+                    _waitingToEmit.add(msg);
+                    _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
+                    if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+                        this._failedMsgRetryManager.retryStarted(cur_offset);
+                    }
+                }
+            }
+            _fetchAPIMessageCount.incrBy(numMessages);
+        }
+    }
+
+    /**
+     * Marks the message at {@code offset} as successfully processed. If the oldest pending offset is
+     * more than {@code maxOffsetBehind} behind {@code offset}, all pending entries older than
+     * {@code offset - maxOffsetBehind} are dropped as well.
+     *
+     * @param offset Kafka offset of the acked message
+     */
+    public void ack(Long offset) {
+        if (!_pending.isEmpty() && _pending.firstKey() < offset - _spoutConfig.maxOffsetBehind) {
+            // Too many things pending!
+            _pending.headMap(offset - _spoutConfig.maxOffsetBehind).clear();
+        }
+        _pending.remove(offset);
+        this._failedMsgRetryManager.acked(offset);
+        numberAcked++;
+    }
+
+    /**
+     * Records a failure of the message at {@code offset} so that it is retried, unless the offset is
+     * more than {@code maxOffsetBehind} behind the emit position, in which case it is only logged.
+     *
+     * @param offset Kafka offset of the failed message
+     * @throws RuntimeException if nothing has been acked yet and the number of failures exceeds
+     *                          {@code maxOffsetBehind}
+     */
+    public void fail(Long offset) {
+        if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
+            LOG.info(
+                    "Skipping failed tuple at offset=" + offset +
+                            " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
+                            " behind _emittedToOffset=" + _emittedToOffset
+            );
+        } else {
+            LOG.debug("failing at offset={} with _pending.size()={} pending and _emittedToOffset={}",
+                    offset, _pending.size(), _emittedToOffset);
+            numberFailed++;
+            if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
+                throw new RuntimeException("Too many tuple failures");
+            }
+
+            this._failedMsgRetryManager.failed(offset);
+        }
+    }
+
+    /**
+     * Writes the last completed offset, together with topology, broker, partition and topic
+     * information, to ZooKeeper. Does nothing if the offset has not changed since the last commit.
+     */
+    public void commit() {
+        long lastCompletedOffset = lastCompletedOffset();
+        if (_committedTo != lastCompletedOffset) {
+            LOG.debug("Writing last completed offset ({}) to ZK for {} for topology: {}",
+                    lastCompletedOffset, _partition, _topologyInstanceId);
+            Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
+                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
+                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
+                    .put("offset", lastCompletedOffset)
+                    .put("partition", _partition.partition)
+                    .put("broker", ImmutableMap.of("host", _partition.host.host,
+                            "port", _partition.host.port))
+                    .put("topic", _partition.topic).build();
+            _state.writeJSON(committedPath(), data);
+
+            _committedTo = lastCompletedOffset;
+            LOG.debug("Wrote last completed offset ({}) to ZK for {} for topology: {}",
+                    lastCompletedOffset, _partition, _topologyInstanceId);
+        } else {
+            LOG.debug("No new offset for {} for topology: {}", _partition, _topologyInstanceId);
+        }
+    }
+
+    /** Returns the ZooKeeper path of this partition's commit record: {@code zkRoot/id/partitionId}. */
+    private String committedPath() {
+        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
+    }
+
+    /**
+     * Returns the offset up to which everything has been processed: the smallest pending offset, or
+     * the emit position when nothing is pending.
+     */
+    public long lastCompletedOffset() {
+        if (_pending.isEmpty()) {
+            return _emittedToOffset;
+        } else {
+            return _pending.firstKey();
+        }
+    }
+
+    /** Returns the partition handled by this manager. */
+    public Partition getPartition() {
+        return _partition;
+    }
+
+    /** Commits the current offset and unregisters this partition from the connection registry. */
+    public void close() {
+        commit();
+        _connections.unregister(_partition.host, _partition.topic , _partition.partition);
+    }
+
+    /** Message id attached to every emitted tuple: the source partition plus the Kafka offset. */
+    static class KafkaMessageId {
+        public Partition partition;
+        public long offset;
+
+
+        public KafkaMessageId(Partition partition, long offset) {
+            this.partition = partition;
+            this.offset = offset;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
new file mode 100644
index 0000000..1ac41c8
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * Settings for the Kafka spout on top of {@link KafkaConfig}: where offsets are kept in ZooKeeper,
+ * the optional output stream id, how often state is saved and how failed messages are retried.
+ */
+public class SpoutConfig extends KafkaConfig implements Serializable {
+    // ZooKeeper connection settings; left unset (null) by this class.
+    // NOTE(review): presumably a fallback to the cluster's ZooKeeper applies when null - confirm in the spout.
+    public List<String> zkServers;
+    public Integer zkPort;
+    // zkRoot and id are supplied through the constructor.
+    public String zkRoot;
+    public String id;
+
+    // Optional stream id; unset (null) unless assigned by the caller.
+    public String outputStreamId;
+
+    // setting for how often to save the current kafka offset to ZooKeeper
+    public long stateUpdateIntervalMs = 2000;
+
+    // Exponential back-off retry settings.  These are used when retrying messages after a bolt
+    // calls OutputCollector.fail().
+    public long retryInitialDelayMs = 0;
+    public double retryDelayMultiplier = 1.0;
+    public long retryDelayMaxMs = 60 * 1000;
+
+    /**
+     * @param hosts  broker hosts, passed on to {@link KafkaConfig}
+     * @param topic  Kafka topic, passed on to {@link KafkaConfig}
+     * @param zkRoot value stored in {@link #zkRoot}
+     * @param id     value stored in {@link #id}
+     */
+    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
+        super(hosts, topic);
+        this.id = id;
+        this.zkRoot = zkRoot;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
new file mode 100644
index 0000000..bdbc44d
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+
+import java.util.*;
+
+
+public class StaticCoordinator implements PartitionCoordinator {
+    Map<Partition, PartitionManager> _managers = new HashMap<Partition, 
PartitionManager>();
+    List<PartitionManager> _allManagers = new ArrayList();
+
+    public StaticCoordinator(DynamicPartitionConnections connections, Map 
stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, 
String topologyInstanceId) {
+        StaticHosts hosts = (StaticHosts) config.hosts;
+        List<GlobalPartitionInformation> partitions = new 
ArrayList<GlobalPartitionInformation>();
+        partitions.add(hosts.getPartitionInformation());
+        List<Partition> myPartitions = 
KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex);
+        for (Partition myPartition : myPartitions) {
+            _managers.put(myPartition, new PartitionManager(connections, 
topologyInstanceId, state, stormConf, config, myPartition));
+        }
+        _allManagers = new ArrayList(_managers.values());
+    }
+
+    @Override
+    public List<PartitionManager> getMyManagedPartitions() {
+        return _allManagers;
+    }
+
+    public PartitionManager getManager(Partition partition) {
+        return _managers.get(partition);
+    }
+
+    @Override
+    public void refresh() { return; }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
new file mode 100644
index 0000000..33d5c16
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+
+/**
+ * {@link BrokerHosts} implementation that simply wraps a caller-supplied
+ * {@link GlobalPartitionInformation}.
+ */
+public class StaticHosts implements BrokerHosts {
+
+
+    private GlobalPartitionInformation partitionInformation;
+
+    /**
+     * @param partitionInformation the partition information to expose
+     */
+    public StaticHosts(GlobalPartitionInformation partitionInformation) {
+        this.partitionInformation = partitionInformation;
+    }
+
+    /** Returns the partition information given at construction time. */
+    public GlobalPartitionInformation getPartitionInformation() {
+        return this.partitionInformation;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
new file mode 100644
index 0000000..77a7211
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import kafka.javaapi.consumer.SimpleConsumer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class StaticPartitionConnections {
+    Map<Integer, SimpleConsumer> _kafka = new HashMap<Integer, 
SimpleConsumer>();
+    KafkaConfig _config;
+    StaticHosts hosts;
+
+    public StaticPartitionConnections(KafkaConfig conf) {
+        _config = conf;
+        if (!(conf.hosts instanceof StaticHosts)) {
+            throw new RuntimeException("Must configure with static hosts");
+        }
+        this.hosts = (StaticHosts) conf.hosts;
+    }
+
+    public SimpleConsumer getConsumer(int partition) {
+        if (!_kafka.containsKey(partition)) {
+            Broker hp = 
hosts.getPartitionInformation().getBrokerFor(partition);
+            _kafka.put(partition, new SimpleConsumer(hp.host, hp.port, 
_config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId));
+
+        }
+        return _kafka.get(partition);
+    }
+
+    public void close() {
+        for (SimpleConsumer consumer : _kafka.values()) {
+            consumer.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
new file mode 100644
index 0000000..9ef7f74
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.tuple.Values;
+import com.google.common.collect.ImmutableMap;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * {@link KeyValueScheme} that decodes both key and value as strings.
+ */
+public class StringKeyValueScheme extends StringScheme implements KeyValueScheme {
+
+    /**
+     * Decodes a keyed message into a single-field tuple holding a one-entry map {@code key -> value}.
+     * When {@code key} is {@code null} the result is the same as {@link #deserialize(ByteBuffer)} on
+     * the value alone.
+     */
+    @Override
+    public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
+        if (key != null) {
+            return new Values(ImmutableMap.of(
+                    StringScheme.deserializeString(key),
+                    StringScheme.deserializeString(value)));
+        }
+        return deserialize(value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
new file mode 100644
index 0000000..e57738d
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * {@link MessageMetadataScheme} that emits the message as a string together with the partition
+ * number and offset it was read from.
+ */
+public class StringMessageAndMetadataScheme extends StringScheme implements MessageMetadataScheme {
+    private static final long serialVersionUID = -5441841920447947374L;
+
+    public static final String STRING_SCHEME_PARTITION_KEY = "partition";
+    public static final String STRING_SCHEME_OFFSET = "offset";
+
+    /**
+     * @return a tuple of (message as string, partition number, offset), matching
+     *         {@link #getOutputFields()}
+     */
+    @Override
+    public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
+        return new Values(StringScheme.deserializeString(message), partition.partition, offset);
+    }
+
+    /** Declares the three output fields: string payload, partition and offset. */
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(STRING_SCHEME_KEY, STRING_SCHEME_PARTITION_KEY, STRING_SCHEME_OFFSET);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
new file mode 100644
index 0000000..d92a879
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.MultiScheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link MultiScheme} that emits each message as a string together with the topic it came from.
+ * Only {@link #deserializeWithTopic(String, ByteBuffer)} is supported, because the topic cannot be
+ * derived from the message bytes alone.
+ */
+public class StringMultiSchemeWithTopic
+        implements MultiScheme {
+    public static final String STRING_SCHEME_KEY = "str";
+
+    public static final String TOPIC_KEY = "topic";
+
+    /**
+     * Not supported by this scheme.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Iterable<List<Object>> deserialize(ByteBuffer bytes) {
+        // Standard unchecked exception instead of the JDK-internal sun.reflect NotImplementedException.
+        throw new UnsupportedOperationException("Use deserializeWithTopic(String, ByteBuffer) instead");
+    }
+
+    /**
+     * @param topic topic the message was read from
+     * @param bytes raw message payload
+     * @return a single tuple of (message as string, topic)
+     */
+    public Iterable<List<Object>> deserializeWithTopic(String topic, ByteBuffer bytes) {
+        List<Object> items = new Values(StringScheme.deserializeString(bytes), topic);
+        return Collections.singletonList(items);
+    }
+
+    /** Declares the two output fields: string payload and topic. */
+    public Fields getOutputFields() {
+        return new Fields(STRING_SCHEME_KEY, TOPIC_KEY);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
new file mode 100644
index 0000000..e2a2c22
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * {@link Scheme} that decodes a message payload as a UTF-8 string and emits it
+ * as a single-field tuple named {@link #STRING_SCHEME_KEY}.
+ */
+public class StringScheme implements Scheme {
+    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
+    public static final String STRING_SCHEME_KEY = "str";
+
+    /**
+     * @param bytes message payload
+     * @return a one-value tuple containing the decoded string
+     */
+    @Override
+    public List<Object> deserialize(ByteBuffer bytes) {
+        return new Values(deserializeString(bytes));
+    }
+
+    /**
+     * Decodes the remaining bytes of the buffer as UTF-8.
+     *
+     * @param string buffer whose bytes from its current position to its limit are decoded
+     * @return the decoded string
+     */
+    public static String deserializeString(ByteBuffer string) {
+        if (string.hasArray()) {
+            // Read straight from the backing array; the buffer position is not advanced.
+            int base = string.arrayOffset();
+            // Decode explicitly as UTF-8 so both branches agree and the result does not
+            // depend on the JVM's default charset.
+            return new String(string.array(), base + string.position(), string.remaining(), UTF8_CHARSET);
+        } else {
+            return new String(Utils.toByteArray(string), UTF8_CHARSET);
+        }
+    }
+
+    /**
+     * @return the single output field, {@link #STRING_SCHEME_KEY}
+     */
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(STRING_SCHEME_KEY);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
new file mode 100644
index 0000000..8e1c98f
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+/**
+ * Unchecked exception that carries only a detail message; by its name it
+ * signals a topic offset that is out of range.
+ */
+public class TopicOffsetOutOfRangeException extends RuntimeException {
+
+    /**
+     * Creates the exception with the given detail message and no cause.
+     *
+     * @param message detail message, retrievable through {@link #getMessage()}
+     */
+    public TopicOffsetOutOfRangeException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
new file mode 100644
index 0000000..a53d566
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+
+import java.util.*;
+
+import static org.apache.storm.kafka.KafkaUtils.taskId;
+
+/**
+ * {@link PartitionCoordinator} that keeps one {@link PartitionManager} per partition
+ * assigned to this task. The assignment is recomputed from the broker information
+ * returned by the {@link DynamicBrokersReader} whenever the cached list is older than
+ * the refresh interval configured on the {@link ZkHosts}.
+ */
+public class ZkCoordinator implements PartitionCoordinator {
+    public static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
+
+    SpoutConfig _spoutConfig;
+    int _taskIndex;
+    int _totalTasks;
+    String _topologyInstanceId;
+    Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
+    List<PartitionManager> _cachedList = new ArrayList<PartitionManager>();
+    Long _lastRefreshTime = null;
+    int _refreshFreqMs;
+    DynamicPartitionConnections _connections;
+    DynamicBrokersReader _reader;
+    ZkState _state;
+    Map _stormConf;
+
+    /**
+     * Creates a coordinator whose broker reader is built from the ZooKeeper settings
+     * in {@code spoutConfig.hosts}.
+     */
+    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
+        this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig));
+    }
+
+    /**
+     * Creates a coordinator that uses the supplied broker reader.
+     * {@code spoutConfig.hosts} is cast to {@link ZkHosts} to obtain the refresh interval.
+     */
+    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
+        _connections = connections;
+        _stormConf = stormConf;
+        _spoutConfig = spoutConfig;
+        _state = state;
+        _taskIndex = taskIndex;
+        _totalTasks = totalTasks;
+        _topologyInstanceId = topologyInstanceId;
+        // The interval is configured in seconds; it is compared against millisecond timestamps.
+        ZkHosts zkHosts = (ZkHosts) spoutConfig.hosts;
+        _refreshFreqMs = zkHosts.refreshFreqSecs * 1000;
+        _reader = reader;
+    }
+
+    /** Builds the reader for the topic's broker information from the ZooKeeper settings. */
+    private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) {
+        ZkHosts zkHosts = (ZkHosts) spoutConfig.hosts;
+        return new DynamicBrokersReader(stormConf, zkHosts.brokerZkStr, zkHosts.brokerZkPath, spoutConfig.topic);
+    }
+
+    /**
+     * Returns the cached managers, refreshing them first when no refresh has happened
+     * yet or the last one is older than the refresh interval.
+     */
+    @Override
+    public List<PartitionManager> getMyManagedPartitions() {
+        boolean neverRefreshed = _lastRefreshTime == null;
+        if (neverRefreshed || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
+            refresh();
+            _lastRefreshTime = System.currentTimeMillis();
+        }
+        return _cachedList;
+    }
+
+    /**
+     * Recomputes this task's partitions, closes managers for partitions that are no
+     * longer assigned, creates managers for newly assigned ones and rebuilds the
+     * cached list. Any exception raised while doing so is rethrown wrapped in a
+     * {@link RuntimeException}.
+     */
+    @Override
+    public void refresh() {
+        try {
+            LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
+            List<GlobalPartitionInformation> brokers = _reader.getBrokerInfo();
+            List<Partition> assigned = KafkaUtils.calculatePartitionsForTask(brokers, _totalTasks, _taskIndex);
+
+            // Both difference sets are computed before _managers is modified.
+            Set<Partition> added = new HashSet<Partition>(assigned);
+            added.removeAll(_managers.keySet());
+
+            Set<Partition> removed = new HashSet<Partition>(_managers.keySet());
+            removed.removeAll(assigned);
+
+            LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + removed);
+
+            for (Partition partition : removed) {
+                _managers.remove(partition).close();
+            }
+            LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + added);
+
+            for (Partition partition : added) {
+                _managers.put(partition, new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, partition));
+            }
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        _cachedList = new ArrayList<PartitionManager>(_managers.values());
+        LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
+    }
+
+    /**
+     * @return the manager for the given partition, or {@code null} if this task holds none for it
+     */
+    @Override
+    public PartitionManager getManager(Partition partition) {
+        return _managers.get(partition);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
new file mode 100644
index 0000000..2c2a26f
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+
+/**
+ * {@link BrokerHosts} settings that locate broker information in ZooKeeper:
+ * a connection string, the path under which broker data is looked up, and how
+ * often (in seconds) that information should be refreshed.
+ */
+public class ZkHosts implements BrokerHosts {
+    private static final String DEFAULT_ZK_PATH = "/brokers";
+
+    public String brokerZkStr;
+    public String brokerZkPath; // e.g., /kafka/brokers
+    public int refreshFreqSecs = 60;
+
+    /**
+     * Uses the default broker path, {@code /brokers}.
+     *
+     * @param brokerZkStr ZooKeeper connection string
+     */
+    public ZkHosts(String brokerZkStr) {
+        this(brokerZkStr, DEFAULT_ZK_PATH);
+    }
+
+    /**
+     * @param brokerZkStr  ZooKeeper connection string
+     * @param brokerZkPath path under which broker data is looked up
+     */
+    public ZkHosts(String brokerZkStr, String brokerZkPath) {
+        this.brokerZkPath = brokerZkPath;
+        this.brokerZkStr = brokerZkStr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
new file mode 100644
index 0000000..d12016b
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.CreateMode;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Thin wrapper around a Curator client for reading and writing byte and JSON
+ * payloads at ZooKeeper paths. Failures from the client are rethrown wrapped in
+ * {@link RuntimeException}.
+ */
+public class ZkState {
+    public static final Logger LOG = LoggerFactory.getLogger(ZkState.class);
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+    CuratorFramework _curator;
+
+    /**
+     * Builds (but does not start) a Curator client from the transactional ZooKeeper
+     * servers/port and the Storm ZooKeeper timeout and retry settings in {@code stateConf}.
+     */
+    private CuratorFramework newCurator(Map stateConf) throws Exception {
+        Integer port = (Integer) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
+        // Assemble "host:port,host:port" without a dangling trailing comma.
+        StringBuilder serverPorts = new StringBuilder();
+        for (String server : (List<String>) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
+            if (serverPorts.length() > 0) {
+                serverPorts.append(',');
+            }
+            serverPorts.append(server).append(':').append(port);
+        }
+        return CuratorFrameworkFactory.newClient(serverPorts.toString(),
+                Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+                Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)),
+                new RetryNTimes(Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
+                        Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+    }
+
+    /**
+     * @return the underlying client; {@code null} once {@link #close()} has been called
+     *         (guarded only by an {@code assert})
+     */
+    public CuratorFramework getCurator() {
+        assert _curator != null;
+        return _curator;
+    }
+
+    /**
+     * Creates and starts the Curator client.
+     *
+     * @param stateConf configuration holding the ZooKeeper connection settings
+     * @throws RuntimeException if the client cannot be created or started
+     */
+    public ZkState(Map stateConf) {
+        stateConf = new HashMap(stateConf);
+
+        try {
+            _curator = newCurator(stateConf);
+            _curator.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Serializes {@code data} to JSON and writes it, UTF-8 encoded, at {@code path}.
+     */
+    public void writeJSON(String path, Map<Object, Object> data) {
+        // Pass the map itself so it is only rendered when debug logging is enabled.
+        LOG.debug("Writing {} the data {}", path, data);
+        writeBytes(path, JSONValue.toJSONString(data).getBytes(UTF8));
+    }
+
+    /**
+     * Writes {@code bytes} at {@code path}, creating the node (and missing parents)
+     * as a persistent node when it does not exist yet, otherwise overwriting its data.
+     *
+     * @throws RuntimeException wrapping any failure from the client
+     */
+    public void writeBytes(String path, byte[] bytes) {
+        try {
+            // NOTE(review): exists-then-create is not atomic; a concurrent creator of the
+            // same path would make create() fail, which surfaces as a RuntimeException.
+            if (_curator.checkExists().forPath(path) == null) {
+                _curator.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, bytes);
+            } else {
+                _curator.setData().forPath(path, bytes);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Reads the node at {@code path} and parses its UTF-8 content as JSON.
+     *
+     * @return the parsed map, or {@code null} when the node does not exist
+     * @throws RuntimeException wrapping any read or conversion failure
+     */
+    public Map<Object, Object> readJSON(String path) {
+        try {
+            byte[] b = readBytes(path);
+            if (b == null) {
+                return null;
+            }
+            return (Map<Object, Object>) JSONValue.parse(new String(b, UTF8));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @return the data stored at {@code path}, or {@code null} when the node does not exist
+     * @throws RuntimeException wrapping any failure from the client
+     */
+    public byte[] readBytes(String path) {
+        try {
+            if (_curator.checkExists().forPath(path) != null) {
+                return _curator.getData().forPath(path);
+            } else {
+                return null;
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Closes the client. Calling this more than once is a no-op after the first call.
+     */
+    public void close() {
+        if (_curator != null) {
+            _curator.close();
+            _curator = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
new file mode 100644
index 0000000..0ceac3a
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka.
+ * <p>
+ * The producer is created in {@link #prepare} from the properties given to
+ * {@link #withProducerProperties(Properties)}. When no {@link KafkaTopicSelector}
+ * has been set, the topic is read from the storm config under the key 'topic';
+ * when no {@link TupleToKafkaMapper} has been set, a
+ * {@link FieldNameBasedTupleToKafkaMapper} with its default field names is used.
+ * <p>
+ * This bolt uses 0.8.2 Kafka Producer API.
+ * <p>
+ * It works for sending tuples to older Kafka version (0.8.1).
+ */
+public class KafkaBolt<K, V> extends BaseRichBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+
+    public static final String TOPIC = "topic";
+
+    private KafkaProducer<K, V> producer;
+    private OutputCollector collector;
+    private TupleToKafkaMapper<K,V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties boltSpecfiedProperties = new Properties();
+    /**
+     * With default setting for fireAndForget and async, the callback is called when the sending succeeds.
+     * By setting fireAndForget true, the send will not wait at all for kafka to ack.
+     * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set.
+     * By setting async false, synchronous sending is used.
+     */
+    private boolean fireAndForget = false;
+    private boolean async = true;
+
+    public KafkaBolt() {}
+
+    /** Sets the mapper that extracts the Kafka key and message from each tuple. */
+    public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    /** Sets the selector that chooses the destination topic for each tuple. */
+    public KafkaBolt<K,V> withTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    /** Sets the properties used to construct the {@link KafkaProducer} in {@link #prepare}. */
+    public KafkaBolt<K,V> withProducerProperties(Properties producerProperties) {
+        this.boltSpecfiedProperties = producerProperties;
+        return this;
+    }
+
+    /**
+     * Installs the default mapper and topic selector when none were configured,
+     * then creates the producer.
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        //for backward compatibility.
+        if(mapper == null) {
+            this.mapper = new FieldNameBasedTupleToKafkaMapper<K,V>();
+        }
+
+        //for backward compatibility.
+        if(topicSelector == null) {
+            this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
+        }
+
+        producer = new KafkaProducer<>(boltSpecfiedProperties);
+        this.collector = collector;
+    }
+
+    /**
+     * Sends the tuple's key and message to the selected topic and acks or fails the tuple.
+     * <ul>
+     * <li>Tick tuples are acked without being sent.</li>
+     * <li>A {@code null} topic from the selector skips the tuple and acks it.</li>
+     * <li>async and not fireAndForget: the tuple is acked or failed from the producer callback.</li>
+     * <li>not async: the call blocks on the send result, then acks or fails.</li>
+     * <li>async and fireAndForget: the tuple is acked right after handing it to the producer.</li>
+     * </ul>
+     * Any exception is reported through the collector and the tuple is failed.
+     */
+    @Override
+    public void execute(final Tuple input) {
+        if (TupleUtils.isTick(input)) {
+          collector.ack(input);
+          return; // Do not try to send ticks to Kafka
+        }
+        try {
+            K key = mapper.getKeyFromTuple(input);
+            V message = mapper.getMessageFromTuple(input);
+            String topic = topicSelector.getTopic(input);
+            if (topic == null) {
+                LOG.warn("skipping key = {}, topic selector returned null.", key);
+                collector.ack(input);
+                return;
+            }
+
+            Callback callback = null;
+            if (!fireAndForget && async) {
+                callback = new Callback() {
+                    @Override
+                    public void onCompletion(RecordMetadata ignored, Exception e) {
+                        // The callback is invoked by the producer, not by execute();
+                        // calls on the collector are serialized on its monitor.
+                        synchronized (collector) {
+                            if (e != null) {
+                                collector.reportError(e);
+                                collector.fail(input);
+                            } else {
+                                collector.ack(input);
+                            }
+                        }
+                    }
+                };
+            }
+            Future<RecordMetadata> result = producer.send(new ProducerRecord<K, V>(topic, key, message), callback);
+            if (!async) {
+                try {
+                    result.get();
+                    collector.ack(input);
+                } catch (ExecutionException err) {
+                    collector.reportError(err);
+                    collector.fail(input);
+                } catch (InterruptedException err) {
+                    // Restore the interrupt flag so the interruption is not lost to the caller.
+                    Thread.currentThread().interrupt();
+                    collector.reportError(err);
+                    collector.fail(input);
+                }
+            } else if (fireAndForget) {
+                collector.ack(input);
+            }
+        } catch (Exception ex) {
+            collector.reportError(ex);
+            collector.fail(input);
+        }
+    }
+
+    /** This bolt declares no output fields. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+
+    /** Closes the producer if one was created by {@link #prepare}. */
+    @Override
+    public void cleanup() {
+        if (producer != null) {
+            producer.close();
+        }
+    }
+
+    public void setFireAndForget(boolean fireAndForget) {
+        this.fireAndForget = fireAndForget;
+    }
+
+    public void setAsync(boolean async) {
+        this.async = async;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
new file mode 100644
index 0000000..672da8e
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.mapper;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * {@link TupleToKafkaMapper} that reads the key and the message from two named
+ * tuple fields. The field names default to {@link #BOLT_KEY} and {@link #BOLT_MESSAGE}.
+ */
+public class FieldNameBasedTupleToKafkaMapper<K,V> implements TupleToKafkaMapper<K, V> {
+
+    public static final String BOLT_KEY = "key";
+    public static final String BOLT_MESSAGE = "message";
+    public String boltKeyField;
+    public String boltMessageField;
+
+    /** Uses the default field names, "key" and "message". */
+    public FieldNameBasedTupleToKafkaMapper() {
+        this(BOLT_KEY, BOLT_MESSAGE);
+    }
+
+    /**
+     * @param boltKeyField     name of the tuple field holding the key
+     * @param boltMessageField name of the tuple field holding the message
+     */
+    public FieldNameBasedTupleToKafkaMapper(String boltKeyField, String boltMessageField) {
+        this.boltMessageField = boltMessageField;
+        this.boltKeyField = boltKeyField;
+    }
+
+    /**
+     * @return the value of the key field, or {@code null} when the tuple has no such field
+     */
+    @Override
+    public K getKeyFromTuple(Tuple tuple) {
+        //for backward compatibility, we return null when key is not present.
+        if (!tuple.contains(boltKeyField)) {
+            return null;
+        }
+        return (K) tuple.getValueByField(boltKeyField);
+    }
+
+    /**
+     * @return the value of the message field
+     */
+    @Override
+    public V getMessageFromTuple(Tuple tuple) {
+        Object rawMessage = tuple.getValueByField(boltMessageField);
+        return (V) rawMessage;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
new file mode 100644
index 0000000..3890413
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.mapper;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+/**
+ * Maps a Storm tuple to the key and the message of a Kafka record.
+ *
+ * @param <K> type of key.
+ * @param <V> type of value.
+ */
+public interface TupleToKafkaMapper<K,V> extends Serializable {
+    /**
+     * @param tuple the tuple being written to Kafka
+     * @return the record key extracted from the tuple
+     */
+    K getKeyFromTuple(Tuple tuple);
+    /**
+     * @param tuple the tuple being written to Kafka
+     * @return the record message (value) extracted from the tuple
+     */
+    V getMessageFromTuple(Tuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..2aafc78
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * {@link KafkaTopicSelector} that ignores the tuple and always answers with
+ * the topic name it was constructed with.
+ */
+public class DefaultTopicSelector implements KafkaTopicSelector {
+
+    private final String topicName;
+
+    /**
+     * @param topicName topic returned for every tuple
+     */
+    public DefaultTopicSelector(final String topicName) {
+        this.topicName = topicName;
+    }
+
+    /**
+     * @param tuple unused
+     * @return the configured topic name
+     */
+    @Override
+    public String getTopic(Tuple tuple) {
+        return this.topicName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
new file mode 100644
index 0000000..cb7fb44
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+/**
+ * Chooses the Kafka topic a tuple is written to.
+ */
+public interface KafkaTopicSelector extends Serializable {
+    /**
+     * @param tuple the tuple about to be written
+     * @return the destination topic; {@code KafkaBolt} skips (and acks) the tuple
+     *         when this returns {@code null}
+     */
+    String getTopic(Tuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
new file mode 100644
index 0000000..baec8cb
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.kafka.KafkaUtils;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+
+import java.util.List;
+import java.util.Map;
+
+class Coordinator implements 
IPartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>>, 
IOpaquePartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>> {
+
+    private IBrokerReader reader;
+    private TridentKafkaConfig config;
+
+    public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) {
+        config = tridentKafkaConfig;
+        reader = KafkaUtils.makeBrokerReader(conf, config);
+    }
+
+    @Override
+    public void close() {
+        config.coordinator.close();
+    }
+
+    @Override
+    public boolean isReady(long txid) {
+        return config.coordinator.isReady(txid);
+    }
+
+    @Override
+    public List<GlobalPartitionInformation> getPartitionsForBatch() {
+        return reader.getAllBrokers();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
new file mode 100644
index 0000000..7a7e32c
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+/**
+ * {@link IBatchCoordinator} that reports every batch as ready and has
+ * nothing to release on close.
+ */
+public class DefaultCoordinator implements IBatchCoordinator {
+
+    /**
+     * @param txid transaction id of the batch (not consulted)
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isReady(long txid) {
+        return true;
+    }
+
+    /** No-op: this coordinator holds no resources. */
+    @Override
+    public void close() {
+        // intentionally empty
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
new file mode 100644
index 0000000..3108ff8
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import com.google.common.base.Objects;
+import org.apache.storm.kafka.Broker;
+import org.apache.storm.kafka.Partition;
+
+import java.io.Serializable;
+import java.util.*;
+
+
+/**
+ * Partition-id to {@link Broker} mapping for a single topic.
+ *
+ * <p>The mapping is kept in a {@link TreeMap}, so iteration and
+ * {@link #getOrderedPartitions()} visit partitions in ascending partition-id
+ * order.
+ *
+ * <p>NOTE(review): {@link #equals(Object)} and {@link #hashCode()} look only at
+ * the partition map; {@code topic} and the path-id flag are not compared.
+ * Confirm this is intended before relying on equality across topics.
+ */
+public class GlobalPartitionInformation implements Iterable<Partition>, Serializable {
+
+    private Map<Integer, Broker> partitionMap;
+    public String topic;
+
+    // Flag to keep the Partition Path Id backward compatible with the old
+    // implementation of Partition.getId() == "partition_" + partition
+    private Boolean bUseTopicNameForPartitionPathId;
+
+    /**
+     * @param topic                           topic these partitions belong to
+     * @param bUseTopicNameForPartitionPathId flag passed through to every
+     *                                        {@link Partition} this object creates
+     */
+    public GlobalPartitionInformation(String topic, Boolean bUseTopicNameForPartitionPathId) {
+        this.topic = topic;
+        this.partitionMap = new TreeMap<Integer, Broker>();
+        this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId;
+    }
+
+    /**
+     * Creates an instance with the path-id flag set to {@code false}.
+     *
+     * @param topic topic these partitions belong to
+     */
+    public GlobalPartitionInformation(String topic) {
+        this(topic, false);
+    }
+
+    /**
+     * Registers (or replaces) the broker for a partition.
+     *
+     * @param partitionId partition id
+     * @param broker      broker to associate with the partition
+     */
+    public void addPartition(int partitionId, Broker broker) {
+        partitionMap.put(partitionId, broker);
+    }
+
+    @Override
+    public String toString() {
+        return "GlobalPartitionInformation{" +
+                "topic=" + topic +
+                ", partitionMap=" + partitionMap +
+                '}';
+    }
+
+    /**
+     * @param partitionId partition id to look up
+     * @return the broker registered for the partition, or {@code null} if none
+     */
+    public Broker getBrokerFor(Integer partitionId) {
+        return partitionMap.get(partitionId);
+    }
+
+    /**
+     * @return a new list with one {@link Partition} per registered partition,
+     *         in ascending partition-id order
+     */
+    public List<Partition> getOrderedPartitions() {
+        // The final size is known up front, so a presized ArrayList avoids both
+        // per-node allocation and regrowth.
+        List<Partition> partitions = new ArrayList<Partition>(partitionMap.size());
+        for (Map.Entry<Integer, Broker> entry : partitionMap.entrySet()) {
+            partitions.add(new Partition(entry.getValue(), this.topic, entry.getKey(),
+                    this.bUseTopicNameForPartitionPathId));
+        }
+        return partitions;
+    }
+
+    /**
+     * Iterates over the registered partitions in ascending partition-id order.
+     * The topic and path-id flag are captured when the iterator is created.
+     * {@code remove()} removes the underlying map entry.
+     */
+    @Override
+    public Iterator<Partition> iterator() {
+        final Iterator<Map.Entry<Integer, Broker>> iterator = partitionMap.entrySet().iterator();
+        final String topic = this.topic;
+        final Boolean bUseTopicNameForPartitionPathId = this.bUseTopicNameForPartitionPathId;
+        return new Iterator<Partition>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public Partition next() {
+                Map.Entry<Integer, Broker> next = iterator.next();
+                return new Partition(next.getValue(), topic, next.getKey(),
+                        bUseTopicNameForPartitionPathId);
+            }
+
+            @Override
+            public void remove() {
+                iterator.remove();
+            }
+        };
+    }
+
+    /** Hash of the partition map only. */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(partitionMap);
+    }
+
+    /**
+     * Two instances are equal when they are of the same class and their
+     * partition maps are equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final GlobalPartitionInformation other = (GlobalPartitionInformation) obj;
+        return Objects.equal(this.partitionMap, other.partitionMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
new file mode 100644
index 0000000..41369ba
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import java.io.Serializable;
+
+/**
+ * Serializable hook that answers batch-readiness queries and can be closed.
+ */
+public interface IBatchCoordinator extends Serializable {
+    /**
+     * @param txid transaction id being asked about
+     * @return {@code true} if the batch is considered ready.
+     *         NOTE(review): the exact meaning of "ready" is defined by the
+     *         caller's contract, which is not visible here -- confirm.
+     */
+    boolean isReady(long txid);
+
+    /** Releases whatever the implementation holds, if anything. */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
new file mode 100644
index 0000000..904d8c9
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Source of {@link GlobalPartitionInformation}, either for one topic or for
+ * all topics known to the reader.
+ */
+public interface IBrokerReader {
+
+    /**
+     * @param topic topic name to look up
+     * @return partition information for {@code topic}.
+     *         NOTE(review): behaviour for an unknown topic is
+     *         implementation-defined -- confirm before assuming non-null.
+     */
+    GlobalPartitionInformation getBrokerForTopic(String topic);
+
+    /**
+     * @return partition information for every topic this reader knows about
+     */
+    List<GlobalPartitionInformation> getAllBrokers();
+
+    /** Releases whatever the implementation holds, if anything. */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
new file mode 100644
index 0000000..2332205
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+
+import org.apache.storm.metric.api.ICombiner;
+
+/**
+ * {@link ICombiner} that keeps the larger of two {@code Long} values,
+ * treating {@code null} as "no value".
+ */
+public class MaxMetric implements ICombiner<Long> {
+
+    /**
+     * @return {@code null}, which {@link #combine(Long, Long)} treats as the
+     *         neutral element
+     */
+    @Override
+    public Long identity() {
+        return null;
+    }
+
+    /**
+     * @param l1 first value, may be {@code null}
+     * @param l2 second value, may be {@code null}
+     * @return the other argument when one is {@code null} (so {@code null} if
+     *         both are), otherwise the maximum of the two
+     */
+    @Override
+    public Long combine(Long l1, Long l2) {
+        if (l1 != null && l2 != null) {
+            return Math.max(l1, l2);
+        }
+        // At least one side is absent: hand back whichever one is left.
+        return (l1 == null) ? l2 : l1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
new file mode 100644
index 0000000..f540c87
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.kafka.Partition;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+
+public class OpaqueTridentKafkaSpout implements 
IOpaquePartitionedTridentSpout<List<GlobalPartitionInformation>, Partition, 
Map> {
+
+
+    TridentKafkaConfig _config;
+
+    public OpaqueTridentKafkaSpout(TridentKafkaConfig config) {
+        _config = config;
+    }
+
+    @Override
+    public 
IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, 
Partition, Map> getEmitter(Map conf, TopologyContext context) {
+        return new TridentKafkaEmitter(conf, context, _config, context
+                .getStormId()).asOpaqueEmitter();
+    }
+
+    @Override
+    public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map conf, 
TopologyContext tc) {
+        return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return _config.scheme.getOutputFields();
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
new file mode 100644
index 0000000..ba27651
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * {@link IBrokerReader} backed by a fixed, in-memory topic to
+ * {@link GlobalPartitionInformation} map populated at construction time.
+ */
+public class StaticBrokerReader implements IBrokerReader {
+
+    private Map<String, GlobalPartitionInformation> brokers =
+            new TreeMap<String, GlobalPartitionInformation>();
+
+    /**
+     * @param topic                topic name to register
+     * @param partitionInformation partition information served for {@code topic}
+     */
+    public StaticBrokerReader(String topic, GlobalPartitionInformation partitionInformation) {
+        brokers.put(topic, partitionInformation);
+    }
+
+    /**
+     * @param topic topic name to look up
+     * @return the registered partition information, or {@code null} when the
+     *         topic is not registered
+     */
+    @Override
+    public GlobalPartitionInformation getBrokerForTopic(String topic) {
+        // Map.get already yields null for a missing key.
+        return brokers.get(topic);
+    }
+
+    /**
+     * @return a new list holding the partition information of every
+     *         registered topic, in the map's key order
+     */
+    @Override
+    public List<GlobalPartitionInformation> getAllBrokers() {
+        return new ArrayList<GlobalPartitionInformation>(brokers.values());
+    }
+
+    /** No-op: nothing to release. */
+    @Override
+    public void close() {
+        // intentionally empty
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
new file mode 100644
index 0000000..ac5b49f
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.kafka.Partition;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+
+import java.util.Map;
+import java.util.UUID;
+
+
+public class TransactionalTridentKafkaSpout implements 
IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> {
+
+    TridentKafkaConfig _config;
+
+    public TransactionalTridentKafkaSpout(TridentKafkaConfig config) {
+        _config = config;
+    }
+
+
+    @Override
+    public IPartitionedTridentSpout.Coordinator getCoordinator(Map conf, 
TopologyContext context) {
+        return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
+    }
+
+    @Override
+    public IPartitionedTridentSpout.Emitter getEmitter(Map conf, 
TopologyContext context) {
+        return new TridentKafkaEmitter(conf, context, _config, context
+                .getStormId()).asTransactionalEmitter();
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return _config.scheme.getOutputFields();
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
new file mode 100644
index 0000000..b225e9a
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.kafka.BrokerHosts;
+import org.apache.storm.kafka.KafkaConfig;
+
+
+/**
+ * {@link KafkaConfig} extended with the {@link IBatchCoordinator} that the
+ * Trident Kafka spouts consult.
+ */
+public class TridentKafkaConfig extends KafkaConfig {
+
+    /** Batch coordinator; fixed to a {@link DefaultCoordinator}. */
+    public final IBatchCoordinator coordinator = new DefaultCoordinator();
+
+    /**
+     * @param hosts broker hosts, forwarded to {@link KafkaConfig}
+     * @param topic topic name, forwarded to {@link KafkaConfig}
+     */
+    public TridentKafkaConfig(BrokerHosts hosts, String topic) {
+        super(hosts, topic);
+    }
+
+    /**
+     * @param hosts    broker hosts, forwarded to {@link KafkaConfig}
+     * @param topic    topic name, forwarded to {@link KafkaConfig}
+     * @param clientId client id, forwarded to {@link KafkaConfig}
+     */
+    public TridentKafkaConfig(BrokerHosts hosts, String topic, String clientId) {
+        super(hosts, topic, clientId);
+    }
+}

Reply via email to