STORM-822: Kafka Spout New Consumer API - Refactored code to avoid keeping records data inside spout state - Refactored code to specify output fields per stream and build tuples per topic - Implement exponential backoff retry strategy - Send one tuple per call to nextTuple
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8b16afbf Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8b16afbf Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8b16afbf Branch: refs/heads/master Commit: 8b16afbf595690ee428350f58bdec0fb2f3e60c7 Parents: d0aac5f Author: Hugo Louro <[email protected]> Authored: Mon Mar 21 14:42:50 2016 -0700 Committer: Hugo Louro <[email protected]> Committed: Tue Mar 29 18:01:45 2016 -0700 ---------------------------------------------------------------------- .../kafka/spout/KafkaRecordTupleBuilder.java | 44 --- .../apache/storm/kafka/spout/KafkaSpout.java | 160 +++++++---- .../storm/kafka/spout/KafkaSpoutConfig.java | 87 +++--- .../storm/kafka/spout/KafkaSpoutMessageId.java | 16 +- .../KafkaSpoutRetryExponentialBackoff.java | 281 +++++++++++++++++++ .../kafka/spout/KafkaSpoutRetryService.java | 72 +++++ .../storm/kafka/spout/KafkaSpoutStream.java | 14 +- .../storm/kafka/spout/KafkaSpoutStreams.java | 26 +- .../kafka/spout/KafkaSpoutTupleBuilder.java | 34 ++- .../kafka/spout/KafkaSpoutTuplesBuilder.java | 82 ++++++ .../kafka/spout/test/KafkaSpoutTestBolt.java | 50 ++++ .../spout/test/KafkaSpoutTopologyMain.java | 37 ++- .../storm/kafka/spout/test/KafkaTestBolt.java | 52 ---- .../spout/test/TopicTest2TupleBuilder.java | 40 +++ .../test/TopicsTest0Test1TupleBuilder.java | 42 +++ 15 files changed, 798 insertions(+), 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java deleted 
file mode 100644 index 4d67632..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka.spout; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -import java.util.List; - -public class KafkaRecordTupleBuilder<K, V> implements KafkaSpoutTupleBuilder<K, V> { - @Override - public List<Object> buildTuple(final ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams) { - final Fields outputFields = kafkaSpoutStreams.getOutputFields(consumerRecord.topic()); - if (outputFields != null) { - if (outputFields.size() == 3) { - return new Values(consumerRecord.topic(), - consumerRecord.partition(), - consumerRecord.offset()); - } else if (outputFields.size() == 5) { - return new Values(consumerRecord.topic(), - consumerRecord.partition(), - consumerRecord.offset(), - consumerRecord.key(), - consumerRecord.value()); - } - } - throw new RuntimeException("Failed to build tuple. 
" + consumerRecord + " " + kafkaSpoutStreams); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 9a49ee8..d211ae9 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; -import org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -34,12 +33,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; @@ -62,23 +65,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Bookkeeping - private KafkaSpoutStreams kafkaSpoutStreams; - private KafkaSpoutTupleBuilder<K, V> tupleBuilder; - private transient Timer commitTimer; // timer == null for auto commit mode - private transient Timer logTimer; - private transient Map<TopicPartition, OffsetEntry> acked; // emitted 
tuples that were successfully acked. These tuples will be committed periodically when the timer expires, on consumer rebalance, or on close/deactivate - private transient int maxRetries; // Max number of times a tuple is retried - private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. + private transient int maxRetries; // Max number of times a tuple is retried + private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first poll realized by the spout upon activation + private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure + private transient Timer commitTimer; // timer == null for auto commit mode + private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() - private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed - private transient FirstPollOffsetStrategy firstPollOffsetStrategy; - private transient PollStrategy pollStrategy; + private KafkaSpoutStreams kafkaSpoutStreams; // Object that wraps all the logic to declare output fields and emit tuples + private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; // Object that contains the logic to build tuples for each ConsumerRecord - public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaSpoutTupleBuilder<K, V> tupleBuilder) { + private transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate + private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. 
pending being acked or failed + private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() + private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed + + + public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) { this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams(); - this.tupleBuilder = tupleBuilder; } @Override @@ -89,18 +94,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout { this.collector = collector; maxRetries = kafkaSpoutConfig.getMaxTupleRetries(); numUncommittedOffsets = 0; - logTimer = new Timer(500, Math.min(1000, kafkaSpoutConfig.getOffsetsCommitPeriodMs()/2), TimeUnit.MILLISECONDS); // Offset management firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); - pollStrategy = kafkaSpoutConfig.getPollStrategy(); consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode(); + // Retries management + retryService = kafkaSpoutConfig.getRetryService(); + + // Tuples builder delegate + tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder(); + if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); - acked = new HashMap<>(); } + acked = new HashMap<>(); + emitted = new HashSet<>(); + waitingToEmit = Collections.emptyListIterator(); + LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig); } @@ -130,6 +142,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { acked.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout } + retryService.retainAll(partitions); + for (TopicPartition tp : partitions) { final OffsetAndMetadata 
committedOffset = kafkaConsumer.committed(tp); final long fetchOffset = doSeek(tp, committedOffset); @@ -182,67 +196,88 @@ public class KafkaSpout<K, V> extends BaseRichSpout { if (initialized) { if (commit()) { commitOffsetsForAckedTuples(); - } else if (poll()) { - emitTuples(pollKafkaBroker()); - } else if (logTimer.isExpiredResetOnTrue()) { // to limit the number of messages that get printed. - log(); + } + + if (poll()) { + setWaitingToEmit(pollKafkaBroker()); + } + + if (waitingToEmit()) { + emit(); } } else { LOG.debug("Spout not initialized. Not sending tuples until initialization completes"); } } - private void log() { - switch(pollStrategy) { - case STREAM: - LOG.trace("Reached the maximum number number of uncommitted records [{}]. " + - "No more polls will occur until a sequence of commits sets the count under the [{}] threshold ", - numUncommittedOffsets, kafkaSpoutConfig.getMaxUncommittedOffsets()); - break; - case BATCH: - LOG.trace("No more polls will occur until the last batch completes. 
[{}] emitted tuples pending", numUncommittedOffsets); - break; - default: - throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy); - } + private boolean commit() { + return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode + } + private boolean poll() { + return !waitingToEmit() && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets(); } - // always poll in auto commit mode because no state is kept and therefore there is no need to set an upper limit in memory - private boolean poll() { - switch(pollStrategy) { - case STREAM: - return consumerAutoCommitMode || numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets(); - case BATCH: - return consumerAutoCommitMode || numUncommittedOffsets <= 0; - default: - throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy); - } + private boolean waitingToEmit() { + return waitingToEmit != null && waitingToEmit.hasNext(); } - private boolean commit() { - return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode + public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) { + List<ConsumerRecord<K,V>> waitingToEmitList = new LinkedList<>(); + for (TopicPartition tp : consumerRecords.partitions()) { + waitingToEmitList.addAll(consumerRecords.records(tp)); + } + waitingToEmit = waitingToEmitList.iterator(); + LOG.trace("Records waiting to be emitted {}", waitingToEmitList); } + // ======== poll ========= private ConsumerRecords<K, V> pollKafkaBroker() { + doSeekRetriableTopicPartitions(); + final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); final int numPolledRecords = consumerRecords.count(); - numUncommittedOffsets+= numPolledRecords; LOG.debug("Polled [{}] records from Kafka. 
NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets); return consumerRecords; } - private void emitTuples(ConsumerRecords<K, V> consumerRecords) { - for (TopicPartition tp : consumerRecords.partitions()) { - final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp.topic()); + private void doSeekRetriableTopicPartitions() { + final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions(); + + for (TopicPartition rtp : retriableTopicPartitions) { + final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset(); + if (offsetAndMeta != null) { + kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle + } else { + kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset + } + } + } - for (final ConsumerRecord<K, V> record : records) { - final List<Object> tuple = tupleBuilder.buildTuple(record, kafkaSpoutStreams); - final KafkaSpoutMessageId messageId = new KafkaSpoutMessageId(record, tuple); + // ======== emit ========= + private void emit() { + emitTupleIfNotEmitted(waitingToEmit.next()); + waitingToEmit.remove(); + } - kafkaSpoutStreams.emit(collector, messageId); // emits one tuple per record - LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); + // emits one tuple per record + private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) { + final TopicPartition tp = new TopicPartition(record.topic(), record.partition()); + final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record); + + if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked + LOG.trace("Tuple for record [{}] has already been acked. Skipping", record); + } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail + LOG.trace("Tuple for record [{}] has already been emitted. 
Skipping", record); + } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried + final List<Object> tuple = tuplesBuilder.buildTuple(record); + kafkaSpoutStreams.emit(collector, tuple, msgId); + emitted.add(msgId); + numUncommittedOffsets++; + if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ? + retryService.remove(msgId); // re-emitted hence remove from failed } + LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); } } @@ -275,11 +310,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout { @Override public void ack(Object messageId) { + final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically - final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; acked.get(msgId.getTopicPartition()).add(msgId); LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", msgId, acked); } + emitted.remove(msgId); } // ======== Fail ======= @@ -287,10 +323,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { @Override public void fail(Object messageId) { final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; + emitted.remove(msgId); if (msgId.numFails() < maxRetries) { msgId.incrementNumFails(); - kafkaSpoutStreams.emit(collector, msgId); - LOG.trace("Retried tuple with message id [{}]", msgId); + retryService.schedule(msgId); } else { // limit to max number of retries LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId); ack(msgId); @@ -367,7 +403,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { this.tp = tp; this.initialFetchOffset = initialFetchOffset; this.committedOffset = initialFetchOffset - 1; - LOG.debug("Created OffsetEntry. 
{}", this); + LOG.debug("Instantiated {}", this); } public void add(KafkaSpoutMessageId msgId) { // O(Log N) @@ -434,6 +470,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout { return ackedMsgs.isEmpty(); } + public boolean contains(ConsumerRecord record) { + return contains(new KafkaSpoutMessageId(record)); + } + + public boolean contains(KafkaSpoutMessageId msgId) { + return ackedMsgs.contains(msgId); + } + @Override public String toString() { return "OffsetEntry{" + http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index d969f1f..29cedb2 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -18,7 +18,9 @@ package org.apache.storm.kafka.spout; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; import java.io.Serializable; import java.util.ArrayList; @@ -63,24 +65,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable { UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST } - /** - * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput and the memory - * footprint of the Kafka spout. The allowed values are STREAM and BATCH. STREAM will likely have higher throughput and use more memory - * (it stores in memory the entire KafkaRecord,including data). BATCH will likely have less throughput but also use less memory. 
- * The BATCH behavior is similar to the behavior of the previous Kafka Spout. De default value is STREAM. - * <ul> - * <li>STREAM Every periodic call to nextTuple polls a new batch of records from Kafka as long as the maxUncommittedOffsets - * threshold has not yet been reached. When the threshold his reached, no more records are polled until enough offsets have been - * committed, such that the number of pending offsets is less than maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)} - * </li> - * <li>BATCH Only polls a new batch of records from kafka once all the records that came in the previous poll have been acked.</li> - * </ul> - */ - public enum PollStrategy { - STREAM, - BATCH - } - // Kafka consumer configuration private final Map<String, Object> kafkaProps; private final Deserializer<K> keyDeserializer; @@ -92,8 +76,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable { private final int maxRetries; private final int maxUncommittedOffsets; private final FirstPollOffsetStrategy firstPollOffsetStrategy; - private final PollStrategy pollStrategy; private final KafkaSpoutStreams kafkaSpoutStreams; + private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; + private final KafkaSpoutRetryService retryService; private KafkaSpoutConfig(Builder<K,V> builder) { this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); @@ -103,9 +88,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable { this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; this.maxRetries = builder.maxRetries; this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; - this.pollStrategy = builder.pollStrategy; this.kafkaSpoutStreams = builder.kafkaSpoutStreams; this.maxUncommittedOffsets = builder.maxUncommittedOffsets; + this.tuplesBuilder = builder.tuplesBuilder; + this.retryService = builder.retryService; } private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) { @@ -117,33 +103,61 @@ public 
class KafkaSpoutConfig<K, V> implements Serializable { } public static class Builder<K,V> { - private Map<String, Object> kafkaProps; + private final Map<String, Object> kafkaProps; private Deserializer<K> keyDeserializer; private Deserializer<V> valueDeserializer; private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS; private int maxRetries = DEFAULT_MAX_RETRIES; private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; - private KafkaSpoutStreams kafkaSpoutStreams; + private final KafkaSpoutStreams kafkaSpoutStreams; private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; - private PollStrategy pollStrategy = PollStrategy.STREAM; + private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; + private final KafkaSpoutRetryService retryService; + + /** + * Please refer to the javadoc of {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/> + * This constructor uses by default the following implementation of {@link KafkaSpoutRetryService}:<p/> + * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), + * DEFAULT_MAX_RETRIES, TimeInterval.seconds(10))} + */ + public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams, + KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) { + this(kafkaProps, kafkaSpoutStreams, tuplesBuilder, + new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), + DEFAULT_MAX_RETRIES, TimeInterval.seconds(10))); + } /*** * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics * The optional configuration can be specified using the set methods of this builder * @param kafkaProps properties defining consumer connection to Kafka broker as specified in @see <a 
href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a> * @param kafkaSpoutStreams streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream. + * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s. + * @param retryService logic that manages the retrial of failed tuples */ - public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams) { + public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams, + KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) { if (kafkaProps == null || kafkaProps.isEmpty()) { - throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required. " + kafkaProps); + throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps); } if (kafkaSpoutStreams == null) { - throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit in the same stream."); + throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream"); + } + + if (tuplesBuilder == null) { + throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams"); } + + if (retryService == null) { + throw new IllegalArgumentException("Must specify at implementation of retry service"); + } + this.kafkaProps = kafkaProps; this.kafkaSpoutStreams = kafkaSpoutStreams; + this.tuplesBuilder = tuplesBuilder; + this.retryService = retryService; } /** @@ -214,16 +228,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return this; } - /** - * Sets the strategy used by the the Kafka spout to decide when to poll the next batch of records from Kafka. 
- * Please refer to to the documentation in {@link PollStrategy} - * @param pollStrategy strategy used to decide when to poll - * */ - public Builder<K, V> setPollStrategy(PollStrategy pollStrategy) { - this.pollStrategy = pollStrategy; - return this; - } - public KafkaSpoutConfig<K,V> build() { return new KafkaSpoutConfig<>(this); } @@ -258,6 +262,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return (String) kafkaProps.get(Consumer.GROUP_ID); } + /** + * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream} + */ public List<String> getSubscribedTopics() { return new ArrayList<>(kafkaSpoutStreams.getTopics()); } @@ -278,8 +285,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return maxUncommittedOffsets; } - public PollStrategy getPollStrategy() { - return pollStrategy; + public KafkaSpoutTuplesBuilder<K, V> getTuplesBuilder() { + return tuplesBuilder; + } + + public KafkaSpoutRetryService getRetryService() { + return retryService; } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java index 0a6b126..71f8327 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java @@ -21,23 +21,18 @@ package org.apache.storm.kafka.spout; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; -import java.util.Collections; -import java.util.List; - public class KafkaSpoutMessageId { private transient 
TopicPartition topicPart; private transient long offset; - private transient List<Object> tuple; private transient int numFails = 0; - public KafkaSpoutMessageId(ConsumerRecord consumerRecord, List<Object> tuple) { - this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), tuple); + public KafkaSpoutMessageId(ConsumerRecord consumerRecord) { + this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset()); } - public KafkaSpoutMessageId(TopicPartition topicPart, long offset, List<Object> tuple) { + public KafkaSpoutMessageId(TopicPartition topicPart, long offset) { this.topicPart = topicPart; this.offset = offset; - this.tuple = tuple; } public int partition() { @@ -64,10 +59,6 @@ public class KafkaSpoutMessageId { return topicPart; } - public List<Object> getTuple() { - return Collections.unmodifiableList(tuple); - } - public String getMetadata(Thread currThread) { return "{" + "topic-partition=" + topicPart + @@ -83,7 +74,6 @@ public class KafkaSpoutMessageId { "topic-partition=" + topicPart + ", offset=" + offset + ", numFails=" + numFails + - ", tuple=" + tuple + '}'; } http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java new file mode 100644 index 0000000..208cef4 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows: + * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ... 
+ * nextRetry = Min(nextRetry, currentTime + maxDelay) + */ +public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class); + private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator(); + + private TimeInterval initialDelay; + private TimeInterval delayPeriod; + private TimeInterval maxDelay; + private int maxRetries; + + private Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR); + private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups + + /** + * Comparator ordering by timestamp + */ + private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> { + public int compare(RetrySchedule entry1, RetrySchedule entry2) { + return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos()); + } + } + + private class RetrySchedule { + private KafkaSpoutMessageId msgId; + private long nextRetryTimeNanos; + + public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) { + this.msgId = msgId; + this.nextRetryTimeNanos = nextRetryTime; + LOG.debug("Created {}", this); + } + + public void setNextRetryTime() { + nextRetryTimeNanos = nextTime(msgId); + LOG.debug("Updated {}", this); + } + + public boolean retry(long currentTimeNanos) { + return nextRetryTimeNanos <= currentTimeNanos; + } + + @Override + public String toString() { + return "RetrySchedule{" + + "msgId=" + msgId + + ", nextRetryTime=" + nextRetryTimeNanos + + '}'; + } + + public KafkaSpoutMessageId msgId() { + return msgId; + } + + public long nextRetryTimeNanos() { + return nextRetryTimeNanos; + } + } + + public static class TimeInterval implements Serializable { + private long lengthNanos; + private long length; + private TimeUnit timeUnit; + + /** + * @param length length 
of the time interval in the units specified by {@link TimeUnit} + * @param timeUnit unit of time in which {@code length} is expressed + */ + public TimeInterval(long length, TimeUnit timeUnit) { + this.length = length; + this.timeUnit = timeUnit; + this.lengthNanos = timeUnit.toNanos(length); + } + + public static TimeInterval seconds(long length) { + return new TimeInterval(length, TimeUnit.SECONDS); + } + + public static TimeInterval milliSeconds(long length) { + return new TimeInterval(length, TimeUnit.MILLISECONDS); + } + + public static TimeInterval microSeconds(long length) { + return new TimeInterval(length, TimeUnit.MILLISECONDS); + } + + public long lengthNanos() { + return lengthNanos; + } + + public long length() { + return length; + } + + public TimeUnit timeUnit() { + return timeUnit; + } + + @Override + public String toString() { + return "TimeInterval{" + + "length=" + length + + ", timeUnit=" + timeUnit + + '}'; + } + } + + /** + * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression): + * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ... 
+ * nextRetry = Min(nextRetry, currentTime + maxDelay) + * + * @param initialDelay initial delay of the first retry + * @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression) + * @param maxRetries maximum number of times a tuple is retried before being acked and scheduled for commit + * @param maxDelay maximum amount of time waiting before retrying + * + */ + public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) { + this.initialDelay = initialDelay; + this.delayPeriod = delayPeriod; + this.maxRetries = maxRetries; + this.maxDelay = maxDelay; + LOG.debug("Instantiated {}", this); + } + + @Override + public Set<TopicPartition> retriableTopicPartitions() { + final Set<TopicPartition> tps = new TreeSet<>(); + final long currentTimeNanos = System.nanoTime(); + for (RetrySchedule retrySchedule : retrySchedules) { + if (retrySchedule.retry(currentTimeNanos)) { + final KafkaSpoutMessageId msgId = retrySchedule.msgId; + tps.add(new TopicPartition(msgId.topic(), msgId.partition())); + } else { + break; // Stop searching as soon as passed current time + } + } + LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps); + return tps; + } + + @Override + public boolean isReady(KafkaSpoutMessageId msgId) { + boolean retry = false; + if (toRetryMsgs.contains(msgId)) { + final long currentTimeNanos = System.nanoTime(); + for (RetrySchedule retrySchedule : retrySchedules) { + if (retrySchedule.retry(currentTimeNanos)) { + if (retrySchedule.msgId.equals(msgId)) { + retry = true; + LOG.debug("Found entry to retry {}", retrySchedule); + } + } else { + LOG.debug("Entry to retry not found {}", retrySchedule); + break; // Stop searching as soon as passed current time + } + } + } + return retry; + } + + @Override + public boolean isScheduled(KafkaSpoutMessageId msgId) { + return toRetryMsgs.contains(msgId); + } + + @Override + public 
boolean remove(KafkaSpoutMessageId msgId) { + boolean removed = false; + if (toRetryMsgs.contains(msgId)) { + for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) { + final RetrySchedule retrySchedule = iterator.next(); + if (retrySchedule.msgId().equals(msgId)) { + iterator.remove(); + toRetryMsgs.remove(msgId); + removed = true; + break; + } + } + } + LOG.debug(removed ? "Removed {} " : "Not removed {}", msgId); + LOG.trace("Current state {}", retrySchedules); + return removed; + } + + @Override + public boolean retainAll(Collection<TopicPartition> topicPartitions) { + boolean result = false; + for (Iterator<RetrySchedule> rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) { + final RetrySchedule retrySchedule = rsIterator.next(); + final KafkaSpoutMessageId msgId = retrySchedule.msgId; + final TopicPartition tpRetry= new TopicPartition(msgId.topic(), msgId.partition()); + if (!topicPartitions.contains(tpRetry)) { + rsIterator.remove(); + toRetryMsgs.remove(msgId); + LOG.debug("Removed {}", retrySchedule); + LOG.trace("Current state {}", retrySchedules); + result = true; + } + } + return result; + } + + @Override + public void schedule(KafkaSpoutMessageId msgId) { + if (msgId.numFails() > maxRetries) { + LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries); + } else { + if (toRetryMsgs.contains(msgId)) { + for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) { + final RetrySchedule retrySchedule = iterator.next(); + if (retrySchedule.msgId().equals(msgId)) { + iterator.remove(); + toRetryMsgs.remove(msgId); + } + } + } + final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId)); + retrySchedules.add(retrySchedule); + toRetryMsgs.add(msgId); + LOG.debug("Scheduled. 
{}", retrySchedule); + LOG.trace("Current state {}", retrySchedules); + } + } + + // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE + private long nextTime(KafkaSpoutMessageId msgId) { + final long currentTimeNanos = System.nanoTime(); + final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ... + ? currentTimeNanos + initialDelay.lengthNanos() + : (long) (currentTimeNanos + Math.pow(delayPeriod.lengthNanos, msgId.numFails() - 1)); + return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos); + } + + @Override + public String toString() { + return "KafkaSpoutRetryExponentialBackoff{" + + "delay=" + initialDelay + + ", ratio=" + delayPeriod + + ", maxRetries=" + maxRetries + + ", maxRetryDelay=" + maxDelay + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java new file mode 100644 index 0000000..5aab167 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.common.TopicPartition; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Set; + +/** + * Represents the logic that manages the retrial of failed tuples. + */ +public interface KafkaSpoutRetryService extends Serializable { + /** + * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled. + * @param msgId message to schedule for retrial + */ + void schedule(KafkaSpoutMessageId msgId); + + /** + * Removes a message from the list of messages scheduled for retrial + * @param msgId message to remove from retrial + */ + boolean remove(KafkaSpoutMessageId msgId); + + /** + * Retains all the messages whose {@link TopicPartition} belongs to the specified {@code Collection<TopicPartition>}. + * All messages that come from a {@link TopicPartition} NOT existing in the collection will be removed. + * This method is useful to cleanup state following partition rebalance. 
+ * @param topicPartitions Collection of {@link TopicPartition} for which to keep messages + * @return true if at least one message was removed, false otherwise + */ + boolean retainAll(Collection<TopicPartition> topicPartitions); + + /** + * @return set of topic partitions that have offsets that are ready to be retried, i.e., + * for which a tuple has failed and has retry time less than current time + */ + Set<TopicPartition> retriableTopicPartitions(); + + /** + * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried, + * i.e., is scheduled and has a retry time that is less than current time. + * @return true if message is ready to be retried, false otherwise + */ + boolean isReady(KafkaSpoutMessageId msgId); + + /** + * Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried. + * The message may or may not be ready to be retried yet. + * @return true if the message is scheduled to be retried, regardless of whether it is ready to be retried. 
+ * Returns false is this message is not scheduled for retrial + */ + boolean isScheduled(KafkaSpoutMessageId msgId); +} http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java index 43464a9..064a8bb 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java @@ -31,27 +31,31 @@ public class KafkaSpoutStream implements Serializable { private final String streamId; private final String topic; - /** Declare specified outputFields with default stream for the specified topic */ + /** Represents the specified outputFields and topic with the default stream */ KafkaSpoutStream(Fields outputFields, String topic) { this(outputFields, Utils.DEFAULT_STREAM_ID, topic); } - /** Declare specified outputFields with specified stream for the specified topic */ + /** Represents the specified outputFields and topic with the specified stream */ KafkaSpoutStream(Fields outputFields, String streamId, String topic) { + if (outputFields == null || streamId == null || topic == null) { + throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. 
" + + "[outputFields=%s, streamId=%s, topic=%s]", outputFields, streamId, topic)); + } this.outputFields = outputFields; this.streamId = streamId; this.topic = topic; } - public Fields getOutputFields() { + Fields getOutputFields() { return outputFields; } - public String getStreamId() { + String getStreamId() { return streamId; } - public String getTopic() { + String getTopic() { return topic; } http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java index 30215d1..dc5892e 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java @@ -33,7 +33,7 @@ import java.util.List; import java.util.Map; /** - * Represents the output streams associated with each topic, and provides a public API to + * Represents the {@link KafkaSpoutStream} associated with each topic, and provides a public API to * declare output streams and emmit tuples, on the appropriate stream, for all the topics specified. 
*/ public class KafkaSpoutStreams implements Serializable { @@ -48,7 +48,7 @@ public class KafkaSpoutStreams implements Serializable { /** * @param topic the topic for which to get output fields - * @return the output fields declared + * @return the declared output fields */ public Fields getOutputFields(String topic) { if (topicToStream.containsKey(topic)) { @@ -79,7 +79,7 @@ public class KafkaSpoutStreams implements Serializable { return new ArrayList<>(topicToStream.keySet()); } - void declareOutputFields(OutputFieldsDeclarer declarer) { + public void declareOutputFields(OutputFieldsDeclarer declarer) { for (KafkaSpoutStream stream : topicToStream.values()) { if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) { declarer.declareStream(stream.getStreamId(), stream.getOutputFields()); @@ -88,8 +88,8 @@ public class KafkaSpoutStreams implements Serializable { } } - void emit(SpoutOutputCollector collector, KafkaSpoutMessageId messageId) { - collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId); + public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) { + collector.emit(getStreamId(messageId.topic()), tuple, messageId); } @Override @@ -103,11 +103,11 @@ public class KafkaSpoutStreams implements Serializable { private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();; /** - * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified. - * All the topics will have the same stream id and output fields. + * Creates a {@link KafkaSpoutStream} with the given output Fields for each topic specified. + * All topics will have the same stream id and output fields. */ public Builder(Fields outputFields, String... 
topics) { - this(outputFields, Utils.DEFAULT_STREAM_ID, topics); + addStream(outputFields, topics); } /** @@ -115,16 +115,14 @@ public class KafkaSpoutStreams implements Serializable { * All the topics will have the same stream id and output fields. */ public Builder (Fields outputFields, String streamId, String... topics) { - for (String topic : topics) { - topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic)); - } + addStream(outputFields, streamId, topics); } /** * Adds this stream to the state representing the streams associated with each topic */ public Builder(KafkaSpoutStream stream) { - topicToStream.put(stream.getTopic(), stream); + addStream(stream); } /** @@ -139,9 +137,7 @@ public class KafkaSpoutStreams implements Serializable { * Please refer to javadoc in {@link #Builder(Fields, String...)} */ public Builder addStream(Fields outputFields, String... topics) { - for (String topic : topics) { - topicToStream.put(topic, new KafkaSpoutStream(outputFields, topic)); - } + addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics); return this; } http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java index 45aab48..3bb71a8 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java @@ -21,8 +21,38 @@ package org.apache.storm.kafka.spout; import org.apache.kafka.clients.consumer.ConsumerRecord; import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; import java.util.List; 
-public interface KafkaSpoutTupleBuilder<K,V> extends Serializable { - List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams); +/** + * Implementations of {@link KafkaSpoutTupleBuilder} contain the logic to build tuples from {@link ConsumerRecord}s. + * Users must subclass this abstract class to provide their implementation. See also {@link KafkaSpoutTuplesBuilder} + */ +public abstract class KafkaSpoutTupleBuilder<K,V> implements Serializable { + private List<String> topics; + + /** + * @param topics list of topics that use this implementation to build tuples + */ + public KafkaSpoutTupleBuilder(String... topics) { + if (topics == null || topics.length == 0) { + throw new IllegalArgumentException("Must specify at least one topic. It cannot be null or empty"); + } + this.topics = Arrays.asList(topics); + } + + /** + * @return list of topics that use this implementation to build tuples + */ + public List<String> getTopics() { + return Collections.unmodifiableList(topics); + } + + /** + * Builds a list of tuples using the ConsumerRecord specified as parameter + * @param consumerRecord whose contents are used to build tuples + * @return list of tuples + */ + public abstract List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord); } http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java new file mode 100644 index 0000000..d67c69d --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * {@link KafkaSpoutTuplesBuilder} wraps all the logic that builds tuples from {@link ConsumerRecord}s. + * The logic is provided by the user by implementing the appropriate number of {@link KafkaSpoutTupleBuilder} instances + */ +public class KafkaSpoutTuplesBuilder<K,V> implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTuplesBuilder.class); + + private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders; + + private KafkaSpoutTuplesBuilder(Builder<K,V> builder) { + this.topicToTupleBuilders = builder.topicToTupleBuilders; + LOG.debug("Instantiated {}", this); + } + + public static class Builder<K,V> { + private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders; + private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders; + + @SafeVarargs + public Builder(KafkaSpoutTupleBuilder<K,V>... 
tupleBuilders) { + if (tupleBuilders == null || tupleBuilders.length == 0) { + throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams"); + } + + this.tupleBuilders = Arrays.asList(tupleBuilders); + topicToTupleBuilders = new HashMap<>(); + } + + public KafkaSpoutTuplesBuilder<K,V> build() { + for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) { + for (String topic : tupleBuilder.getTopics()) { + if (!topicToTupleBuilders.containsKey(topic)) { + topicToTupleBuilders.put(topic, tupleBuilder); + } + } + } + return new KafkaSpoutTuplesBuilder<>(this); + } + } + + public List<Object>buildTuple(ConsumerRecord<K,V> consumerRecord) { + final String topic = consumerRecord.topic(); + return topicToTupleBuilders.get(topic).buildTuple(consumerRecord); + } + + @Override + public String toString() { + return "KafkaSpoutTuplesBuilder{" + + "topicToTupleBuilders=" + topicToTupleBuilders + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java new file mode 100644 index 0000000..7a94a50 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.test; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class KafkaSpoutTestBolt extends BaseRichBolt { + protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTestBolt.class); + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + LOG.debug("input = [" + input + "]"); + collector.ack(input); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java index 4fcc3ef..0691dd3 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java +++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java @@ -22,11 +22,13 @@ import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; -import org.apache.storm.kafka.spout.KafkaRecordTupleBuilder; import org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; +import org.apache.storm.kafka.spout.KafkaSpoutRetryService; import org.apache.storm.kafka.spout.KafkaSpoutStreams; -import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder; +import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; @@ -35,9 +37,9 @@ import java.io.IOException; import java.io.InputStreamReader; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy.STREAM; public class KafkaSpoutTopologyMain { private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"}; @@ -80,21 +82,29 @@ public class KafkaSpoutTopologyMain { public static StormTopology getTopolgyKafkaSpout() { final TopologyBuilder tp = new TopologyBuilder(); - tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams()), getTupleBuilder()), 1); - tp.setBolt("kafka_bolt", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]); - tp.setBolt("kafka_bolt_1", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]); + tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1); + tp.setBolt("kafka_bolt", 
new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]); + tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]); return tp.createTopology(); } public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) { - return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams) + return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService()) .setOffsetCommitPeriodMs(10_000) .setFirstPollOffsetStrategy(EARLIEST) - .setPollStrategy(STREAM) .setMaxUncommittedOffsets(250) .build(); } + private static KafkaSpoutRetryService getRetryService() { + return new KafkaSpoutRetryExponentialBackoff(getTimeInterval(500, TimeUnit.MICROSECONDS), + TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); + } + + private static TimeInterval getTimeInterval(long delay, TimeUnit timeUnit) { + return new TimeInterval(delay, timeUnit); + } + public static Map<String,Object> getKafkaConsumerProps() { Map<String, Object> props = new HashMap<>(); // props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true"); @@ -105,16 +115,19 @@ public class KafkaSpoutTopologyMain { return props; } - public static KafkaSpoutTupleBuilder<String,String> getTupleBuilder() { - return new KafkaRecordTupleBuilder<>(); + public static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() { + return new KafkaSpoutTuplesBuilder.Builder<>( + new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]), + new TopicTest2TupleBuilder<String, String>(TOPICS[2])) + .build(); } public static KafkaSpoutStreams getKafkaSpoutStreams() { final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value"); final Fields outputFields1 = new Fields("topic", "partition", "offset"); return new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]}) // 
contents of topics test, test1, sent to test_stream - .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents topic test2 sent to test_stream - .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents topic test2 sent to test_stream2 + .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents of topic test2 sent to test_stream + .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents of topic test2 sent to test2_stream .build(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java deleted file mode 100644 index c9ff9d5..0000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kafka.spout.test; - -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class KafkaTestBolt extends BaseRichBolt { - protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBolt.class); - - - private OutputCollector collector; - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(Tuple input) { - LOG.debug("input = [" + input + "]"); - collector.ack(input); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java new file mode 100644 index 0000000..ca65177 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout.test; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder; +import org.apache.storm.tuple.Values; + +import java.util.List; + +public class TopicTest2TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> { + /** + * @param topics list of topics that use this implementation to build tuples + */ + public TopicTest2TupleBuilder(String... topics) { + super(topics); + } + + @Override + public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) { + return new Values(consumerRecord.topic(), + consumerRecord.partition(), + consumerRecord.offset()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/8b16afbf/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java new file mode 100644 index 0000000..4c55aa1 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout.test; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder; +import org.apache.storm.tuple.Values; + +import java.util.List; + +public class TopicsTest0Test1TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> { + /** + * @param topics list of topics that use this implementation to build tuples + */ + public TopicsTest0Test1TupleBuilder(String... topics) { + super(topics); + } + + @Override + public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) { + return new Values(consumerRecord.topic(), + consumerRecord.partition(), + consumerRecord.offset(), + consumerRecord.key(), + consumerRecord.value()); + } +} \ No newline at end of file
