http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java deleted file mode 100644 index 52cdde8..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java +++ /dev/null @@ -1,275 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.metric.api.IMetric; -import backtype.storm.utils.Utils; -import com.google.common.base.Preconditions; -import kafka.api.FetchRequest; -import kafka.api.FetchRequestBuilder; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetRequest; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.Message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.trident.GlobalPartitionInformation; -import storm.kafka.trident.IBrokerReader; -import storm.kafka.trident.StaticBrokerReader; -import storm.kafka.trident.ZkBrokerReader; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.channels.UnresolvedAddressException; -import java.util.*; - - -public class KafkaUtils { - - public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); - private static final int NO_OFFSET = -5; - - - public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) { - if (conf.hosts instanceof StaticHosts) { - return new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation()); - } else { - return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts); - } - } - - - public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) { - long startOffsetTime = config.startOffsetTime; - return getOffset(consumer, topic, partition, startOffsetTime); - } - - public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) { - TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); - Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); - requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1)); - OffsetRequest request = new OffsetRequest( - requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); - - long[] offsets = 
consumer.getOffsetsBefore(request).offsets(topic, partition); - if (offsets.length > 0) { - return offsets[0]; - } else { - return NO_OFFSET; - } - } - - public static class KafkaOffsetMetric implements IMetric { - Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>(); - Set<Partition> _partitions; - DynamicPartitionConnections _connections; - - public KafkaOffsetMetric(DynamicPartitionConnections connections) { - _connections = connections; - } - - public void setLatestEmittedOffset(Partition partition, long offset) { - _partitionToOffset.put(partition, offset); - } - - private class TopicMetrics { - long totalSpoutLag = 0; - long totalEarliestTimeOffset = 0; - long totalLatestTimeOffset = 0; - long totalLatestEmittedOffset = 0; - } - - @Override - public Object getValueAndReset() { - try { - HashMap ret = new HashMap(); - if (_partitions != null && _partitions.size() == _partitionToOffset.size()) { - Map<String,TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>(); - for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) { - Partition partition = e.getKey(); - SimpleConsumer consumer = _connections.getConnection(partition); - if (consumer == null) { - LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?"); - return null; - } - long latestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.LatestTime()); - long earliestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime()); - if (latestTimeOffset == KafkaUtils.NO_OFFSET) { - LOG.warn("No data found in Kafka Partition " + partition.getId()); - return null; - } - long latestEmittedOffset = e.getValue(); - long spoutLag = latestTimeOffset - latestEmittedOffset; - String topic = partition.topic; - String metricPath = partition.getId(); - //Handle the case where Partition Path Id does not contain topic name Partition.getId() == "partition_" + partition - if (!metricPath.startsWith(topic + "/")) { - metricPath = topic + "/" + metricPath; - } - ret.put(metricPath + "/" + "spoutLag", spoutLag); - ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset); - ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset); - ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset); - - if (!topicMetricsMap.containsKey(partition.topic)) { - topicMetricsMap.put(partition.topic,new TopicMetrics()); - } - - TopicMetrics topicMetrics = topicMetricsMap.get(partition.topic); - topicMetrics.totalSpoutLag += spoutLag; - topicMetrics.totalEarliestTimeOffset += earliestTimeOffset; - topicMetrics.totalLatestTimeOffset += latestTimeOffset; - topicMetrics.totalLatestEmittedOffset += latestEmittedOffset; - } - - for(Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) { - String topic = e.getKey(); - TopicMetrics topicMetrics = e.getValue(); - ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag); - ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset); - ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset); - ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset); - } - - return ret; - } else { - LOG.info("Metrics Tick: Not enough data to calculate spout lag."); - } - } catch (Throwable t) { - LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t); - } - return null; - } - - public void 
refreshPartitions(Set<Partition> partitions) { - _partitions = partitions; - Iterator<Partition> it = _partitionToOffset.keySet().iterator(); - while (it.hasNext()) { - if (!partitions.contains(it.next())) { - it.remove(); - } - } - } - } - - public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) - throws TopicOffsetOutOfRangeException, FailedFetchException,RuntimeException { - ByteBufferMessageSet msgs = null; - String topic = partition.topic; - int partitionId = partition.partition; - FetchRequestBuilder builder = new FetchRequestBuilder(); - FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes). - clientId(config.clientId).maxWait(config.fetchMaxWait).build(); - FetchResponse fetchResponse; - try { - fetchResponse = consumer.fetch(fetchRequest); - } catch (Exception e) { - if (e instanceof ConnectException || - e instanceof SocketTimeoutException || - e instanceof IOException || - e instanceof UnresolvedAddressException - ) { - LOG.warn("Network error when fetching messages:", e); - throw new FailedFetchException(e); - } else { - throw new RuntimeException(e); - } - } - if (fetchResponse.hasError()) { - KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId)); - if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) { - String msg = partition + " Got fetch request with offset out of range: [" + offset + "]"; - LOG.warn(msg); - throw new TopicOffsetOutOfRangeException(msg); - } else { - String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]"; - LOG.error(message); - throw new FailedFetchException(message); - } - } else { - msgs = fetchResponse.messageSet(topic, partitionId); - } - return msgs; - } - - - public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, String topic) { - Iterable<List<Object>> tups; - ByteBuffer payload = msg.payload(); - if (payload == null) { - return null; - } - ByteBuffer key = msg.key(); - if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) { - tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(key, payload); - } else { - if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) { - tups = ((StringMultiSchemeWithTopic)kafkaConfig.scheme).deserializeWithTopic(topic, payload); - } else { - tups = kafkaConfig.scheme.deserialize(payload); - } - } - return tups; - } - - public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) { - ByteBuffer payload = msg.payload(); - if (payload == null) { - return null; - } - return scheme.deserializeMessageWithMetadata(payload, partition, offset); - } - - - public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) { - Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks"); - List<Partition> taskPartitions = new ArrayList<Partition>(); - List<Partition> partitions = new ArrayList<Partition>(); - for(GlobalPartitionInformation partitionInformation : partitons) { - partitions.addAll(partitionInformation.getOrderedPartitions()); - } - int numPartitions = partitions.size(); - if (numPartitions < totalTasks) { - LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + 
"), some tasks will be idle"); - } - for (int i = taskIndex; i < numPartitions; i += totalTasks) { - Partition taskPartition = partitions.get(i); - taskPartitions.add(taskPartition); - } - logPartitionMapping(totalTasks, taskIndex, taskPartitions); - return taskPartitions; - } - - private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) { - String taskPrefix = taskId(taskIndex, totalTasks); - if (taskPartitions.isEmpty()) { - LOG.warn(taskPrefix + "no partitions assigned"); - } else { - LOG.info(taskPrefix + "assigned " + taskPartitions); - } - } - - public static String taskId(int taskIndex, int totalTasks) { - return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] "; - } -}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java b/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java deleted file mode 100644 index 7c0dc6c..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.spout.Scheme; - -import java.nio.ByteBuffer; -import java.util.List; - -public interface KeyValueScheme extends Scheme { - List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java deleted file mode 100644 index d27ae7e..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka; - -import backtype.storm.spout.SchemeAsMultiScheme; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; - -public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme { - - public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) { - super(scheme); - } - - public Iterable<List<Object>> deserializeKeyAndValue(final ByteBuffer key, final ByteBuffer value) { - List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, value); - if(o == null) return null; - else return Arrays.asList(o); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java deleted file mode 100644 index 62f652f..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.spout.Scheme; - -import java.nio.ByteBuffer; -import java.util.List; - -public interface MessageMetadataScheme extends Scheme { - List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java deleted file mode 100644 index f23a101..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; - -import backtype.storm.spout.SchemeAsMultiScheme; - -public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme { - private static final long serialVersionUID = -7172403703813625116L; - - public MessageMetadataSchemeAsMultiScheme(MessageMetadataScheme scheme) { - super(scheme); - } - - public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) { - List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset); - if (o == null) { - return null; - } else { - return Arrays.asList(o); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/Partition.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/Partition.java b/external/storm-kafka/src/jvm/storm/kafka/Partition.java deleted file mode 100644 index 5f683ef..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/Partition.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka; - -import com.google.common.base.Objects; -import storm.trident.spout.ISpoutPartition; - - -public class Partition implements ISpoutPartition { - - public Broker host; - public int partition; - public String topic; - - //Flag to keep the Partition Path Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition - private Boolean bUseTopicNameForPartitionPathId; - - // for kryo compatibility - private Partition() { - - } - public Partition(Broker host, String topic, int partition) { - this.topic = topic; - this.host = host; - this.partition = partition; - this.bUseTopicNameForPartitionPathId = false; - } - - public Partition(Broker host, String topic, int partition,Boolean bUseTopicNameForPartitionPathId) { - this.topic = topic; - this.host = host; - this.partition = partition; - this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId; - } - - @Override - public int hashCode() { - return Objects.hashCode(host, topic, partition); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - final Partition other = (Partition) obj; - return Objects.equal(this.host, other.host) && Objects.equal(this.topic, other.topic) && Objects.equal(this.partition, other.partition); - } - - @Override - public String toString() { - return "Partition{" + - "host=" + host + - ", topic=" + topic + - ", partition=" + partition + - '}'; - } - - @Override - public String getId() { - if (bUseTopicNameForPartitionPathId) { - return topic + "/partition_" + partition; - } else { - //Keep the Partition Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition - return "partition_" + partition; - } - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java deleted file mode 100644 index 9cfed60..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka; - -import java.util.List; - -public interface PartitionCoordinator { - List<PartitionManager> getMyManagedPartitions(); - - PartitionManager getManager(Partition partition); - - void refresh(); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java deleted file mode 100644 index ff02e22..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ /dev/null @@ -1,316 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.Config; -import backtype.storm.metric.api.CombinedMetric; -import backtype.storm.metric.api.CountMetric; -import backtype.storm.metric.api.MeanReducer; -import backtype.storm.metric.api.ReducedMetric; -import backtype.storm.spout.SpoutOutputCollector; - -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableMap; - -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.MessageAndOffset; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import storm.kafka.KafkaSpout.EmitState; -import storm.kafka.trident.MaxMetric; - -import java.util.*; - -public class PartitionManager { - public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class); - - private final CombinedMetric _fetchAPILatencyMax; - private final ReducedMetric _fetchAPILatencyMean; - private final CountMetric _fetchAPICallCount; - private final CountMetric _fetchAPIMessageCount; - Long _emittedToOffset; - // _pending key = Kafka offset, value = time at which the message was first submitted to the topology - private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>(); - private final FailedMsgRetryManager _failedMsgRetryManager; - - // retryRecords key = Kafka offset, value = retry info for the given message - Long _committedTo; - LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<MessageAndOffset>(); - Partition _partition; - SpoutConfig _spoutConfig; - String _topologyInstanceId; - SimpleConsumer _consumer; - DynamicPartitionConnections _connections; - ZkState _state; - Map _stormConf; - long numberFailed, numberAcked; - public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) { - _partition = id; - _connections = connections; - _spoutConfig = spoutConfig; - _topologyInstanceId = topologyInstanceId; - _consumer = connections.register(id.host, 
id.topic, id.partition); - _state = state; - _stormConf = stormConf; - numberAcked = numberFailed = 0; - - _failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs, - _spoutConfig.retryDelayMultiplier, - _spoutConfig.retryDelayMaxMs); - - String jsonTopologyId = null; - Long jsonOffset = null; - String path = committedPath(); - try { - Map<Object, Object> json = _state.readJSON(path); - LOG.info("Read partition information from: " + path + " --> " + json ); - if (json != null) { - jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id"); - jsonOffset = (Long) json.get("offset"); - } - } catch (Throwable e) { - LOG.warn("Error reading and/or parsing at ZkNode: " + path, e); - } - - String topic = _partition.topic; - Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig); - - if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON? - _committedTo = currentOffset; - LOG.info("No partition information found, using configuration to determine offset"); - } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) { - _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime); - LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset"); - } else { - _committedTo = jsonOffset; - LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId ); - } - - if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) { - LOG.info("Last commit offset from zookeeper: " + _committedTo); - Long lastCommittedOffset = _committedTo; - _committedTo = currentOffset; - LOG.info("Commit offset " + lastCommittedOffset + " is more than " + - spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime); - } - - LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo); - _emittedToOffset = _committedTo; - - _fetchAPILatencyMax = new CombinedMetric(new MaxMetric()); - _fetchAPILatencyMean = new ReducedMetric(new MeanReducer()); - _fetchAPICallCount = new CountMetric(); - _fetchAPIMessageCount = new CountMetric(); - } - - public Map getMetricsDataMap() { - Map ret = new HashMap(); - ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset()); - ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset()); - ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset()); - ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset()); - return ret; - } - - //returns false if it's reached the end of current batch - public EmitState next(SpoutOutputCollector collector) { - if (_waitingToEmit.isEmpty()) { - fill(); - } - while (true) { - MessageAndOffset toEmit = _waitingToEmit.pollFirst(); - if (toEmit == null) { - return EmitState.NO_EMITTED; - } - - Iterable<List<Object>> tups; - if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) { - tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.message(), _partition, toEmit.offset()); - } else { - tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic); - } - - if ((tups != null) && tups.iterator().hasNext()) { - if 
(!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) { - for (List<Object> tup : tups) { - collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset())); - } - } else { - for (List<Object> tup : tups) { - collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset())); - } - } - break; - } else { - ack(toEmit.offset()); - } - } - if (!_waitingToEmit.isEmpty()) { - return EmitState.EMITTED_MORE_LEFT; - } else { - return EmitState.EMITTED_END; - } - } - - - private void fill() { - long start = System.nanoTime(); - Long offset; - - // Are there failed tuples? If so, fetch those first. - offset = this._failedMsgRetryManager.nextFailedMessageToRetry(); - final boolean processingNewTuples = (offset == null); - if (processingNewTuples) { - offset = _emittedToOffset; - } - - ByteBufferMessageSet msgs = null; - try { - msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset); - } catch (TopicOffsetOutOfRangeException e) { - _emittedToOffset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime()); - LOG.warn("{} Using new offset: {}", _partition.partition, _emittedToOffset); - // fetch failed, so don't update the metrics - - //fix bug [STORM-643] : remove outdated failed offsets - if (!processingNewTuples) { - // For the case of EarliestTime it would be better to discard - // all the failed offsets, that are earlier than actual EarliestTime - // offset, since they are anyway not there. - // These calls to broker API will be then saved. - Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(_emittedToOffset); - - LOG.warn("Removing the failed offsets that are out of range: {}", omitted); - } - - return; - } - long end = System.nanoTime(); - long millis = (end - start) / 1000000; - _fetchAPILatencyMax.update(millis); - _fetchAPILatencyMean.update(millis); - _fetchAPICallCount.incr(); - if (msgs != null) { - int numMessages = 0; - - for (MessageAndOffset msg : msgs) { - final Long cur_offset = msg.offset(); - if (cur_offset < offset) { - // Skip any old offsets. - continue; - } - if (processingNewTuples || this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) { - numMessages += 1; - if (!_pending.containsKey(cur_offset)) { - _pending.put(cur_offset, System.currentTimeMillis()); - } - _waitingToEmit.add(msg); - _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset); - if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) { - this._failedMsgRetryManager.retryStarted(cur_offset); - } - } - } - _fetchAPIMessageCount.incrBy(numMessages); - } - } - - public void ack(Long offset) { - if (!_pending.isEmpty() && _pending.firstKey() < offset - _spoutConfig.maxOffsetBehind) { - // Too many things pending! 
- _pending.headMap(offset - _spoutConfig.maxOffsetBehind).clear(); - } - _pending.remove(offset); - this._failedMsgRetryManager.acked(offset); - numberAcked++; - } - - public void fail(Long offset) { - if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) { - LOG.info( - "Skipping failed tuple at offset=" + offset + - " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind + - " behind _emittedToOffset=" + _emittedToOffset - ); - } else { - LOG.debug("failing at offset={} with _pending.size()={} pending and _emittedToOffset={}", offset, _pending.size(), _emittedToOffset); - numberFailed++; - if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) { - throw new RuntimeException("Too many tuple failures"); - } - - this._failedMsgRetryManager.failed(offset); - } - } - - public void commit() { - long lastCompletedOffset = lastCompletedOffset(); - if (_committedTo != lastCompletedOffset) { - LOG.debug("Writing last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition, _topologyInstanceId); - Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder() - .put("topology", ImmutableMap.of("id", _topologyInstanceId, - "name", _stormConf.get(Config.TOPOLOGY_NAME))) - .put("offset", lastCompletedOffset) - .put("partition", _partition.partition) - .put("broker", ImmutableMap.of("host", _partition.host.host, - "port", _partition.host.port)) - .put("topic", _partition.topic).build(); - _state.writeJSON(committedPath(), data); - - _committedTo = lastCompletedOffset; - LOG.debug("Wrote last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition, _topologyInstanceId); - } else { - LOG.debug("No new offset for {} for topology: {}", _partition, _topologyInstanceId); - } - } - - private String committedPath() { - return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId(); - } - - public long lastCompletedOffset() { - if (_pending.isEmpty()) { - return _emittedToOffset; - } else { - return _pending.firstKey(); - } - } - - public Partition getPartition() { - return _partition; - } - - public void close() { - commit(); - _connections.unregister(_partition.host, _partition.topic , _partition.partition); - } - - static class KafkaMessageId { - public Partition partition; - public long offset; - - - public KafkaMessageId(Partition partition, long offset) { - this.partition = partition; - this.offset = offset; - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java deleted file mode 100644 index d125ebb..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import java.io.Serializable; -import java.util.List; - - -public class SpoutConfig extends KafkaConfig implements Serializable { - public List<String> zkServers = null; - public Integer zkPort = null; - public String zkRoot = null; - public String id = null; - - public String outputStreamId; - - // setting for how often to save the current kafka offset to ZooKeeper - public long stateUpdateIntervalMs = 2000; - - // Exponential back-off retry settings. These are used when retrying messages after a bolt - // calls OutputCollector.fail(). - public long retryInitialDelayMs = 0; - public double retryDelayMultiplier = 1.0; - public long retryDelayMaxMs = 60 * 1000; - - public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { - super(hosts, topic); - this.zkRoot = zkRoot; - this.id = id; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java deleted file mode 100644 index 4b20d84..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka; - -import storm.kafka.trident.GlobalPartitionInformation; - -import java.util.*; - - -public class StaticCoordinator implements PartitionCoordinator { - Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>(); - List<PartitionManager> _allManagers = new ArrayList(); - - public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { - StaticHosts hosts = (StaticHosts) config.hosts; - List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); - partitions.add(hosts.getPartitionInformation()); - List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex); - for (Partition myPartition : myPartitions) { - _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition)); - } - _allManagers = new ArrayList(_managers.values()); - } - - @Override - public List<PartitionManager> getMyManagedPartitions() { - return _allManagers; - } - - public PartitionManager getManager(Partition partition) { - return _managers.get(partition); - } - - @Override - public void refresh() { return; } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java b/external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java deleted file mode 100644 index bee7118..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka; - -import storm.kafka.trident.GlobalPartitionInformation; - -/** - * Date: 11/05/2013 - * Time: 14:43 - */ -public class StaticHosts implements BrokerHosts { - - - private GlobalPartitionInformation partitionInformation; - - public StaticHosts(GlobalPartitionInformation partitionInformation) { - this.partitionInformation = partitionInformation; - } - - public GlobalPartitionInformation getPartitionInformation() { - return partitionInformation; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java b/external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java deleted file mode 100644 index 1353b6c..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import kafka.javaapi.consumer.SimpleConsumer; - -import java.util.HashMap; -import java.util.Map; - -public class StaticPartitionConnections { - Map<Integer, SimpleConsumer> _kafka = new HashMap<Integer, SimpleConsumer>(); - KafkaConfig _config; - StaticHosts hosts; - - public StaticPartitionConnections(KafkaConfig conf) { - _config = conf; - if (!(conf.hosts instanceof StaticHosts)) { - throw new RuntimeException("Must configure with static hosts"); - } - this.hosts = (StaticHosts) conf.hosts; - } - - public SimpleConsumer getConsumer(int partition) { - if (!_kafka.containsKey(partition)) { - Broker hp = hosts.getPartitionInformation().getBrokerFor(partition); - _kafka.put(partition, new SimpleConsumer(hp.host, hp.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)); - - } - return _kafka.get(partition); - } - - public void close() { - for (SimpleConsumer consumer : _kafka.values()) { - consumer.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java deleted file mode 100644 index 6f6d339..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.tuple.Values; -import com.google.common.collect.ImmutableMap; - -import java.nio.ByteBuffer; -import java.util.List; - -public class StringKeyValueScheme extends StringScheme implements KeyValueScheme { - - @Override - public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) { - if ( key == null ) { - return deserialize(value); - } - String keyString = StringScheme.deserializeString(key); - String valueString = StringScheme.deserializeString(value); - return new Values(ImmutableMap.of(keyString, valueString)); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java deleted file mode 100644 index 1708b97..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka; - -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; - -import java.nio.ByteBuffer; -import java.util.List; - -public class StringMessageAndMetadataScheme extends StringScheme implements MessageMetadataScheme { - private static final long serialVersionUID = -5441841920447947374L; - - public static final String STRING_SCHEME_PARTITION_KEY = "partition"; - public static final String STRING_SCHEME_OFFSET = "offset"; - - @Override - public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) { - String stringMessage = StringScheme.deserializeString(message); - return new Values(stringMessage, partition.partition, offset); - } - - @Override - public Fields getOutputFields() { - return new Fields(STRING_SCHEME_KEY, STRING_SCHEME_PARTITION_KEY, STRING_SCHEME_OFFSET); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java b/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java deleted file mode 100644 index 1e7f216..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/StringMultiSchemeWithTopic.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka; - -import backtype.storm.spout.MultiScheme; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; - -public class StringMultiSchemeWithTopic - implements MultiScheme { - public static final String STRING_SCHEME_KEY = "str"; - - public static final String TOPIC_KEY = "topic"; - - @Override - public Iterable<List<Object>> deserialize(ByteBuffer bytes) { - throw new NotImplementedException(); - } - - public Iterable<List<Object>> deserializeWithTopic(String topic, ByteBuffer bytes) { - List<Object> items = new Values(StringScheme.deserializeString(bytes), topic); - return Collections.singletonList(items); - } - - public Fields getOutputFields() { - return new Fields(STRING_SCHEME_KEY, TOPIC_KEY); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java deleted file mode 100644 index 1071e60..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka; - -import backtype.storm.spout.Scheme; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; - -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.List; - -public class StringScheme implements Scheme { - private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8; - public static final String STRING_SCHEME_KEY = "str"; - - public List<Object> deserialize(ByteBuffer bytes) { - return new Values(deserializeString(bytes)); - } - - public static String deserializeString(ByteBuffer string) { - if (string.hasArray()) { - int base = string.arrayOffset(); - return new String(string.array(), base + string.position(), string.remaining()); - } else { - return new String(Utils.toByteArray(string), UTF8_CHARSET); - } - } - - public Fields getOutputFields() { - return new Fields(STRING_SCHEME_KEY); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java b/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java deleted file mode 100644 index 5101a3e..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -public class TopicOffsetOutOfRangeException extends RuntimeException { - - public TopicOffsetOutOfRangeException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java deleted file mode 100644 index 8650e6f..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.trident.GlobalPartitionInformation; - -import java.util.*; - -import static storm.kafka.KafkaUtils.taskId; - -public class ZkCoordinator implements PartitionCoordinator { - public static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class); - - SpoutConfig _spoutConfig; - int _taskIndex; - int _totalTasks; - String _topologyInstanceId; - Map<Partition, PartitionManager> _managers = new HashMap(); - List<PartitionManager> _cachedList = new ArrayList<PartitionManager>(); - Long _lastRefreshTime = null; - int _refreshFreqMs; - DynamicPartitionConnections _connections; - DynamicBrokersReader _reader; - ZkState _state; - Map _stormConf; - - public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { - this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); - } - - public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { - _spoutConfig = spoutConfig; - _connections = connections; - _taskIndex = taskIndex; - _totalTasks = totalTasks; - _topologyInstanceId = topologyInstanceId; - _stormConf = stormConf; - _state = state; - ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts; - _refreshFreqMs = brokerConf.refreshFreqSecs * 1000; - _reader = reader; - } - - private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) { - ZkHosts hosts = (ZkHosts) spoutConfig.hosts; - return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic); - } - - @Override - public List<PartitionManager> getMyManagedPartitions() { - if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) { - refresh(); - _lastRefreshTime = System.currentTimeMillis(); - } - return _cachedList; - } - - @Override - public void refresh() { - try { - LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections"); - List<GlobalPartitionInformation> brokerInfo = _reader.getBrokerInfo(); - List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex); - - Set<Partition> curr = _managers.keySet(); - Set<Partition> newPartitions = new HashSet<Partition>(mine); - newPartitions.removeAll(curr); - - Set<Partition> deletedPartitions = new HashSet<Partition>(curr); - deletedPartitions.removeAll(mine); - - LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); - - for (Partition id : deletedPartitions) { - PartitionManager man = _managers.remove(id); - man.close(); - } - LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); - - for (Partition id : newPartitions) { - PartitionManager man = new PartitionManager(_connections, 
_topologyInstanceId, _state, _stormConf, _spoutConfig, id); - _managers.put(id, man); - } - - } catch (Exception e) { - throw new RuntimeException(e); - } - _cachedList = new ArrayList<PartitionManager>(_managers.values()); - LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing"); - } - - @Override - public PartitionManager getManager(Partition partition) { - return _managers.get(partition); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java b/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java deleted file mode 100644 index 4e4327d..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - - -public class ZkHosts implements BrokerHosts { - private static final String DEFAULT_ZK_PATH = "/brokers"; - - public String brokerZkStr = null; - public String brokerZkPath = null; // e.g., /kafka/brokers - public int refreshFreqSecs = 60; - - public ZkHosts(String brokerZkStr, String brokerZkPath) { - this.brokerZkStr = brokerZkStr; - this.brokerZkPath = brokerZkPath; - } - - public ZkHosts(String brokerZkStr) { - this(brokerZkStr, DEFAULT_ZK_PATH); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/ZkState.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/ZkState.java b/external/storm-kafka/src/jvm/storm/kafka/ZkState.java deleted file mode 100644 index e5e67e5..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/ZkState.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka; - -import backtype.storm.Config; -import backtype.storm.utils.Utils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryNTimes; -import org.apache.zookeeper.CreateMode; -import org.json.simple.JSONValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.charset.Charset; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class ZkState { - public static final Logger LOG = LoggerFactory.getLogger(ZkState.class); - CuratorFramework _curator; - - private CuratorFramework newCurator(Map stateConf) throws Exception { - Integer port = (Integer) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT); - String serverPorts = ""; - for (String server : (List<String>) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) { - serverPorts = serverPorts + server + ":" + port + ","; - } - return CuratorFrameworkFactory.newClient(serverPorts, - Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), - Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)), - new RetryNTimes(Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), - Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)))); - } - - public CuratorFramework getCurator() { - assert _curator != null; - return _curator; - } - - public ZkState(Map stateConf) { - stateConf = new HashMap(stateConf); - - try { - _curator = newCurator(stateConf); - _curator.start(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void writeJSON(String path, Map<Object, Object> data) { - LOG.debug("Writing {} the data {}", path, data.toString()); - writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8"))); - } - - public void writeBytes(String path, byte[] bytes) { - try { - if (_curator.checkExists().forPath(path) == null) { - _curator.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path, bytes); - } else { - _curator.setData().forPath(path, bytes); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public Map<Object, Object> readJSON(String path) { - try { - byte[] b = readBytes(path); - if (b == null) { - return null; - } - return (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8")); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public byte[] readBytes(String path) { - try { - if (_curator.checkExists().forPath(path) != null) { - return _curator.getData().forPath(path); - } else { - return null; - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void close() { - _curator.close(); - _curator = null; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java deleted file mode 100644 index 1ebe142..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.bolt; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Tuple; -import backtype.storm.utils.TupleUtils; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.clients.producer.Callback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; -import storm.kafka.bolt.mapper.TupleToKafkaMapper; -import storm.kafka.bolt.selector.DefaultTopicSelector; -import storm.kafka.bolt.selector.KafkaTopicSelector; -import java.util.concurrent.Future; -import java.util.concurrent.ExecutionException; -import java.util.Map; -import java.util.Properties; - - -/** - * Bolt implementation that can send Tuple data to Kafka - * <p/> - * It expects the producer configuration and topic in storm config under - * <p/> - * 'kafka.broker.properties' and 'topic' - * <p/> - * respectively. - * <p/> - * This bolt uses 0.8.2 Kafka Producer API. - * <p/> - * It works for sending tuples to older Kafka version (0.8.1). - */ -public class KafkaBolt<K, V> extends BaseRichBolt { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class); - - public static final String TOPIC = "topic"; - - private KafkaProducer<K, V> producer; - private OutputCollector collector; - private TupleToKafkaMapper<K,V> mapper; - private KafkaTopicSelector topicSelector; - private Properties boltSpecfiedProperties = new Properties(); - /** - * With default setting for fireAndForget and async, the callback is called when the sending succeeds. - * By setting fireAndForget true, the send will not wait at all for kafka to ack. - * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set. - * By setting async false, synchronous sending is used. - */ - private boolean fireAndForget = false; - private boolean async = true; - - public KafkaBolt() {} - - public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) { - this.mapper = mapper; - return this; - } - - public KafkaBolt<K,V> withTopicSelector(KafkaTopicSelector selector) { - this.topicSelector = selector; - return this; - } - - public KafkaBolt<K,V> withProducerProperties(Properties producerProperties) { - this.boltSpecfiedProperties = producerProperties; - return this; - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - //for backward compatibility. - if(mapper == null) { - this.mapper = new FieldNameBasedTupleToKafkaMapper<K,V>(); - } - - //for backward compatibility. 
- if(topicSelector == null) { - this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC)); - } - - producer = new KafkaProducer<>(boltSpecfiedProperties); - this.collector = collector; - } - - @Override - public void execute(final Tuple input) { - if (TupleUtils.isTick(input)) { - collector.ack(input); - return; // Do not try to send ticks to Kafka - } - K key = null; - V message = null; - String topic = null; - try { - key = mapper.getKeyFromTuple(input); - message = mapper.getMessageFromTuple(input); - topic = topicSelector.getTopic(input); - if (topic != null ) { - Callback callback = null; - - if (!fireAndForget && async) { - callback = new Callback() { - @Override - public void onCompletion(RecordMetadata ignored, Exception e) { - synchronized (collector) { - if (e != null) { - collector.reportError(e); - collector.fail(input); - } else { - collector.ack(input); - } - } - } - }; - } - Future<RecordMetadata> result = producer.send(new ProducerRecord<K, V>(topic, key, message), callback); - if (!async) { - try { - result.get(); - collector.ack(input); - } catch (ExecutionException err) { - collector.reportError(err); - collector.fail(input); - } - } else if (fireAndForget) { - collector.ack(input); - } - } else { - LOG.warn("skipping key = " + key + ", topic selector returned null."); - collector.ack(input); - } - } catch (Exception ex) { - collector.reportError(ex); - collector.fail(input); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } - - @Override - public void cleanup() { - producer.close(); - } - - public void setFireAndForget(boolean fireAndForget) { - this.fireAndForget = fireAndForget; - } - - public void setAsync(boolean async) { - this.async = async; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java deleted file mode 100644 index 936b7e5..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka.bolt.mapper; - -import backtype.storm.tuple.Tuple; - -public class FieldNameBasedTupleToKafkaMapper<K,V> implements TupleToKafkaMapper<K, V> { - - public static final String BOLT_KEY = "key"; - public static final String BOLT_MESSAGE = "message"; - public String boltKeyField; - public String boltMessageField; - - public FieldNameBasedTupleToKafkaMapper() { - this(BOLT_KEY, BOLT_MESSAGE); - } - - public FieldNameBasedTupleToKafkaMapper(String boltKeyField, String boltMessageField) { - this.boltKeyField = boltKeyField; - this.boltMessageField = boltMessageField; - } - - @Override - public K getKeyFromTuple(Tuple tuple) { - //for backward compatibility, we return null when key is not present. - return tuple.contains(boltKeyField) ? (K) tuple.getValueByField(boltKeyField) : null; - } - - @Override - public V getMessageFromTuple(Tuple tuple) { - return (V) tuple.getValueByField(boltMessageField); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/TupleToKafkaMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/TupleToKafkaMapper.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/TupleToKafkaMapper.java deleted file mode 100644 index d92de7b..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/mapper/TupleToKafkaMapper.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.bolt.mapper; - -import backtype.storm.tuple.Tuple; - -import java.io.Serializable; - -/** - * as the really verbose name suggests this interface mapps a storm tuple to kafka key and message. - * @param <K> type of key. - * @param <V> type of value. - */ -public interface TupleToKafkaMapper<K,V> extends Serializable { - K getKeyFromTuple(Tuple tuple); - V getMessageFromTuple(Tuple tuple); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/DefaultTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/DefaultTopicSelector.java deleted file mode 100644 index 9c87658..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/DefaultTopicSelector.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.bolt.selector; - -import backtype.storm.tuple.Tuple; - -public class DefaultTopicSelector implements KafkaTopicSelector { - - private final String topicName; - - public DefaultTopicSelector(final String topicName) { - this.topicName = topicName; - } - - @Override - public String getTopic(Tuple tuple) { - return topicName; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/KafkaTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/KafkaTopicSelector.java deleted file mode 100644 index f77fc47..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/selector/KafkaTopicSelector.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.bolt.selector; - -import backtype.storm.tuple.Tuple; - -import java.io.Serializable; - -public interface KafkaTopicSelector extends Serializable { - String getTopic(Tuple tuple); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java b/external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java deleted file mode 100644 index bd786b3..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka.trident; - -import storm.kafka.KafkaUtils; -import storm.trident.spout.IOpaquePartitionedTridentSpout; -import storm.trident.spout.IPartitionedTridentSpout; - -import java.util.List; -import java.util.Map; - -class Coordinator implements IPartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>>, IOpaquePartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>> { - - private IBrokerReader reader; - private TridentKafkaConfig config; - - public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) { - config = tridentKafkaConfig; - reader = KafkaUtils.makeBrokerReader(conf, config); - } - - @Override - public void close() { - config.coordinator.close(); - } - - @Override - public boolean isReady(long txid) { - return config.coordinator.isReady(txid); - } - - @Override - public List<GlobalPartitionInformation> getPartitionsForBatch() { - return reader.getAllBrokers(); - } -}
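For context on what this removal touches: the files deleted above form the consumer-side API of the old storm-kafka module. Below is a minimal, hedged sketch of how those classes were typically wired into a topology. It assumes the companion classes referenced in the diff but not reproduced here (SpoutConfig, KafkaSpout, SchemeAsMultiScheme) keep their usual storm-kafka signatures; the ZooKeeper string, topic name, and ids are placeholders, not values taken from this commit.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaSpoutExample {
    public static void main(String[] args) throws Exception {
        // ZkHosts points at the ZooKeeper ensemble holding the broker registry;
        // with the single-argument constructor the default "/brokers" path is used.
        ZkHosts hosts = new ZkHosts("zkserver1:2181,zkserver2:2181");

        // SpoutConfig(hosts, topic, zkRoot, consumerId): zkRoot and consumerId are
        // where the spout checkpoints its offsets through ZkState.
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "test-topic", "/kafka-offsets", "my-spout-id");

        // StringScheme (deleted above) decodes each Kafka message as UTF-8 and
        // emits it as a single "str" field on the tuple.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 2);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafka-demo", new Config(), builder.createTopology());
    }
}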
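On the producer side, the deleted KafkaBolt, FieldNameBasedTupleToKafkaMapper, and DefaultTopicSelector were typically combined as in the sketch below. The producer property keys follow the Kafka 0.8.2 new-producer API that the bolt's javadoc refers to; the broker addresses, topic name, and upstream component id are placeholders.

import java.util.Properties;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;

public class KafkaBoltExample {
    public static TopologyBuilder wire(TopologyBuilder builder) {
        // Properties for the 0.8.2 KafkaProducer created in the bolt's prepare().
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkabroker1:9092,kafkabroker2:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaBolt<String, String> bolt = new KafkaBolt<String, String>()
                .withProducerProperties(props)
                // DefaultTopicSelector sends every tuple to one fixed topic.
                .withTopicSelector(new DefaultTopicSelector("test-topic"))
                // FieldNameBasedTupleToKafkaMapper reads the "key" and "message"
                // tuple fields by default (BOLT_KEY / BOLT_MESSAGE above).
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>());

        // With async=true and fireAndForget=false (the defaults), the tuple is
        // acked or failed from the producer callback shown in execute() above.
        bolt.setAsync(true);
        bolt.setFireAndForget(false);

        builder.setBolt("kafka-bolt", bolt, 2).shuffleGrouping("upstream-bolt");
        return builder;
    }
}

A per-tuple topic can be chosen by implementing the one-method KafkaTopicSelector interface (getTopic(Tuple)) instead of using DefaultTopicSelector; returning null from it makes the bolt skip the tuple, as the execute() code above shows.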
