http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java new file mode 100644 index 0000000..0f5aaa0 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.storm.kafka;

import org.apache.storm.Config;
import org.apache.storm.metric.api.CombinedMetric;
import org.apache.storm.metric.api.CountMetric;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.spout.SpoutOutputCollector;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;

import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.storm.kafka.KafkaSpout.EmitState;
import org.apache.storm.kafka.trident.MaxMetric;

import java.util.*;

/**
 * Manages consumption of a single Kafka partition for one KafkaSpout task.
 *
 * Responsibilities visible in this class:
 * - fetch batches of messages from the broker and buffer them in {@code _waitingToEmit};
 * - emit tuples to the topology, tracking in-flight offsets in {@code _pending};
 * - schedule failed offsets for redelivery via {@code _failedMsgRetryManager};
 * - periodically write the last fully-completed offset to ZooKeeper ({@link #commit()}).
 *
 * Not thread-safe: all methods are expected to be called from the owning spout task.
 */
public class PartitionManager {
    public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);

    private final CombinedMetric _fetchAPILatencyMax;
    private final ReducedMetric _fetchAPILatencyMean;
    private final CountMetric _fetchAPICallCount;
    private final CountMetric _fetchAPIMessageCount;
    // Offset just past the last message handed to the topology; next fetch of new data starts here.
    Long _emittedToOffset;
    // _pending key = Kafka offset, value = time at which the message was first submitted to the topology
    private SortedMap<Long, Long> _pending = new TreeMap<Long, Long>();
    private final FailedMsgRetryManager _failedMsgRetryManager;

    // Offset most recently persisted to ZooKeeper; everything below it is fully processed.
    Long _committedTo;
    LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<MessageAndOffset>();
    Partition _partition;
    SpoutConfig _spoutConfig;
    String _topologyInstanceId;
    SimpleConsumer _consumer;
    DynamicPartitionConnections _connections;
    ZkState _state;
    Map _stormConf;
    long numberFailed, numberAcked;

    /**
     * Registers a consumer for {@code id} and determines the starting offset, preferring the
     * offset previously committed to ZooKeeper unless it is missing, belongs to a different
     * topology run with {@code ignoreZkOffsets} set, or lags the broker by more than
     * {@code maxOffsetBehind}.
     */
    public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.topic, id.partition);
        _state = state;
        _stormConf = stormConf;
        numberAcked = numberFailed = 0;

        _failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs,
                _spoutConfig.retryDelayMultiplier,
                _spoutConfig.retryDelayMaxMs);

        String jsonTopologyId = null;
        Long jsonOffset = null;
        String path = committedPath();
        try {
            Map<Object, Object> json = _state.readJSON(path);
            LOG.info("Read partition information from: " + path + " --> " + json);
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                jsonOffset = (Long) json.get("offset");
            }
        } catch (Throwable e) {
            // A corrupt/missing ZK node must not prevent startup; fall back to broker offsets below.
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
        }

        String topic = _partition.topic;
        Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig);

        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
            _committedTo = currentOffset;
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
            _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId);
        }

        // Too far behind (or nonsensical offset): jump forward rather than replay a huge backlog.
        if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
            LOG.info("Last commit offset from zookeeper: " + _committedTo);
            Long lastCommittedOffset = _committedTo;
            _committedTo = currentOffset;
            LOG.info("Commit offset " + lastCommittedOffset + " is more than " +
                    spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
        }

        LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
        _emittedToOffset = _committedTo;

        _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
        _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
        _fetchAPICallCount = new CountMetric();
        _fetchAPIMessageCount = new CountMetric();
    }

    /** Snapshot-and-reset of the fetch metrics, keyed by partition for the spout's metrics map. */
    public Map getMetricsDataMap() {
        Map ret = new HashMap();
        ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
        ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
        ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
        ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
        return ret;
    }

    /**
     * Emits at most one message's worth of tuples to {@code collector}.
     * Messages that deserialize to nothing are acked immediately and the loop advances.
     *
     * @return NO_EMITTED when the buffer (after a refill attempt) is empty,
     *         EMITTED_MORE_LEFT when more buffered messages remain, EMITTED_END otherwise.
     */
    public EmitState next(SpoutOutputCollector collector) {
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
            MessageAndOffset toEmit = _waitingToEmit.pollFirst();
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }

            Iterable<List<Object>> tups;
            if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
                tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.message(), _partition, toEmit.offset());
            } else {
                tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic);
            }

            if ((tups != null) && tups.iterator().hasNext()) {
                if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
                    for (List<Object> tup : tups) {
                        // BUGFIX: emit on the configured output stream. The guard above checks
                        // outputStreamId, but the original code emitted to _spoutConfig.topic,
                        // i.e. used the topic name as the stream id — a stream that was never
                        // declared unless it happens to equal outputStreamId.
                        collector.emit(_spoutConfig.outputStreamId, tup, new KafkaMessageId(_partition, toEmit.offset()));
                    }
                } else {
                    for (List<Object> tup : tups) {
                        collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset()));
                    }
                }
                break;
            } else {
                // Nothing to emit for this message; treat it as processed so the commit can advance.
                ack(toEmit.offset());
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

    /**
     * Refills {@code _waitingToEmit} from the broker. Failed offsets awaiting retry are fetched
     * before any new data. On an out-of-range fetch, rewinds to the broker's earliest offset and
     * discards retry records that are no longer available (STORM-643).
     */
    private void fill() {
        long start = System.nanoTime();
        Long offset;

        // Are there failed tuples? If so, fetch those first.
        offset = this._failedMsgRetryManager.nextFailedMessageToRetry();
        final boolean processingNewTuples = (offset == null);
        if (processingNewTuples) {
            offset = _emittedToOffset;
        }

        ByteBufferMessageSet msgs = null;
        try {
            msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
        } catch (TopicOffsetOutOfRangeException e) {
            _emittedToOffset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
            LOG.warn("{} Using new offset: {}", _partition.partition, _emittedToOffset);
            // fetch failed, so don't update the metrics

            // fix bug [STORM-643] : remove outdated failed offsets
            if (!processingNewTuples) {
                // For the case of EarliestTime it would be better to discard
                // all the failed offsets, that are earlier than actual EarliestTime
                // offset, since they are anyway not there.
                // These calls to broker API will be then saved.
                Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(_emittedToOffset);
                LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
            }

            return;
        }
        long end = System.nanoTime();
        long millis = (end - start) / 1000000;
        _fetchAPILatencyMax.update(millis);
        _fetchAPILatencyMean.update(millis);
        _fetchAPICallCount.incr();
        if (msgs != null) {
            int numMessages = 0;

            for (MessageAndOffset msg : msgs) {
                final Long cur_offset = msg.offset();
                if (cur_offset < offset) {
                    // Skip any old offsets.
                    continue;
                }
                if (processingNewTuples || this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
                    numMessages += 1;
                    // Record first-submission time only once, so retries keep the original timestamp.
                    if (!_pending.containsKey(cur_offset)) {
                        _pending.put(cur_offset, System.currentTimeMillis());
                    }
                    _waitingToEmit.add(msg);
                    _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
                    if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
                        this._failedMsgRetryManager.retryStarted(cur_offset);
                    }
                }
            }
            _fetchAPIMessageCount.incrBy(numMessages);
        }
    }

    /**
     * Marks {@code offset} processed. If the pending set has fallen more than
     * maxOffsetBehind below {@code offset}, the stale head of the map is dropped wholesale.
     */
    public void ack(Long offset) {
        if (!_pending.isEmpty() && _pending.firstKey() < offset - _spoutConfig.maxOffsetBehind) {
            // Too many things pending!
            _pending.headMap(offset - _spoutConfig.maxOffsetBehind).clear();
        }
        _pending.remove(offset);
        this._failedMsgRetryManager.acked(offset);
        numberAcked++;
    }

    /**
     * Marks {@code offset} failed and schedules it for retry, unless it is already more than
     * maxOffsetBehind the emit frontier, in which case it is skipped.
     * Throws if every tuple so far has failed (more than maxOffsetBehind failures, zero acks).
     */
    public void fail(Long offset) {
        if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
            LOG.info(
                    "Skipping failed tuple at offset=" + offset +
                            " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
                            " behind _emittedToOffset=" + _emittedToOffset
            );
            // NOTE(review): the skipped offset is not removed from _pending here, so it can keep
            // holding back lastCompletedOffset() — verify whether this branch should also ack.
        } else {
            LOG.debug("failing at offset={} with _pending.size()={} pending and _emittedToOffset={}", offset, _pending.size(), _emittedToOffset);
            numberFailed++;
            if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
                throw new RuntimeException("Too many tuple failures");
            }

            this._failedMsgRetryManager.failed(offset);
        }
    }

    /** Writes the last fully-completed offset (plus partition/broker metadata) to ZooKeeper, if it advanced. */
    public void commit() {
        long lastCompletedOffset = lastCompletedOffset();
        if (_committedTo != lastCompletedOffset) {
            LOG.debug("Writing last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition, _topologyInstanceId);
            Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                    .put("offset", lastCompletedOffset)
                    .put("partition", _partition.partition)
                    .put("broker", ImmutableMap.of("host", _partition.host.host,
                            "port", _partition.host.port))
                    .put("topic", _partition.topic).build();
            _state.writeJSON(committedPath(), data);

            _committedTo = lastCompletedOffset;
            LOG.debug("Wrote last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition, _topologyInstanceId);
        } else {
            LOG.debug("No new offset for {} for topology: {}", _partition, _topologyInstanceId);
        }
    }

    /** ZK node under which this partition's commit record is stored. */
    private String committedPath() {
        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
    }

    /**
     * Highest offset safe to commit: the earliest still-pending offset, or the emit
     * frontier when nothing is in flight.
     */
    public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.firstKey();
        }
    }

    public Partition getPartition() {
        return _partition;
    }

    /** Commits final progress and releases the broker connection for this partition. */
    public void close() {
        commit();
        _connections.unregister(_partition.host, _partition.topic, _partition.partition);
    }

    /** Message id attached to emitted tuples so ack/fail can be routed back to an offset. */
    static class KafkaMessageId {
        public Partition partition;
        public long offset;

        public KafkaMessageId(Partition partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }
}
package org.apache.storm.kafka;

import java.io.Serializable;
import java.util.List;

/**
 * Configuration for the non-trident KafkaSpout, extending the shared {@link KafkaConfig}
 * with ZooKeeper state-storage settings and retry tuning.
 */
public class SpoutConfig extends KafkaConfig implements Serializable {
    // ZooKeeper ensemble used for offset storage (falls back to defaults when null).
    public List<String> zkServers = null;
    public Integer zkPort = null;
    // Root ZK path and unique consumer id under which offsets are committed.
    public String zkRoot = null;
    public String id = null;

    // Stream to emit tuples on; when null/empty the default stream is used.
    public String outputStreamId;

    // How often (ms) the current Kafka offset is saved to ZooKeeper.
    public long stateUpdateIntervalMs = 2000;

    // Exponential back-off retry settings, applied when a bolt fails a tuple:
    // first retry after retryInitialDelayMs, each subsequent delay multiplied by
    // retryDelayMultiplier, capped at retryDelayMaxMs.
    public long retryInitialDelayMs = 0;
    public double retryDelayMultiplier = 1.0;
    public long retryDelayMaxMs = 60 * 1000;

    /**
     * @param hosts  broker discovery configuration (static or ZK-based)
     * @param topic  Kafka topic to consume
     * @param zkRoot root ZooKeeper path for committed offsets
     * @param id     unique id for this consumer's offset records
     */
    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
        super(hosts, topic);
        this.zkRoot = zkRoot;
        this.id = id;
    }
}
+ */ +package org.apache.storm.kafka; + +import org.apache.storm.kafka.trident.GlobalPartitionInformation; + +import java.util.*; + + +public class StaticCoordinator implements PartitionCoordinator { + Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>(); + List<PartitionManager> _allManagers = new ArrayList(); + + public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { + StaticHosts hosts = (StaticHosts) config.hosts; + List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); + partitions.add(hosts.getPartitionInformation()); + List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex); + for (Partition myPartition : myPartitions) { + _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition)); + } + _allManagers = new ArrayList(_managers.values()); + } + + @Override + public List<PartitionManager> getMyManagedPartitions() { + return _allManagers; + } + + public PartitionManager getManager(Partition partition) { + return _managers.get(partition); + } + + @Override + public void refresh() { return; } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java new file mode 100644 index 0000000..33d5c16 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.kafka.trident.GlobalPartitionInformation; + +/** + * Date: 11/05/2013 + * Time: 14:43 + */ +public class StaticHosts implements BrokerHosts { + + + private GlobalPartitionInformation partitionInformation; + + public StaticHosts(GlobalPartitionInformation partitionInformation) { + this.partitionInformation = partitionInformation; + } + + public GlobalPartitionInformation getPartitionInformation() { + return partitionInformation; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java new file mode 100644 index 0000000..77a7211 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import kafka.javaapi.consumer.SimpleConsumer; + +import java.util.HashMap; +import java.util.Map; + +public class StaticPartitionConnections { + Map<Integer, SimpleConsumer> _kafka = new HashMap<Integer, SimpleConsumer>(); + KafkaConfig _config; + StaticHosts hosts; + + public StaticPartitionConnections(KafkaConfig conf) { + _config = conf; + if (!(conf.hosts instanceof StaticHosts)) { + throw new RuntimeException("Must configure with static hosts"); + } + this.hosts = (StaticHosts) conf.hosts; + } + + public SimpleConsumer getConsumer(int partition) { + if (!_kafka.containsKey(partition)) { + Broker hp = hosts.getPartitionInformation().getBrokerFor(partition); + _kafka.put(partition, new SimpleConsumer(hp.host, hp.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)); + + } + return _kafka.get(partition); + } + + public void close() { + for (SimpleConsumer consumer : _kafka.values()) { + consumer.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java new file mode 100644 index 
0000000..9ef7f74 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.tuple.Values; +import com.google.common.collect.ImmutableMap; + +import java.nio.ByteBuffer; +import java.util.List; + +public class StringKeyValueScheme extends StringScheme implements KeyValueScheme { + + @Override + public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) { + if ( key == null ) { + return deserialize(value); + } + String keyString = StringScheme.deserializeString(key); + String valueString = StringScheme.deserializeString(value); + return new Values(ImmutableMap.of(keyString, valueString)); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java new file mode 100644 index 
0000000..e57738d --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import java.nio.ByteBuffer; +import java.util.List; + +public class StringMessageAndMetadataScheme extends StringScheme implements MessageMetadataScheme { + private static final long serialVersionUID = -5441841920447947374L; + + public static final String STRING_SCHEME_PARTITION_KEY = "partition"; + public static final String STRING_SCHEME_OFFSET = "offset"; + + @Override + public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) { + String stringMessage = StringScheme.deserializeString(message); + return new Values(stringMessage, partition.partition, offset); + } + + @Override + public Fields getOutputFields() { + return new Fields(STRING_SCHEME_KEY, STRING_SCHEME_PARTITION_KEY, STRING_SCHEME_OFFSET); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java 
package org.apache.storm.kafka;

import org.apache.storm.spout.MultiScheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

/**
 * MultiScheme producing (message string, topic) tuples. Only the topic-aware entry
 * point {@link #deserializeWithTopic} is supported; the plain {@link #deserialize}
 * cannot supply a topic and therefore always throws.
 */
public class StringMultiSchemeWithTopic
        implements MultiScheme {
    public static final String STRING_SCHEME_KEY = "str";

    public static final String TOPIC_KEY = "topic";

    /**
     * Unsupported: a topic is required to build the tuple.
     * Previously threw the JDK-internal
     * {@code sun.reflect.generics.reflectiveObjects.NotImplementedException}, which is
     * non-portable and removed in newer JDKs; the standard exception is used instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Iterable<List<Object>> deserialize(ByteBuffer bytes) {
        throw new UnsupportedOperationException(
                "StringMultiSchemeWithTopic requires a topic; use deserializeWithTopic");
    }

    /** Builds a single (string, topic) tuple from the message payload. */
    public Iterable<List<Object>> deserializeWithTopic(String topic, ByteBuffer bytes) {
        List<Object> items = new Values(StringScheme.deserializeString(bytes), topic);
        return Collections.singletonList(items);
    }

    public Fields getOutputFields() {
        return new Fields(STRING_SCHEME_KEY, TOPIC_KEY);
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.spout.Scheme; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class StringScheme implements Scheme { + private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8; + public static final String STRING_SCHEME_KEY = "str"; + + public List<Object> deserialize(ByteBuffer bytes) { + return new Values(deserializeString(bytes)); + } + + public static String deserializeString(ByteBuffer string) { + if (string.hasArray()) { + int base = string.arrayOffset(); + return new String(string.array(), base + string.position(), string.remaining()); + } else { + return new String(Utils.toByteArray(string), UTF8_CHARSET); + } + } + + public Fields getOutputFields() { + return new Fields(STRING_SCHEME_KEY); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java new file mode 100644 index 0000000..8e1c98f --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java @@ -0,0 +1,25 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +public class TopicOffsetOutOfRangeException extends RuntimeException { + + public TopicOffsetOutOfRangeException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java new file mode 100644 index 0000000..a53d566 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.trident.GlobalPartitionInformation; + +import java.util.*; + +import static org.apache.storm.kafka.KafkaUtils.taskId; + +public class ZkCoordinator implements PartitionCoordinator { + public static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class); + + SpoutConfig _spoutConfig; + int _taskIndex; + int _totalTasks; + String _topologyInstanceId; + Map<Partition, PartitionManager> _managers = new HashMap(); + List<PartitionManager> _cachedList = new ArrayList<PartitionManager>(); + Long _lastRefreshTime = null; + int _refreshFreqMs; + DynamicPartitionConnections _connections; + DynamicBrokersReader _reader; + ZkState _state; + Map _stormConf; + + public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { + this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); + } + + public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { + _spoutConfig = spoutConfig; + _connections = connections; + _taskIndex = taskIndex; + _totalTasks = totalTasks; + _topologyInstanceId = topologyInstanceId; + _stormConf = stormConf; + _state = state; + ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts; + 
_refreshFreqMs = brokerConf.refreshFreqSecs * 1000; + _reader = reader; + } + + private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) { + ZkHosts hosts = (ZkHosts) spoutConfig.hosts; + return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic); + } + + @Override + public List<PartitionManager> getMyManagedPartitions() { + if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) { + refresh(); + _lastRefreshTime = System.currentTimeMillis(); + } + return _cachedList; + } + + @Override + public void refresh() { + try { + LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections"); + List<GlobalPartitionInformation> brokerInfo = _reader.getBrokerInfo(); + List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex); + + Set<Partition> curr = _managers.keySet(); + Set<Partition> newPartitions = new HashSet<Partition>(mine); + newPartitions.removeAll(curr); + + Set<Partition> deletedPartitions = new HashSet<Partition>(curr); + deletedPartitions.removeAll(mine); + + LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); + + for (Partition id : deletedPartitions) { + PartitionManager man = _managers.remove(id); + man.close(); + } + LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); + + for (Partition id : newPartitions) { + PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); + _managers.put(id, man); + } + + } catch (Exception e) { + throw new RuntimeException(e); + } + _cachedList = new ArrayList<PartitionManager>(_managers.values()); + LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing"); + } + + @Override + public PartitionManager getManager(Partition partition) { + return _managers.get(partition); + } +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java new file mode 100644 index 0000000..2c2a26f --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka; + + +public class ZkHosts implements BrokerHosts { + private static final String DEFAULT_ZK_PATH = "/brokers"; + + public String brokerZkStr = null; + public String brokerZkPath = null; // e.g., /kafka/brokers + public int refreshFreqSecs = 60; + + public ZkHosts(String brokerZkStr, String brokerZkPath) { + this.brokerZkStr = brokerZkStr; + this.brokerZkPath = brokerZkPath; + } + + public ZkHosts(String brokerZkStr) { + this(brokerZkStr, DEFAULT_ZK_PATH); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java new file mode 100644 index 0000000..d12016b --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka; + +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.zookeeper.CreateMode; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ZkState { + public static final Logger LOG = LoggerFactory.getLogger(ZkState.class); + CuratorFramework _curator; + + private CuratorFramework newCurator(Map stateConf) throws Exception { + Integer port = (Integer) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT); + String serverPorts = ""; + for (String server : (List<String>) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) { + serverPorts = serverPorts + server + ":" + port + ","; + } + return CuratorFrameworkFactory.newClient(serverPorts, + Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), + Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)), + new RetryNTimes(Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), + Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)))); + } + + public CuratorFramework getCurator() { + assert _curator != null; + return _curator; + } + + public ZkState(Map stateConf) { + stateConf = new HashMap(stateConf); + + try { + _curator = newCurator(stateConf); + _curator.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void writeJSON(String path, Map<Object, Object> data) { + LOG.debug("Writing {} the data {}", path, data.toString()); + writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8"))); + } + + public void writeBytes(String path, byte[] bytes) { + try { + if (_curator.checkExists().forPath(path) == null) { + 
_curator.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, bytes); + } else { + _curator.setData().forPath(path, bytes); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public Map<Object, Object> readJSON(String path) { + try { + byte[] b = readBytes(path); + if (b == null) { + return null; + } + return (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8")); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public byte[] readBytes(String path) { + try { + if (_curator.checkExists().forPath(path) != null) { + return _curator.getData().forPath(path); + } else { + return null; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void close() { + _curator.close(); + _curator = null; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java new file mode 100644 index 0000000..0ceac3a --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.bolt; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.TupleUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper; +import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; +import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; +import java.util.Map; +import java.util.Properties; + + +/** + * Bolt implementation that can send Tuple data to Kafka + * <p/> + * It expects the producer configuration and topic in storm config under + * <p/> + * 'kafka.broker.properties' and 'topic' + * <p/> + * respectively. + * <p/> + * This bolt uses 0.8.2 Kafka Producer API. + * <p/> + * It works for sending tuples to older Kafka version (0.8.1). 
+ */ +public class KafkaBolt<K, V> extends BaseRichBolt { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class); + + public static final String TOPIC = "topic"; + + private KafkaProducer<K, V> producer; + private OutputCollector collector; + private TupleToKafkaMapper<K,V> mapper; + private KafkaTopicSelector topicSelector; + private Properties boltSpecfiedProperties = new Properties(); + /** + * With default setting for fireAndForget and async, the callback is called when the sending succeeds. + * By setting fireAndForget true, the send will not wait at all for kafka to ack. + * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set. + * By setting async false, synchronous sending is used. + */ + private boolean fireAndForget = false; + private boolean async = true; + + public KafkaBolt() {} + + public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) { + this.mapper = mapper; + return this; + } + + public KafkaBolt<K,V> withTopicSelector(KafkaTopicSelector selector) { + this.topicSelector = selector; + return this; + } + + public KafkaBolt<K,V> withProducerProperties(Properties producerProperties) { + this.boltSpecfiedProperties = producerProperties; + return this; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + //for backward compatibility. + if(mapper == null) { + this.mapper = new FieldNameBasedTupleToKafkaMapper<K,V>(); + } + + //for backward compatibility. 
+ if(topicSelector == null) { + this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC)); + } + + producer = new KafkaProducer<>(boltSpecfiedProperties); + this.collector = collector; + } + + @Override + public void execute(final Tuple input) { + if (TupleUtils.isTick(input)) { + collector.ack(input); + return; // Do not try to send ticks to Kafka + } + K key = null; + V message = null; + String topic = null; + try { + key = mapper.getKeyFromTuple(input); + message = mapper.getMessageFromTuple(input); + topic = topicSelector.getTopic(input); + if (topic != null ) { + Callback callback = null; + + if (!fireAndForget && async) { + callback = new Callback() { + @Override + public void onCompletion(RecordMetadata ignored, Exception e) { + synchronized (collector) { + if (e != null) { + collector.reportError(e); + collector.fail(input); + } else { + collector.ack(input); + } + } + } + }; + } + Future<RecordMetadata> result = producer.send(new ProducerRecord<K, V>(topic, key, message), callback); + if (!async) { + try { + result.get(); + collector.ack(input); + } catch (ExecutionException err) { + collector.reportError(err); + collector.fail(input); + } + } else if (fireAndForget) { + collector.ack(input); + } + } else { + LOG.warn("skipping key = " + key + ", topic selector returned null."); + collector.ack(input); + } + } catch (Exception ex) { + collector.reportError(ex); + collector.fail(input); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + @Override + public void cleanup() { + producer.close(); + } + + public void setFireAndForget(boolean fireAndForget) { + this.fireAndForget = fireAndForget; + } + + public void setAsync(boolean async) { + this.async = async; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java ---------------------------------------------------------------------- 
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java new file mode 100644 index 0000000..672da8e --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.bolt.mapper; + +import org.apache.storm.tuple.Tuple; + +public class FieldNameBasedTupleToKafkaMapper<K,V> implements TupleToKafkaMapper<K, V> { + + public static final String BOLT_KEY = "key"; + public static final String BOLT_MESSAGE = "message"; + public String boltKeyField; + public String boltMessageField; + + public FieldNameBasedTupleToKafkaMapper() { + this(BOLT_KEY, BOLT_MESSAGE); + } + + public FieldNameBasedTupleToKafkaMapper(String boltKeyField, String boltMessageField) { + this.boltKeyField = boltKeyField; + this.boltMessageField = boltMessageField; + } + + @Override + public K getKeyFromTuple(Tuple tuple) { + //for backward compatibility, we return null when key is not present. + return tuple.contains(boltKeyField) ? 
(K) tuple.getValueByField(boltKeyField) : null; + } + + @Override + public V getMessageFromTuple(Tuple tuple) { + return (V) tuple.getValueByField(boltMessageField); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java new file mode 100644 index 0000000..3890413 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.bolt.mapper; + +import org.apache.storm.tuple.Tuple; + +import java.io.Serializable; + +/** + * as the really verbose name suggests this interface mapps a storm tuple to kafka key and message. + * @param <K> type of key. + * @param <V> type of value. 
 */
/**
 * Maps a Storm tuple to the Kafka key and message of an outgoing record.
 * (Fixes the original javadoc typo "mapps".)
 *
 * @param <K> type of the Kafka record key.
 * @param <V> type of the Kafka record value/message.
 */
public interface TupleToKafkaMapper<K,V> extends Serializable {
    /** @return the Kafka record key derived from the tuple (may be null). */
    K getKeyFromTuple(Tuple tuple);

    /** @return the Kafka record value/message derived from the tuple. */
    V getMessageFromTuple(Tuple tuple);
}
/**
 */
package org.apache.storm.kafka.bolt.selector;

import org.apache.storm.tuple.Tuple;

/**
 * KafkaTopicSelector that ignores the tuple contents and always returns the
 * single topic supplied at construction time.
 */
public class DefaultTopicSelector implements KafkaTopicSelector {

    private final String topicName;

    public DefaultTopicSelector(final String topicName) {
        this.topicName = topicName;
    }

    /** Returns the configured topic regardless of the tuple. */
    @Override
    public String getTopic(Tuple tuple) {
        return topicName;
    }
}
/**
 */
package org.apache.storm.kafka.bolt.selector;

import org.apache.storm.tuple.Tuple;

import java.io.Serializable;

/**
 * Strategy for choosing the destination Kafka topic for each tuple.
 * Returning null signals that the tuple should be skipped (KafkaBolt acks it
 * without sending).
 */
public interface KafkaTopicSelector extends Serializable {
    /** @return the topic for this tuple, or null to skip publishing it. */
    String getTopic(Tuple tuple);
}
/**
+ */ +package org.apache.storm.kafka.trident; + +import org.apache.storm.kafka.KafkaUtils; +import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; +import org.apache.storm.trident.spout.IPartitionedTridentSpout; + +import java.util.List; +import java.util.Map; + +class Coordinator implements IPartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>>, IOpaquePartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>> { + + private IBrokerReader reader; + private TridentKafkaConfig config; + + public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) { + config = tridentKafkaConfig; + reader = KafkaUtils.makeBrokerReader(conf, config); + } + + @Override + public void close() { + config.coordinator.close(); + } + + @Override + public boolean isReady(long txid) { + return config.coordinator.isReady(txid); + } + + @Override + public List<GlobalPartitionInformation> getPartitionsForBatch() { + return reader.getAllBrokers(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java new file mode 100644 index 0000000..7a7e32c --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
 * limitations under the License.
 */
package org.apache.storm.kafka.trident;

/**
 * IBatchCoordinator that is always ready: every transaction id may be
 * processed immediately, and close is a no-op.
 */
public class DefaultCoordinator implements IBatchCoordinator {

    /** Always true: batches are never delayed. */
    @Override
    public boolean isReady(long txid) {
        return true;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

}
/**
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.trident; + +import com.google.common.base.Objects; +import org.apache.storm.kafka.Broker; +import org.apache.storm.kafka.Partition; + +import java.io.Serializable; +import java.util.*; + + +public class GlobalPartitionInformation implements Iterable<Partition>, Serializable { + + private Map<Integer, Broker> partitionMap; + public String topic; + + //Flag to keep the Partition Path Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition + private Boolean bUseTopicNameForPartitionPathId; + + public GlobalPartitionInformation(String topic, Boolean bUseTopicNameForPartitionPathId) { + this.topic = topic; + this.partitionMap = new TreeMap<Integer, Broker>(); + this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId; + } + + public GlobalPartitionInformation(String topic) { + this.topic = topic; + this.partitionMap = new TreeMap<Integer, Broker>(); + this.bUseTopicNameForPartitionPathId = false; + } + + public void addPartition(int partitionId, Broker broker) { + partitionMap.put(partitionId, broker); + } + + @Override + public String toString() { + return "GlobalPartitionInformation{" + + "topic=" + topic + + ", partitionMap=" + partitionMap + + '}'; + } + + public Broker getBrokerFor(Integer partitionId) { + return partitionMap.get(partitionId); + } + + public List<Partition> getOrderedPartitions() { + List<Partition> partitions = new LinkedList<Partition>(); + for (Map.Entry<Integer, Broker> partition : partitionMap.entrySet()) { + partitions.add(new Partition(partition.getValue(), this.topic, partition.getKey(), this.bUseTopicNameForPartitionPathId)); + } + return partitions; + } + + @Override + public Iterator<Partition> iterator() { + final Iterator<Map.Entry<Integer, Broker>> iterator = partitionMap.entrySet().iterator(); + final String topic = this.topic; + final 
Boolean bUseTopicNameForPartitionPathId = this.bUseTopicNameForPartitionPathId; + return new Iterator<Partition>() { + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Partition next() { + Map.Entry<Integer, Broker> next = iterator.next(); + return new Partition(next.getValue(), topic , next.getKey(), bUseTopicNameForPartitionPathId); + } + + @Override + public void remove() { + iterator.remove(); + } + }; + } + + @Override + public int hashCode() { + return Objects.hashCode(partitionMap); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final GlobalPartitionInformation other = (GlobalPartitionInformation) obj; + return Objects.equal(this.partitionMap, other.partitionMap); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java new file mode 100644 index 0000000..41369ba --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.kafka.trident;

import java.io.Serializable;

/**
 * Pluggable readiness gate for the trident Kafka spouts. Serializable because
 * it is shipped to workers as part of the spout configuration.
 */
public interface IBatchCoordinator extends Serializable {
    /**
     * @param txid trident transaction id being considered
     * @return whether processing for this transaction may proceed
     */
    boolean isReady(long txid);

    /** Releases any resources held by this coordinator. */
    void close();
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.kafka.trident;

// Removed unused import java.util.Map — the interface only references List.
import java.util.List;

/**
 * Source of broker/partition metadata for the trident Kafka spouts.
 */
public interface IBrokerReader {

    /**
     * @return the partition-to-broker mapping for the given topic; implementations
     *         may return null when the topic is unknown (see StaticBrokerReader).
     */
    GlobalPartitionInformation getBrokerForTopic(String topic);

    /** @return the partition-to-broker mappings for all known topics. */
    List<GlobalPartitionInformation> getAllBrokers();

    /** Releases any resources (e.g. connections) held by the reader. */
    void close();
}
+ */ +package org.apache.storm.kafka.trident; + + +import org.apache.storm.metric.api.ICombiner; + +public class MaxMetric implements ICombiner<Long> { + @Override + public Long identity() { + return null; + } + + @Override + public Long combine(Long l1, Long l2) { + if (l1 == null) { + return l2; + } + if (l2 == null) { + return l1; + } + return Math.max(l1, l2); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java new file mode 100644 index 0000000..f540c87 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.kafka.trident;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.kafka.Partition;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;

// Removed unused import java.util.UUID — the full new file (per the diff) never
// references it.
import java.util.List;
import java.util.Map;


/**
 * Opaque-transactional trident spout reading from Kafka. Delegates batch
 * emission to {@link TridentKafkaEmitter} and coordination to
 * {@link org.apache.storm.kafka.trident.Coordinator}.
 */
public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<List<GlobalPartitionInformation>, Partition, Map> {


    // Set once in the constructor; final so it cannot be reassigned afterwards.
    final TridentKafkaConfig _config;

    public OpaqueTridentKafkaSpout(TridentKafkaConfig config) {
        _config = config;
    }

    /** @return the opaque-emitter view of a new {@link TridentKafkaEmitter}. */
    @Override
    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map conf, TopologyContext context) {
        return new TridentKafkaEmitter(conf, context, _config, context
                .getStormId()).asOpaqueEmitter();
    }

    @Override
    public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext tc) {
        return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
    }

    /** @return output fields as declared by the configured deserialization scheme. */
    @Override
    public Fields getOutputFields() {
        return _config.scheme.getOutputFields();
    }

    /** @return null — no component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.kafka.trident;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * {@link IBrokerReader} backed by a fixed, single-topic mapping supplied at
 * construction time; never refreshes.
 */
public class StaticBrokerReader implements IBrokerReader {

    // topic -> fixed partition/broker mapping; populated only in the constructor.
    private Map<String,GlobalPartitionInformation> brokers = new TreeMap<String,GlobalPartitionInformation>();

    public StaticBrokerReader(String topic, GlobalPartitionInformation partitionInformation) {
        this.brokers.put(topic, partitionInformation);
    }

    /** @return the mapping for {@code topic}, or null if this reader was not built for it. */
    @Override
    public GlobalPartitionInformation getBrokerForTopic(String topic) {
        // Single lookup: Map.get already returns null for absent keys, so the
        // previous containsKey+get pair was a redundant double lookup.
        return brokers.get(topic);
    }

    /** @return a fresh list of all configured mappings (at most one for this reader). */
    @Override
    public List<GlobalPartitionInformation> getAllBrokers () {
        // Copy-constructor instead of create-then-addAll.
        return new ArrayList<GlobalPartitionInformation>(brokers.values());
    }

    /** Nothing to release — the mapping is static. */
    @Override
    public void close() {
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.kafka.trident;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.kafka.Partition;
import org.apache.storm.trident.spout.IPartitionedTridentSpout;

// Removed unused import java.util.UUID — the full new file (per the diff) never
// references it.
import java.util.Map;


/**
 * Transactional trident spout reading from Kafka. Delegates batch emission to
 * {@link TridentKafkaEmitter} and coordination to
 * {@link org.apache.storm.kafka.trident.Coordinator}.
 */
public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> {

    // Set once in the constructor; final so it cannot be reassigned afterwards.
    final TridentKafkaConfig _config;

    public TransactionalTridentKafkaSpout(TridentKafkaConfig config) {
        _config = config;
    }


    @Override
    public IPartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext context) {
        return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
    }

    /** @return the transactional-emitter view of a new {@link TridentKafkaEmitter}. */
    @Override
    public IPartitionedTridentSpout.Emitter getEmitter(Map conf, TopologyContext context) {
        return new TridentKafkaEmitter(conf, context, _config, context
                .getStormId()).asTransactionalEmitter();
    }

    /** @return output fields as declared by the configured deserialization scheme. */
    @Override
    public Fields getOutputFields() {
        return _config.scheme.getOutputFields();
    }

    /** @return null — no component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.kafka.trident;

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaConfig;


/**
 * Configuration for the trident Kafka spouts; extends {@link KafkaConfig}
 * with a batch coordinator.
 */
public class TridentKafkaConfig extends KafkaConfig {


    // Fixed (final) batch coordinator. DefaultCoordinator is defined elsewhere —
    // presumably an always-ready implementation; confirm against its source.
    public final IBatchCoordinator coordinator = new DefaultCoordinator();

    /** Delegates to {@link KafkaConfig#KafkaConfig(BrokerHosts, String)}. */
    public TridentKafkaConfig(BrokerHosts hosts, String topic) {
        super(hosts, topic);
    }

    /** Delegates to the clientId-taking {@link KafkaConfig} constructor. */
    public TridentKafkaConfig(BrokerHosts hosts, String topic, String clientId) {
        super(hosts, topic, clientId);
    }

}
