Repository: storm Updated Branches: refs/heads/master 3fd66b5e8 -> db4695d2d
STORM-2340 fix AutoCommitMode issue in KafkaSpout * Closes #1919 * fix: KafkaSpout is blocked in AutoCommitMode * add comments for impacts of AutoCommitMode * add doc about how to use KafkaSpout with at-most-once. * remove at-most-once for better describe the changes; emit null msgId when AutoCommitMode; * update sample code in storm-kafka-client to use inline setProp() Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/914a4768 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/914a4768 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/914a4768 Branch: refs/heads/master Commit: 914a4768c9a8aa0320b49ff4bfb1ec338bf1d042 Parents: 3fd66b5 Author: mingmxu <[email protected]> Authored: Fri Feb 3 12:03:37 2017 -0800 Committer: Jungtaek Lim <[email protected]> Committed: Tue Feb 14 11:25:49 2017 +0900 ---------------------------------------------------------------------- docs/storm-kafka-client.md | 22 ++++++++++ .../apache/storm/kafka/spout/KafkaSpout.java | 43 +++++++++++++------- 2 files changed, 50 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/914a4768/docs/storm-kafka-client.md ---------------------------------------------------------------------- diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md index ec5056f..79c4115 100644 --- a/docs/storm-kafka-client.md +++ b/docs/storm-kafka-client.md @@ -345,3 +345,25 @@ Currently the Kafka spout has the following default values, which have shown * offset.commit.period.ms = 30000 (30s) * max.uncommitted.offsets = 10000000 <br/> + +# Kafka AutoCommitMode + +If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- and want to remove the overhead of tuple tracking, then you can run a KafkaSpout with AutoCommitMode. 
+ +To enable it, you need to: +* set Config.TOPOLOGY_ACKERS to 0; +* enable *AutoCommitMode* in Kafka consumer configuration; + +Here's one example to set AutoCommitMode in KafkaSpout: +```java +KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig + .builder(bootstrapServers, topics) + .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") + .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST) + .build(); +``` + +*Note that this is not exactly At-Most-Once in Storm: as offsets are committed periodically by the Kafka consumer, some tuples could be replayed if the KafkaSpout crashes.* + + + http://git-wip-us.apache.org/repos/asf/storm/blob/914a4768/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 864235c..b96f3f9 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -78,10 +78,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() - transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate - private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. 
pending being acked or failed + transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate. Not used if it's AutoCommitMode + private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() - private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed + private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used if it's AutoCommitMode private transient TopologyContext context; private transient Timer refreshSubscriptionTimer; // Used to say when a subscription should be refreshed @@ -107,7 +107,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Offset management firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); - consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode(); + // with AutoCommitMode, offset will be periodically committed in the background by Kafka consumer + consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode(); // Retries management retryService = kafkaSpoutConfig.getRetryService(); @@ -242,14 +243,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private boolean poll() { final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets(); - final boolean poll = !waitingToEmit() && numUncommittedOffsets < maxUncommittedOffsets; + final boolean poll = !waitingToEmit() + && ( numUncommittedOffsets < maxUncommittedOffsets || consumerAutoCommitMode); if (!poll) { if 
(waitingToEmit()) { LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets); } - if (numUncommittedOffsets >= maxUncommittedOffsets) { + if (numUncommittedOffsets >= maxUncommittedOffsets && !consumerAutoCommitMode) { LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", numUncommittedOffsets, maxUncommittedOffsets); } } @@ -314,15 +316,26 @@ public class KafkaSpout<K, V> extends BaseRichSpout { boolean isScheduled = retryService.isScheduled(msgId); if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record); - if (tuple instanceof KafkaTuple) { - collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId); - } else { - collector.emit(tuple, msgId); - } - emitted.add(msgId); - numUncommittedOffsets++; - if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule. - retryService.remove(msgId); + + if(consumerAutoCommitMode){ + if (tuple instanceof KafkaTuple) { + collector.emit(((KafkaTuple)tuple).getStream(), tuple); + } else { + collector.emit(tuple); + } + }else{ + if (tuple instanceof KafkaTuple) { + collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId); + } else { + collector.emit(tuple, msgId); + } + + emitted.add(msgId); + numUncommittedOffsets++; + + if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule. + retryService.remove(msgId); + } } LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); return true;
