http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java new file mode 100644 index 0000000..6eddaf5 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.trident; + +import org.apache.storm.Config; +import org.apache.storm.metric.api.CombinedMetric; +import org.apache.storm.metric.api.MeanReducer; +import org.apache.storm.metric.api.ReducedMetric; +import org.apache.storm.task.TopologyContext; +import com.google.common.collect.ImmutableMap; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.Message; +import kafka.message.MessageAndOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.*; +import org.apache.storm.kafka.TopicOffsetOutOfRangeException; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; +import org.apache.storm.trident.spout.IPartitionedTridentSpout; +import org.apache.storm.trident.topology.TransactionAttempt; + +import java.util.*; + +public class TridentKafkaEmitter { + + public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class); + + private DynamicPartitionConnections _connections; + private String _topologyName; + private KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric; + private ReducedMetric _kafkaMeanFetchLatencyMetric; + private CombinedMetric _kafkaMaxFetchLatencyMetric; + private TridentKafkaConfig _config; + private String _topologyInstanceId; + + public TridentKafkaEmitter(Map conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) { + _config = config; + _topologyInstanceId = topologyInstanceId; + _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config)); + _topologyName = (String) conf.get(Config.TOPOLOGY_NAME); + _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections); + context.registerMetric("kafkaOffset", _kafkaOffsetMetric, _config.metricsTimeBucketSizeInSecs); + _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), _config.metricsTimeBucketSizeInSecs); + _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), _config.metricsTimeBucketSizeInSecs); + } + + + 
private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) { + SimpleConsumer consumer = _connections.register(partition); + Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta); + _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset")); + return ret; + } + + private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) { + try { + return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta); + } catch (FailedFetchException e) { + LOG.warn("Failed to fetch from partition " + partition); + if (lastMeta == null) { + return null; + } else { + Map ret = new HashMap(); + ret.put("offset", lastMeta.get("nextOffset")); + ret.put("nextOffset", lastMeta.get("nextOffset")); + ret.put("partition", partition.partition); + ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port)); + ret.put("topic", partition.topic); + ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId)); + return ret; + } + } + } + + private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta) { + long offset; + if (lastMeta != null) { + String lastInstanceId = null; + Map lastTopoMeta = (Map) lastMeta.get("topology"); + if (lastTopoMeta != null) { + lastInstanceId = (String) lastTopoMeta.get("id"); + } + if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) { + offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config.startOffsetTime); + } else { + offset = (Long) lastMeta.get("nextOffset"); + } + } else { + offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config); + } + + ByteBufferMessageSet msgs = null; + try { + msgs = fetchMessages(consumer, partition, offset); + } catch (TopicOffsetOutOfRangeException e) { + long newOffset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime()); + LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset); + offset = newOffset; + msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset); + } + + long endoffset = offset; + for (MessageAndOffset msg : msgs) { + emit(collector, msg.message(), partition, msg.offset()); + endoffset = msg.nextOffset(); + } + Map newMeta = new HashMap(); + newMeta.put("offset", offset); + newMeta.put("nextOffset", endoffset); + newMeta.put("instanceId", _topologyInstanceId); + newMeta.put("partition", partition.partition); + newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port)); + newMeta.put("topic", partition.topic); + newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId)); + return newMeta; + } + + private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) { + long start = System.nanoTime(); + ByteBufferMessageSet msgs = null; + msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset); + long end = System.nanoTime(); + long millis = (end - start) / 1000000; + _kafkaMeanFetchLatencyMetric.update(millis); + _kafkaMaxFetchLatencyMetric.update(millis); + return msgs; + } + + /** + * re-emit the batch described by the meta data provided + * + * @param attempt + * @param collector + * @param partition + * @param meta + */ + 
private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) { + LOG.info("re-emitting batch, attempt " + attempt); + String instanceId = (String) meta.get("instanceId"); + if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) { + SimpleConsumer consumer = _connections.register(partition); + long offset = (Long) meta.get("offset"); + long nextOffset = (Long) meta.get("nextOffset"); + ByteBufferMessageSet msgs = null; + msgs = fetchMessages(consumer, partition, offset); + + if(msgs != null) { + for (MessageAndOffset msg : msgs) { + if (offset == nextOffset) { + break; + } + if (offset > nextOffset) { + throw new RuntimeException("Error when re-emitting batch. overshot the end offset"); + } + emit(collector, msg.message(), partition, msg.offset()); + offset = msg.nextOffset(); + } + } + } + } + + private void emit(TridentCollector collector, Message msg, Partition partition, long offset) { + Iterable<List<Object>> values; + if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) { + values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset); + } else { + values = KafkaUtils.generateTuples(_config, msg, partition.topic); + } + + if (values != null) { + for (List<Object> value : values) { + collector.emit(value); + } + } + } + + private void clear() { + _connections.clear(); + } + + private List<Partition> orderPartitions(List<GlobalPartitionInformation> partitions) { + List<Partition> part = new ArrayList<Partition>(); + for (GlobalPartitionInformation globalPartitionInformation : partitions) + part.addAll(globalPartitionInformation.getOrderedPartitions()); + return part; + } + + private void refresh(List<Partition> list) { + _connections.clear(); + _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list)); + } + + + public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> asOpaqueEmitter() { + + return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() { + + /** + * Emit a batch of tuples for a partition/transaction. + * + * Return the metadata describing this batch that will be used as lastPartitionMeta + * for defining the parameters of the next batch. + */ + @Override + public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) { + return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map); + } + + @Override + public void refreshPartitions(List<Partition> partitions) { + refresh(partitions); + } + + @Override + public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) { + return orderPartitions(partitionInformation); + } + + @Override + public void close() { + clear(); + } + }; + } + + public IPartitionedTridentSpout.Emitter asTransactionalEmitter() { + return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() { + + /** + * Emit a batch of tuples for a partition/transaction that's never been emitted before. + * Return the metadata that can be used to reconstruct this partition/batch in the future. 
+ */ + @Override + public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) { + return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map); + } + + /** + * Emit a batch of tuples for a partition/transaction that has been emitted before, using + * the metadata created when it was first emitted. + */ + @Override + public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) { + reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map); + } + + /** + * This method is called when this task is responsible for a new set of partitions. Should be used + * to manage things like connections to brokers. + */ + @Override + public void refreshPartitions(List<Partition> partitions) { + refresh(partitions); + } + + @Override + public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) { + return orderPartitions(partitionInformation); + } + + @Override + public void close() { + clear(); + } + }; + + } + + +}
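For context: TridentKafkaEmitter is not used directly; it backs the opaque and transactional Trident Kafka spouts through asOpaqueEmitter() and asTransactionalEmitter(). A minimal consumer-side sketch, assuming the OpaqueTridentKafkaSpout, ZkHosts, and StringScheme classes that ship in this module (the ZooKeeper connect string and topic name are placeholders):

import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.Debug;
import org.apache.storm.tuple.Fields;

public class KafkaSpoutSketch {
    public static TridentTopology build() {
        ZkHosts hosts = new ZkHosts("zkserver:2181");  // placeholder ZooKeeper connect string
        TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test-topic");  // placeholder topic
        config.scheme = new SchemeAsMultiScheme(new StringScheme());  // deserialize each message into a single "str" field

        TridentTopology topology = new TridentTopology();
        // The opaque spout wraps TridentKafkaEmitter.asOpaqueEmitter(); the Map
        // returned by emitPartitionBatch ("offset", "nextOffset", broker, topic)
        // is handed back as lastPartitionMeta when the next batch is emitted.
        topology.newStream("kafka-spout", new OpaqueTridentKafkaSpout(config))
                .each(new Fields(StringScheme.STRING_SCHEME_KEY), new Debug());
        return topology;
    }
}

On replay, the transactional emitter re-fetches exactly the recorded [offset, nextOffset) range via reEmitPartitionBatch, while the opaque emitter may produce a fresh batch from lastPartitionMeta (for example after a FailedFetchException).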
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java new file mode 100644 index 0000000..5741dc7 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.trident; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.topology.FailedException; +import org.apache.commons.lang.Validate; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; +import org.apache.storm.kafka.trident.selector.KafkaTopicSelector; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; + +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +public class TridentKafkaState implements State { + private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class); + + private KafkaProducer producer; + private OutputCollector collector; + + private TridentTupleToKafkaMapper mapper; + private KafkaTopicSelector topicSelector; + + public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) { + this.mapper = mapper; + return this; + } + + public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector selector) { + this.topicSelector = selector; + return this; + } + + @Override + public void beginCommit(Long txid) { + LOG.debug("beginCommit is Noop."); + } + + @Override + public void commit(Long txid) { + LOG.debug("commit is Noop."); + } + + public void prepare(Properties options) { + Validate.notNull(mapper, "mapper can not be null"); + Validate.notNull(topicSelector, "topicSelector can not be null"); + producer = new KafkaProducer(options); + } + + public void updateState(List<TridentTuple> tuples, TridentCollector collector) { + String topic = null; + for (TridentTuple tuple : tuples) { + try { + topic = topicSelector.getTopic(tuple); + + if(topic != null) { + Future<RecordMetadata> result = producer.send(new ProducerRecord(topic, + mapper.getKeyFromTuple(tuple), 
mapper.getMessageFromTuple(tuple))); + try { + result.get(); + } catch (ExecutionException e) { + String errorMsg = "Could not retrieve result for message with key = " + + mapper.getKeyFromTuple(tuple) + " from topic = " + topic; + LOG.error(errorMsg, e); + throw new FailedException(errorMsg, e); + } + } else { + LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); + } + } catch (Exception ex) { + String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple) + + " to topic = " + topic; + LOG.warn(errorMsg, ex); + throw new FailedException(errorMsg, ex); + } + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java new file mode 100644 index 0000000..f564510 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.trident; + +import org.apache.storm.task.IMetricsContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; +import org.apache.storm.kafka.trident.selector.KafkaTopicSelector; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; + +import java.util.Map; +import java.util.Properties; + +public class TridentKafkaStateFactory implements StateFactory { + + private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class); + + private TridentTupleToKafkaMapper mapper; + private KafkaTopicSelector topicSelector; + private Properties producerProperties = new Properties(); + + public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) { + this.mapper = mapper; + return this; + } + + public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) { + this.topicSelector = selector; + return this; + } + + public TridentKafkaStateFactory withProducerProperties(Properties props) { + this.producerProperties = props; + return this; + } + + @Override + public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + LOG.info("makeState(partitionIndex={}, numPartitions={})", partitionIndex, numPartitions); + TridentKafkaState state = new TridentKafkaState() + .withKafkaTopicSelector(this.topicSelector) + .withTridentTupleToKafkaMapper(this.mapper); + state.prepare(producerProperties); + return state; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java new file mode 100644 index 0000000..7a905ab --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.trident; + +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.BaseStateUpdater; +import org.apache.storm.trident.tuple.TridentTuple; + +import java.util.List; + +public class TridentKafkaUpdater extends BaseStateUpdater<TridentKafkaState> { + @Override + public void updateState(TridentKafkaState state, List<TridentTuple> tuples, TridentCollector collector) { + state.updateState(tuples, collector); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java new file mode 100644 index 0000000..d26c341 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.trident; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.kafka.DynamicBrokersReader; +import org.apache.storm.kafka.ZkHosts; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class ZkBrokerReader implements IBrokerReader { + + public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class); + + List<GlobalPartitionInformation> cachedBrokers = new ArrayList<GlobalPartitionInformation>(); + DynamicBrokersReader reader; + long lastRefreshTimeMs; + + + long refreshMillis; + + public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) { + try { + reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic); + cachedBrokers = reader.getBrokerInfo(); + lastRefreshTimeMs = System.currentTimeMillis(); + refreshMillis = hosts.refreshFreqSecs * 1000L; + } catch (java.net.SocketTimeoutException e) { + LOG.warn("Failed to update brokers", e); + } + + } + + private void refresh() { + long currTime = System.currentTimeMillis(); + if (currTime > lastRefreshTimeMs + refreshMillis) { + try { + LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired"); + cachedBrokers = reader.getBrokerInfo(); + lastRefreshTimeMs = currTime; + } catch (java.net.SocketTimeoutException e) { + LOG.warn("Failed to update brokers", e); + } + } + } + @Override + public GlobalPartitionInformation getBrokerForTopic(String topic) { + refresh(); + for(GlobalPartitionInformation partitionInformation : cachedBrokers) { + if (partitionInformation.topic.equals(topic)) return partitionInformation; + } + return null; + } + + @Override + public List<GlobalPartitionInformation> getAllBrokers() { + refresh(); + return cachedBrokers; + } + + @Override + public void close() { + reader.close(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java new file mode 100644 index 0000000..2d04971 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.trident.mapper; + +import org.apache.storm.trident.tuple.TridentTuple; + +public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper { + + public final String keyFieldName; + public final String msgFieldName; + + public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) { + this.keyFieldName = keyFieldName; + this.msgFieldName = msgFieldName; + } + + @Override + public K getKeyFromTuple(TridentTuple tuple) { + return (K) tuple.getValueByField(keyFieldName); + } + + @Override + public V getMessageFromTuple(TridentTuple tuple) { + return (V) tuple.getValueByField(msgFieldName); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java new file mode 100644 index 0000000..28c6c89 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.trident.mapper; + +import org.apache.storm.tuple.Tuple; +import org.apache.storm.trident.tuple.TridentTuple; + +import java.io.Serializable; + +public interface TridentTupleToKafkaMapper<K,V> extends Serializable { + K getKeyFromTuple(TridentTuple tuple); + V getMessageFromTuple(TridentTuple tuple); +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java new file mode 100644 index 0000000..7ae49a3 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.trident.selector; + +import org.apache.storm.trident.tuple.TridentTuple; + +public class DefaultTopicSelector implements KafkaTopicSelector { + + private final String topicName; + + public DefaultTopicSelector(final String topicName) { + this.topicName = topicName; + } + + @Override + public String getTopic(TridentTuple tuple) { + return topicName; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java new file mode 100644 index 0000000..012a6c7 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.trident.selector; + +import org.apache.storm.trident.tuple.TridentTuple; + +import java.io.Serializable; + +public interface KafkaTopicSelector extends Serializable { + String getTopic(TridentTuple tuple); +} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/Broker.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/Broker.java b/external/storm-kafka/src/jvm/storm/kafka/Broker.java deleted file mode 100644 index 513ab22..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/Broker.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import com.google.common.base.Objects; - -import java.io.Serializable; - -public class Broker implements Serializable, Comparable<Broker> { - public String host; - public int port; - - // for kryo compatibility - private Broker() { - - } - - public Broker(String host, int port) { - this.host = host; - this.port = port; - } - - public Broker(String host) { - this(host, 9092); - } - - @Override - public int hashCode() { - return Objects.hashCode(host, port); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - final Broker other = (Broker) obj; - return Objects.equal(this.host, other.host) && Objects.equal(this.port, other.port); - } - - @Override - public String toString() { - return host + ":" + port; - } - - public static Broker fromString(String host) { - Broker hp; - String[] spec = host.split(":"); - if (spec.length == 1) { - hp = new Broker(spec[0]); - } else if (spec.length == 2) { - hp = new Broker(spec[0], Integer.parseInt(spec[1])); - } else { - throw new IllegalArgumentException("Invalid host specification: " + host); - } - return hp; - } - - - @Override - public int compareTo(Broker o) { - if (this.host.equals(o.host)) { - return this.port - o.port; - } else { - return this.host.compareTo(o.host); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java b/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java deleted file mode 100644 index 1a06fc5..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka; - -import java.io.Serializable; - - -public interface BrokerHosts extends Serializable { - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java b/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java deleted file mode 100644 index 1a7238e..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import backtype.storm.utils.Utils; -import org.apache.kafka.common.serialization.Serializer; - -import java.nio.ByteBuffer; -import java.util.Map; - -public class ByteBufferSerializer implements Serializer<ByteBuffer> { - @Override - public void configure(Map<String, ?> map, boolean b) { - - } - - @Override - public void close() { - - } - - @Override - public byte[] serialize(String s, ByteBuffer b) { - return Utils.toByteArray(b); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java deleted file mode 100644 index d0f6724..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package storm.kafka; - -import backtype.storm.Config; -import backtype.storm.utils.Utils; -import com.google.common.base.Preconditions; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryNTimes; -import org.json.simple.JSONValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.trident.GlobalPartitionInformation; - -import java.io.UnsupportedEncodingException; -import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - - -public class DynamicBrokersReader { - - public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class); - - private CuratorFramework _curator; - private String _zkPath; - private String _topic; - private Boolean _isWildcardTopic; - - public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) { - // Check required parameters - Preconditions.checkNotNull(conf, "conf cannot be null"); - - validateConfig(conf); - - Preconditions.checkNotNull(zkStr,"zkString cannot be null"); - Preconditions.checkNotNull(zkPath, "zkPath cannot be null"); - Preconditions.checkNotNull(topic, "topic cannot be null"); - - _zkPath = zkPath; - _topic = topic; - _isWildcardTopic = Utils.getBoolean(conf.get("kafka.topic.wildcard.match"), false); - try { - _curator = CuratorFrameworkFactory.newClient( - zkStr, - Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)), - Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)), - new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), - Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)))); - _curator.start(); - } catch (Exception ex) { - LOG.error("Couldn't connect to zookeeper", ex); - throw new RuntimeException(ex); - } - } - - /** - * Get all partitions with their current leaders - */ - public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException { - List<String> topics = getTopics(); - List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>(); - - for (String topic : topics) { - GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic); - try { - int numPartitionsForTopic = getNumPartitions(topic); - String brokerInfoPath = brokerPath(); - for (int partition = 0; partition < numPartitionsForTopic; partition++) { - int leader = getLeaderFor(topic,partition); - String path = brokerInfoPath + "/" + leader; - try { - byte[] brokerData = _curator.getData().forPath(path); - Broker hp = getBrokerHost(brokerData); - globalPartitionInformation.addPartition(partition, hp); - } catch (org.apache.zookeeper.KeeperException.NoNodeException e) { - LOG.error("Node {} does not exist ", path); - } - } - } catch (SocketTimeoutException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - LOG.info("Read partition info from zookeeper: " + globalPartitionInformation); - partitions.add(globalPartitionInformation); - } - return partitions; - } - - private int getNumPartitions(String topic) { - try { - String topicBrokersPath = partitionPath(topic); - List<String> children = _curator.getChildren().forPath(topicBrokersPath); - return children.size(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private List<String> getTopics() { - List<String> topics = new ArrayList<String>(); - if (!_isWildcardTopic) { - 
topics.add(_topic); - return topics; - } else { - try { - List<String> children = _curator.getChildren().forPath(topicsPath()); - for (String t : children) { - if (t.matches(_topic)) { - LOG.info(String.format("Found matching topic %s", t)); - topics.add(t); - } - } - return topics; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - public String topicsPath () { - return _zkPath + "/topics"; - } - public String partitionPath(String topic) { - return topicsPath() + "/" + topic + "/partitions"; - } - - public String brokerPath() { - return _zkPath + "/ids"; - } - - - - /** - * get /brokers/topics/distributedTopic/partitions/1/state - * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 } - * @param topic - * @param partition - * @return - */ - private int getLeaderFor(String topic, long partition) { - try { - String topicBrokersPath = partitionPath(topic); - byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state"); - Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(hostPortData, "UTF-8")); - Integer leader = ((Number) value.get("leader")).intValue(); - if (leader == -1) { - throw new RuntimeException("No leader found for partition " + partition); - } - return leader; - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void close() { - _curator.close(); - } - - /** - * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0 - * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 } - * - * @param contents - * @return - */ - private Broker getBrokerHost(byte[] contents) { - try { - Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8")); - String host = (String) value.get("host"); - Integer port = ((Long) value.get("port")).intValue(); - return new Broker(host, port); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - - /** - * Validate required parameters in the input configuration Map - * @param conf - */ - private void validateConfig(final Map conf) { - Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT), - "%s cannot be null", Config.STORM_ZOOKEEPER_SESSION_TIMEOUT); - Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT), - "%s cannot be null", Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT); - Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES), - "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_TIMES); - Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL), - "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_INTERVAL); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java b/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java deleted file mode 100644 index e237a7a..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import kafka.javaapi.consumer.SimpleConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.trident.IBrokerReader; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - - -public class DynamicPartitionConnections { - - public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class); - - static class ConnectionInfo { - SimpleConsumer consumer; - Set<String> partitions = new HashSet<String>(); - - public ConnectionInfo(SimpleConsumer consumer) { - this.consumer = consumer; - } - } - - Map<Broker, ConnectionInfo> _connections = new HashMap(); - KafkaConfig _config; - IBrokerReader _reader; - - public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) { - _config = config; - _reader = brokerReader; - } - - public SimpleConsumer register(Partition partition) { - Broker broker = _reader.getBrokerForTopic(partition.topic).getBrokerFor(partition.partition); - return register(broker, partition.topic, partition.partition); - } - - public SimpleConsumer register(Broker host, String topic, int partition) { - if (!_connections.containsKey(host)) { - _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId))); - } - ConnectionInfo info = _connections.get(host); - info.partitions.add(getHashKey(topic,partition)); - return info.consumer; - } - - public SimpleConsumer getConnection(Partition partition) { - ConnectionInfo info = _connections.get(partition.host); - if (info != null) { - return info.consumer; - } - return null; - } - - public void unregister(Broker port, String topic, int partition) { - ConnectionInfo info = _connections.get(port); - info.partitions.remove(getHashKey(topic,partition)); - if (info.partitions.isEmpty()) { - info.consumer.close(); - _connections.remove(port); - } - } - - public void unregister(Partition partition) { - unregister(partition.host, partition.topic, partition.partition); - } - - public void clear() { - for (ConnectionInfo info : _connections.values()) { - info.consumer.close(); - } - _connections.clear(); - } - - private String getHashKey(String topic, int partition) { - return topic + "_" + partition; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java deleted file mode 100644 index 5664f12..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -import java.util.Comparator; -import java.util.HashSet; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager { - - private final long retryInitialDelayMs; - private final double retryDelayMultiplier; - private final long retryDelayMaxMs; - - private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator()); - private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>(); - - public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) { - this.retryInitialDelayMs = retryInitialDelayMs; - this.retryDelayMultiplier = retryDelayMultiplier; - this.retryDelayMaxMs = retryDelayMaxMs; - } - - @Override - public void failed(Long offset) { - MessageRetryRecord oldRecord = this.records.get(offset); - MessageRetryRecord newRecord = oldRecord == null ? 
- new MessageRetryRecord(offset) : - oldRecord.createNextRetryRecord(); - this.records.put(offset, newRecord); - this.waiting.add(newRecord); - } - - @Override - public void acked(Long offset) { - MessageRetryRecord record = this.records.remove(offset); - if (record != null) { - this.waiting.remove(record); - } - } - - @Override - public void retryStarted(Long offset) { - MessageRetryRecord record = this.records.get(offset); - if (record == null || !this.waiting.contains(record)) { - throw new IllegalStateException("cannot retry a message that has not failed"); - } else { - this.waiting.remove(record); - } - } - - @Override - public Long nextFailedMessageToRetry() { - if (this.waiting.size() > 0) { - MessageRetryRecord first = this.waiting.peek(); - if (System.currentTimeMillis() >= first.retryTimeUTC) { - if (this.records.containsKey(first.offset)) { - return first.offset; - } else { - // defensive programming - should be impossible - this.waiting.remove(first); - return nextFailedMessageToRetry(); - } - } - } - return null; - } - - @Override - public boolean shouldRetryMsg(Long offset) { - MessageRetryRecord record = this.records.get(offset); - return record != null && - this.waiting.contains(record) && - System.currentTimeMillis() >= record.retryTimeUTC; - } - - @Override - public Set<Long> clearInvalidMessages(Long kafkaOffset) { - Set<Long> invalidOffsets = new HashSet<Long>(); - for(Long offset : records.keySet()){ - if(offset < kafkaOffset){ - MessageRetryRecord record = this.records.remove(offset); - if (record != null) { - this.waiting.remove(record); - invalidOffsets.add(offset); - } - } - } - return invalidOffsets; - } - - /** - * A MessageRetryRecord holds the data of how many times a message has - * failed and been retried, and when the last failure occurred. It can - * determine whether it is ready to be retried by employing an exponential - * back-off calculation using config values stored in SpoutConfig: - * <ul> - * <li>retryInitialDelayMs - time to delay before the first retry</li> - * <li>retryDelayMultiplier - multiplier by which to increase the delay for each subsequent retry</li> - * <li>retryDelayMaxMs - maximum retry delay (once this delay time is reached, subsequent retries will - * delay for this amount of time every time) - * </li> - * </ul> - */ - private class MessageRetryRecord { - private final long offset; - private final int retryNum; - private final long retryTimeUTC; - - public MessageRetryRecord(long offset) { - this(offset, 1); - } - - private MessageRetryRecord(long offset, int retryNum) { - this.offset = offset; - this.retryNum = retryNum; - this.retryTimeUTC = System.currentTimeMillis() + calculateRetryDelay(); - } - - /** - * Create a MessageRetryRecord for the next retry that should occur after this one. - * @return MessageRetryRecord with the next retry time, or null to indicate that another - * retry should not be performed. The latter case can happen if we are about to - * run into the backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS in the Storm - * configuration. - */ - public MessageRetryRecord createNextRetryRecord() { - return new MessageRetryRecord(this.offset, this.retryNum + 1); - } - - private long calculateRetryDelay() { - double delayMultiplier = Math.pow(retryDelayMultiplier, this.retryNum - 1); - double delay = retryInitialDelayMs * delayMultiplier; - Long maxLong = Long.MAX_VALUE; - long delayThisRetryMs = delay >= maxLong.doubleValue() - ? 
maxLong - : (long) delay; - return Math.min(delayThisRetryMs, retryDelayMaxMs); - } - - @Override - public boolean equals(Object other) { - return (other instanceof MessageRetryRecord - && this.offset == ((MessageRetryRecord) other).offset); - } - - @Override - public int hashCode() { - return Long.valueOf(this.offset).hashCode(); - } - } - - private static class RetryTimeComparator implements Comparator<MessageRetryRecord> { - - @Override - public int compare(MessageRetryRecord record1, MessageRetryRecord record2) { - return Long.valueOf(record1.retryTimeUTC).compareTo(Long.valueOf(record2.retryTimeUTC)); - } - - @Override - public boolean equals(Object obj) { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java b/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java deleted file mode 100644 index 011240e..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.kafka; - -public class FailedFetchException extends RuntimeException { - - public FailedFetchException(String message) { - super(message); - } - - public FailedFetchException(Exception e) { - super(e); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java deleted file mode 100644 index 30c9a24..0000000 --- a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java b/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
deleted file mode 100644
index 07cbd26..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
-import java.util.Map;
-
-public class IntSerializer implements Serializer<Integer> {
-    @Override
-    public void configure(Map<String, ?> map, boolean b) {
-    }
-
-    @Override
-    public byte[] serialize(String topic, Integer val) {
-        byte[] r = new byte[4];
-        IntBuffer b = ByteBuffer.wrap(r).asIntBuffer();
-        b.put(val);
-        return r;
-    }
-
-    @Override
-    public void close() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
deleted file mode 100644
index 49c7526..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.spout.MultiScheme;
-import backtype.storm.spout.RawMultiScheme;
-
-import java.io.Serializable;
-
-public class KafkaConfig implements Serializable {
-    private static final long serialVersionUID = 5276718734571623855L;
-
-    public final BrokerHosts hosts;
-    public final String topic;
-    public final String clientId;
-
-    public int fetchSizeBytes = 1024 * 1024;
-    public int socketTimeoutMs = 10000;
-    public int fetchMaxWait = 10000;
-    public int bufferSizeBytes = 1024 * 1024;
-    public MultiScheme scheme = new RawMultiScheme();
-    public boolean ignoreZkOffsets = false;
-    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
-    public long maxOffsetBehind = Long.MAX_VALUE;
-    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
-    public int metricsTimeBucketSizeInSecs = 60;
-
-    public KafkaConfig(BrokerHosts hosts, String topic) {
-        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
-    }
-
-    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
-        this.hosts = hosts;
-        this.topic = topic;
-        this.clientId = clientId;
-    }
-
-}
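A minimal usage sketch for the config class above, not taken from this diff: it builds the pre-move KafkaConfig from a ZkHosts broker reader and overrides two of the public tuning fields. The ensemble string and topic name are placeholders.

    package storm.kafka.example; // assumed package for the sketch

    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaConfig;
    import storm.kafka.ZkHosts;

    public class ConfigSketch {
        public static KafkaConfig build() {
            BrokerHosts hosts = new ZkHosts("zk1:2181,zk2:2181"); // assumed ensemble
            KafkaConfig config = new KafkaConfig(hosts, "events"); // assumed topic
            config.fetchSizeBytes = 2 * 1024 * 1024;  // pull up to 2 MB per fetch
            config.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // start at the tail
            return config;
        }
    }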
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
deleted file mode 100644
index 634af85..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-public enum KafkaError {
-    NO_ERROR,
-    OFFSET_OUT_OF_RANGE,
-    INVALID_MESSAGE,
-    UNKNOWN_TOPIC_OR_PARTITION,
-    INVALID_FETCH_SIZE,
-    LEADER_NOT_AVAILABLE,
-    NOT_LEADER_FOR_PARTITION,
-    REQUEST_TIMED_OUT,
-    BROKER_NOT_AVAILABLE,
-    REPLICA_NOT_AVAILABLE,
-    MESSAGE_SIZE_TOO_LARGE,
-    STALE_CONTROLLER_EPOCH,
-    OFFSET_METADATA_TOO_LARGE,
-    UNKNOWN;
-
-    public static KafkaError getError(int errorCode) {
-        if (errorCode < 0 || errorCode >= UNKNOWN.ordinal()) {
-            return UNKNOWN;
-        } else {
-            return values()[errorCode];
-        }
-    }
-}
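The enum above maps Kafka's numeric error codes onto its ordinal positions, with getError() clamping anything out of range to UNKNOWN. The sketch below, which is not part of this commit, shows roughly how a fetch path might consume it together with FailedFetchException from earlier in this diff; the check() helper and its placement in storm.kafka are assumptions.

    package storm.kafka; // assumed placement alongside the classes above

    import kafka.javaapi.FetchResponse;

    public class FetchErrorSketch {
        // Hypothetical caller: translate a fetch response's error code and
        // decide whether to recover or abort, mirroring the enum's intent.
        public static void check(FetchResponse response, String topic, int partition) {
            if (response.hasError()) {
                KafkaError error = KafkaError.getError(response.errorCode(topic, partition));
                if (error == KafkaError.OFFSET_OUT_OF_RANGE) {
                    // a caller could fall back to startOffsetTime here, as
                    // KafkaConfig.useStartOffsetTimeIfOffsetOutOfRange suggests
                } else {
                    throw new FailedFetchException("fetch failed with error " + error);
                }
            }
        }
    }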
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
deleted file mode 100644
index 8169014..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import com.google.common.base.Strings;
-import kafka.message.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.PartitionManager.KafkaMessageId;
-
-import java.util.*;
-
-// TODO: need to add blacklisting
-// TODO: need to make a best effort to not re-emit messages if don't have to
-public class KafkaSpout extends BaseRichSpout {
-    static enum EmitState {
-        EMITTED_MORE_LEFT,
-        EMITTED_END,
-        NO_EMITTED
-    }
-
-    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-
-    SpoutConfig _spoutConfig;
-    SpoutOutputCollector _collector;
-    PartitionCoordinator _coordinator;
-    DynamicPartitionConnections _connections;
-    ZkState _state;
-
-    long _lastUpdateMs = 0;
-
-    int _currPartitionIndex = 0;
-
-    public KafkaSpout(SpoutConfig spoutConf) {
-        _spoutConfig = spoutConf;
-    }
-
-    @Override
-    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-        _collector = collector;
-        String topologyInstanceId = context.getStormId();
-        Map stateConf = new HashMap(conf);
-        List<String> zkServers = _spoutConfig.zkServers;
-        if (zkServers == null) {
-            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
-        }
-        Integer zkPort = _spoutConfig.zkPort;
-        if (zkPort == null) {
-            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
-        }
-        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
-        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
-        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
-        _state = new ZkState(stateConf);
-
-        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
-
-        // using TransactionalState like this is a hack
-        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
-        if (_spoutConfig.hosts instanceof StaticHosts) {
-            _coordinator = new StaticCoordinator(_connections, conf,
-                    _spoutConfig, _state, context.getThisTaskIndex(),
-                    totalTasks, topologyInstanceId);
-        } else {
-            _coordinator = new ZkCoordinator(_connections, conf,
-                    _spoutConfig, _state, context.getThisTaskIndex(),
-                    totalTasks, topologyInstanceId);
-        }
-
-        context.registerMetric("kafkaOffset", new IMetric() {
-            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections);
-
-            @Override
-            public Object getValueAndReset() {
-                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
-                Set<Partition> latestPartitions = new HashSet();
-                for (PartitionManager pm : pms) {
-                    latestPartitions.add(pm.getPartition());
-                }
-                _kafkaOffsetMetric.refreshPartitions(latestPartitions);
-                for (PartitionManager pm : pms) {
-                    _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
-                }
-                return _kafkaOffsetMetric.getValueAndReset();
-            }
-        }, _spoutConfig.metricsTimeBucketSizeInSecs);
-
-        context.registerMetric("kafkaPartition", new IMetric() {
-            @Override
-            public Object getValueAndReset() {
-                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
-                Map concatMetricsDataMaps = new HashMap();
-                for (PartitionManager pm : pms) {
-                    concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
-                }
-                return concatMetricsDataMaps;
-            }
-        }, _spoutConfig.metricsTimeBucketSizeInSecs);
-    }
-
-    @Override
-    public void close() {
-        _state.close();
-    }
-
-    @Override
-    public void nextTuple() {
-        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
-        for (int i = 0; i < managers.size(); i++) {
-
-            try {
-                // in case the number of managers decreased
-                _currPartitionIndex = _currPartitionIndex % managers.size();
-                EmitState state = managers.get(_currPartitionIndex).next(_collector);
-                if (state != EmitState.EMITTED_MORE_LEFT) {
-                    _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
-                }
-                if (state != EmitState.NO_EMITTED) {
-                    break;
-                }
-            } catch (FailedFetchException e) {
-                LOG.warn("Fetch failed", e);
-                _coordinator.refresh();
-            }
-        }
-
-        long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;
-
-        /*
-           Since System.currentTimeMillis() depends on the system clock, also
-           treat a negative diffWithNow as a commit trigger in case the clock
-           was moved backwards externally.
-        */
-        if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
-            commit();
-        }
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        KafkaMessageId id = (KafkaMessageId) msgId;
-        PartitionManager m = _coordinator.getManager(id.partition);
-        if (m != null) {
-            m.ack(id.offset);
-        }
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        KafkaMessageId id = (KafkaMessageId) msgId;
-        PartitionManager m = _coordinator.getManager(id.partition);
-        if (m != null) {
-            m.fail(id.offset);
-        }
-    }
-
-    @Override
-    public void deactivate() {
-        commit();
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
-            declarer.declareStream(_spoutConfig.outputStreamId, _spoutConfig.scheme.getOutputFields());
-        } else {
-            declarer.declare(_spoutConfig.scheme.getOutputFields());
-        }
-    }
-
-    private void commit() {
-        _lastUpdateMs = System.currentTimeMillis();
-        for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
-            manager.commit();
-        }
-    }
-
-}
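For completeness, a wiring sketch for the spout deleted above, not taken from this commit: it shows how the pre-move KafkaSpout was typically attached to a topology. The ZooKeeper address, topic, zkRoot, and consumer id are placeholders, and no downstream bolt is defined here.

    package storm.kafka.example; // assumed package for the sketch

    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;

    public class SpoutWiringSketch {
        public static void main(String[] args) {
            SpoutConfig spoutConfig = new SpoutConfig(
                    new ZkHosts("zk1:2181"), // assumed ZooKeeper ensemble
                    "events",                // assumed topic
                    "/kafka-spout",          // zkRoot under which offsets are committed
                    "sketch-consumer");      // consumer id for the offset path
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka", new KafkaSpout(spoutConfig), 4);
            // downstream bolts would consume StringScheme's "str" field here
        }
    }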
