Repository: storm
Updated Branches:
  refs/heads/1.x-branch 96864e835 -> 5d0889b84
STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees
 - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE
 - Refactor method name from setForceEnableTupleTracking to setTupleTrackingEnforced
 - Throw IllegalStateException instead of IllegalArgumentException if spout attempts to emit an already committed message
 - Update documentation to reflect these changes


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ac16fe1e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ac16fe1e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ac16fe1e

Branch: refs/heads/1.x-branch
Commit: ac16fe1ee9d974af64a30769819561c0abae23af
Parents: 3414a8c
Author: Hugo Louro <[email protected]>
Authored: Sun Oct 22 17:44:54 2017 -0700
Committer: Hugo Louro <[email protected]>
Committed: Sun Oct 29 16:07:53 2017 -0700

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      | 37 ++++++--
 .../apache/storm/kafka/spout/KafkaSpout.java    | 94 +++++++++++---------
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 55 +++++++-----
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 14 +--
 4 files changed, 121 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ac16fe1e/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index 841e8ef..0a12910 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -316,7 +316,7 @@ use Kafka-clients 0.10.0.0, you would use the following dependency in your `pom.
 You can also override the kafka clients version while building from maven, with parameter `storm.kafka.client.version`
 e.g. `mvn clean install -Dstorm.kafka.client.version=0.10.0.0`
 
-When selecting a kafka client version, you should ensure - 
+When selecting a kafka client version, you should ensure -
 1. kafka api is compatible. storm-kafka-client module only supports **0.10 or newer** kafka client API. For older versions,
 you can use storm-kafka module (https://github.com/apache/storm/tree/master/external/storm-kafka).
 2. The kafka client selected by you should be wire compatible with the broker. e.g. 0.9.x client will not work with
@@ -348,25 +348,46 @@ Currently the Kafka spout has the following default values, which have been
 * max.uncommitted.offsets = 10000000
 <br/>
 
-# Messaging reliability modes
+# Processing Guarantees
 
-In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
+The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when an offset is committed to Kafka. This is
+conceptually equivalent to marking the tuple with the `ConsumerRecord` for that offset as being successfully processed
+because the tuple won't get re-emitted in case of failure or time out.
+
+For the AT_LEAST_ONCE and AT_MOST_ONCE processing guarantees the spout controls when the commit happens.
+When the guarantee is NONE Kafka controls when the commit happens.
+
+* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
+ and acked. If a tuple fails or times out it will be re-emitted. A tuple can be processed more than once if for instance
+ the ack gets lost.
+
+* AT_MOST_ONCE - Offsets will be committed to Kafka right after being polled but before being emitted to the downstream
+ components of the topology. Offsets are processed at most once because tuples that fail or timeout won't be retried.
+
+* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
+ "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
+ it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
+ This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.
+
+To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows:
 
-To set the processing guarantee, use the KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g.
 ```java
 KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .builder(String bootstrapServers, String ... topics)
   .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
 ```
 
-The spout will disable tuple tracking for emitted tuples by default when you use at-most-once or any-times. In some cases you may want to enable tracking anyway, because tuple tracking is necessary for some features of Storm, e.g. showing complete latency in Storm UI, or enabling backpressure through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter.
+# Tuple Tracking
+
+By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track
+emitted tuples with other processing guarantees to benefit from Storm features such as showing complete latency in the UI,
+or enabling backpressure with Config.TOPOLOGY_MAX_SPOUT_PENDING.
 
-If you need to enable tracking, use the KafkaSpoutConfig.Builder.setForceEnableTupleTracking method, e.g.
 ```java
 KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .builder(String bootstrapServers, String ... topics)
   .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
-  .setForceEnableTupleTracking(true)
+  .setTupleTrackingEnforced(true)
 ```
 
-Note that this setting has no effect in at-least-once mode, where tuple tracking is always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/ac16fe1e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 5022862..0a4d788 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -78,17 +78,17 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient KafkaSpoutRetryService retryService;
     // Handles tuple events (emit, ack etc.)
     private transient KafkaTupleListener tupleListener;
-    // timer == null for modes other than at-least-once
+    // timer == null if processing guarantee is none or at-most-once
     private transient Timer commitTimer;
     // Flag indicating that the spout is still undergoing initialization process.
     private transient boolean initialized;
     // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
     // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
-    //or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once mode.
+    // or after a consumer rebalance, or during close/deactivate. Always empty if processing guarantee is none or at-most-once.
     private transient Map<TopicPartition, OffsetManager> offsetManagers;
 
     // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed.
-    // Always empty if not using at-least-once mode.
+    // Always empty if processing guarantee is none or at-most-once
     private transient Set<KafkaSpoutMessageId> emitted;
 
     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
     private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
@@ -124,11 +124,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         retryService = kafkaSpoutConfig.getRetryService();
         tupleListener = kafkaSpoutConfig.getTupleListener();
 
-        tupleListener.open(conf, context);
-
-        if (isAtLeastOnce()) {
-            // Only used if the spout commits offsets for acked tuples
+        if (isAtLeastOnceProcessing()) {
+            // Only used if the spout should commit an offset to Kafka only after the corresponding tuple has been acked.
             commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
         refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
@@ -137,10 +135,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         emitted = new HashSet<>();
         waitingToEmit = Collections.emptyListIterator();
 
+        tupleListener.open(conf, context);
+
         LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
     }
 
-    private boolean isAtLeastOnce() {
+    private boolean isAtLeastOnceProcessing() {
         return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
     }
 
@@ -154,7 +154,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
                 kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
             previousAssignment = partitions;
-            if (isAtLeastOnce() && initialized) {
+            if (isAtLeastOnceProcessing() && initialized) {
                 initialized = false;
                 commitOffsetsForAckedTuples();
             }
@@ -165,13 +165,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
                 context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
-            initialize(partitions);
             tupleListener.onPartitionsReassigned(partitions);
+            initialize(partitions);
         }
 
         private void initialize(Collection<TopicPartition> partitions) {
-            if (isAtLeastOnce()) {
-                offsetManagers.keySet().retainAll(partitions);    // remove from acked all partitions that are no longer assigned to this spout
+            if (isAtLeastOnceProcessing()) {
+                // remove from acked all partitions that are no longer assigned to this spout
+                offsetManagers.keySet().retainAll(partitions);
                 retryService.retainAll(partitions);
 
                 /*
@@ -193,7 +194,7 @@
             final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
             final long fetchOffset = doSeek(tp, committedOffset);
             // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
-            if (isAtLeastOnce() && !offsetManagers.containsKey(tp)) {
+            if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(tp)) {
                 offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
             }
         }
@@ -260,26 +261,27 @@
     }
 
     private boolean commit() {
-        return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+        return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
     }
 
     private boolean poll() {
         final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
         final int readyMessageCount = retryService.readyMessageCount();
         final boolean poll = !waitingToEmit()
-            //Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
-            //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis,
-            //and prevents locking up the spout when there are too many retriable tuples
-            && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
-            || !isAtLeastOnce());
+            // Check that the number of uncommitted, non-retriable tuples is less than the maxUncommittedOffsets limit.
+            // Accounting for retriable tuples in this way still guarantees that the limit is followed on a per partition basis,
+            // and prevents locking up the spout when there are too many retriable tuples
+            && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets || !isAtLeastOnceProcessing());
 
         if (!poll) {
             if (waitingToEmit()) {
-                LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
+                LOG.debug("Not polling. Tuples waiting to be emitted."
+                    + " [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
             }
 
-            if (numUncommittedOffsets >= maxUncommittedOffsets && isAtLeastOnce()) {
-                LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", numUncommittedOffsets, maxUncommittedOffsets);
+            if (numUncommittedOffsets >= maxUncommittedOffsets && isAtLeastOnceProcessing()) {
+                LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]",
+                    numUncommittedOffsets, maxUncommittedOffsets);
             }
         }
         return poll;
@@ -289,8 +291,8 @@
         return waitingToEmit != null && waitingToEmit.hasNext();
     }
 
-    public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
-        List<ConsumerRecord<K,V>> waitingToEmitList = new LinkedList<>();
+    private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
+        List<ConsumerRecord<K, V>> waitingToEmitList = new LinkedList<>();
         for (TopicPartition tp : consumerRecords.partitions()) {
             waitingToEmitList.addAll(consumerRecords.records(tp));
         }
@@ -305,7 +307,8 @@
         }
         final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
         final int numPolledRecords = consumerRecords.count();
-        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets);
+        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions",
+            numPolledRecords, numUncommittedOffsets);
         if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
             //Commit polled records immediately to ensure delivery is at-most-once.
             kafkaConsumer.commitSync();
@@ -330,7 +333,7 @@
     }
 
     /**
-     * Creates a tuple from the kafka record and emits it if it was not yet emitted
+     * Creates a tuple from the kafka record and emits it if it was not yet emitted.
      *
      * @param record to be emitted
     * @return true if tuple was emitted. False if tuple has been acked or has been emitted and is pending ack or fail
@@ -338,22 +341,25 @@
     private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
         final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
+
         if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
-        } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
+        } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
         } else {
-            Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp),
-                "The spout is about to emit a message that has already been committed."
-                + " This should never occur, and indicates a bug in the spout");
+            if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
+                throw new IllegalStateException("Attempting to emit a message that has already been committed.");
+            }
+
             final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
             if (isEmitTuple(tuple)) {
                 final boolean isScheduled = retryService.isScheduled(msgId);
                 // not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried
                 if (!isScheduled || retryService.isReady(msgId)) {
-                    String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
-                    if (!isAtLeastOnce()) {
-                        if (kafkaSpoutConfig.getForceEnableTupleTracking()) {
+                    final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
+
+                    if (!isAtLeastOnceProcessing()) {
+                        if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
                             collector.emit(stream, tuple, msgId);
                             LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                         } else {
@@ -369,7 +375,7 @@
                             numUncommittedOffsets++;
                         }
                         collector.emit(stream, tuple, msgId);
-                        tupleListener.onEmit(tuple, msgId); 
+                        tupleListener.onEmit(tuple, msgId);
                         LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                     }
                     return true;
@@ -384,7 +390,7 @@
     }
 
     /**
-     * Emits a tuple if it is not a null tuple, or if the spout is configured to emit null tuples
+     * Emits a tuple if it is not a null tuple, or if the spout is configured to emit null tuples.
      */
     private boolean isEmitTuple(List<Object> tuple) {
         return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
@@ -440,11 +446,12 @@
     // ======== Ack =======
     @Override
     public void ack(Object messageId) {
-        if (!isAtLeastOnce()) {
-            // Only need to keep track of acked tuples if commits are done based on acks
+        if (!isAtLeastOnceProcessing()) {
             return;
         }
 
+        // Only need to keep track of acked tuples if commits to Kafka are controlled by
+        // tuple acks, which happens only for at-least-once processing semantics
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
             if (msgId.isEmitted()) {
@@ -466,19 +473,22 @@
     // ======== Fail =======
     @Override
     public void fail(Object messageId) {
-        if (!isAtLeastOnce()) {
-            // Only need to keep track of failed tuples if commits are done based on acks
+        if (!isAtLeastOnceProcessing()) {
             return;
         }
-
+        // Only need to keep track of failed tuples if commits to Kafka are controlled by
+        // tuple acks, which happens only for at-least-once processing semantics
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
-            LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+            LOG.debug("Received fail for tuple this spout is no longer tracking."
+                + " Partitions may have been reassigned. Ignoring message [{}]", msgId);
             return;
         }
+
         Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed."
             + " This should never occur barring errors in the RetryService implementation or the spout code.");
+
         msgId.incrementNumFails();
+
         if (!retryService.schedule(msgId)) {
             LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
             // this tuple should be removed from emitted only inside the ack() method. This is to ensure
@@ -527,7 +537,7 @@
 
     private void shutdown() {
         try {
-            if (isAtLeastOnce()) {
+            if (isAtLeastOnceProcessing()) {
                 commitOffsetsForAckedTuples();
             }
         } finally {

http://git-wip-us.apache.org/repos/asf/storm/blob/ac16fe1e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index b7de812..758979c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -56,10 +56,13 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
     // 2s
     public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
+
     public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+
     public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
         new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
             DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+
     public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = ProcessingGuarantee.AT_LEAST_ONCE;
 
     public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener();
@@ -83,7 +86,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     private final SerializableDeserializer<V> valueDes;
     private final Class<? extends Deserializer<V>> valueDesClazz;
     private final ProcessingGuarantee processingGuarantee;
-    private final boolean forceEnableTupleTracking;
+    private final boolean tupleTrackingEnforced;
 
     /**
      * Creates a new KafkaSpoutConfig using a Builder.
@@ -108,7 +111,7 @@
         this.valueDes = builder.valueDes;
         this.valueDesClazz = builder.valueDesClazz;
         this.processingGuarantee = builder.processingGuarantee;
-        this.forceEnableTupleTracking = builder.forceEnableTupleTracking;
+        this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
     }
 
     /**
@@ -135,22 +138,29 @@
     }
 
     /**
-     * The processing guarantee supported by the spout. This parameter affects when the spout commits offsets to Kafka, marking them as
-     * processed.
+     * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
+     * i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE the spout controls when
+     * the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
      *
      * <ul>
-     * <li>AT_LEAST_ONCE means that the Kafka spout considers an offset ready for commit once a tuple corresponding to that offset has been
-     * acked on the spout. This corresponds to an at-least-once guarantee.</li>
-     * <li>ANY_TIMES means that the Kafka spout may commit polled offsets at any time. This means the message may be processed any number of
-     * times (including 0), and causes the spout to enable auto offset committing on the underlying consumer.</li>
-     * <li>AT_MOST_ONCE means that the spout will commit polled offsets before emitting them to the topology. This guarantees at-most-once
-     * processing.</li>
+     * <li>AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
+     * and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
+     * the ack gets lost.</li>
+     * <br/>
+     * <li>AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
+     * to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
+     * won't retry tuples that fail or timeout after the commit to Kafka has been done.</li>
+     * <br/>
+     * <li>NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
+     * "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
+     * it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
+     * This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.</li>
      * </ul>
     */
-    public static enum ProcessingGuarantee {
+    public enum ProcessingGuarantee {
         AT_LEAST_ONCE,
-        ANY_TIMES,
-        AT_MOST_ONCE
+        AT_MOST_ONCE,
+        NONE,
     }
 
     public static class Builder<K, V> {
@@ -171,7 +181,7 @@
         private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
         private boolean emitNullTuples = false;
         private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
-        private boolean forceEnableTupleTracking = false;
+        private boolean tupleTrackingEnforced = false;
 
         public Builder(String bootstrapServers, String... topics) {
             this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null,
                 new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
@@ -649,10 +659,10 @@
          * {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} to have an effect, and enables some spout metrics (e.g. complete-latency) that would
          * otherwise be disabled.
          *
-         * @param forceEnableTupleTracking true if Storm should track emitted tuples, false otherwise
+         * @param tupleTrackingEnforced true if Storm should track emitted tuples, false otherwise
          */
-        public Builder<K, V> setForceEnableTupleTracking(boolean forceEnableTupleTracking) {
-            this.forceEnableTupleTracking = forceEnableTupleTracking;
+        public Builder<K, V> setTupleTrackingEnforced(boolean tupleTrackingEnforced) {
+            this.tupleTrackingEnforced = tupleTrackingEnforced;
             return this;
         }
 
@@ -706,14 +716,15 @@
                 + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee."
                 + " This will be treated as an error in the next major release."
+ " For now the spout will be configured to behave like it would have in pre-1.2.0 releases."); - boolean enableAutoCommit = (boolean)builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); + + final boolean enableAutoCommit = (boolean)builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); if(enableAutoCommit) { - builder.processingGuarantee = ProcessingGuarantee.ANY_TIMES; + builder.processingGuarantee = ProcessingGuarantee.NONE; } else { builder.processingGuarantee = ProcessingGuarantee.AT_LEAST_ONCE; } } - if (builder.processingGuarantee == ProcessingGuarantee.ANY_TIMES) { + if (builder.processingGuarantee == ProcessingGuarantee.NONE) { builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); } else { builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); @@ -788,8 +799,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return processingGuarantee; } - public boolean getForceEnableTupleTracking() { - return forceEnableTupleTracking; + public boolean isTupleTrackingEnforced() { + return tupleTrackingEnforced; } public String getConsumerGroupId() { http://git-wip-us.apache.org/repos/asf/storm/blob/ac16fe1e/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java index 1f23cc5..8613ce7 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -111,7 +111,7 @@ public class KafkaSpoutMessagingGuaranteeTest { public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws Exception { //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) - .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE) .build(); doTestModeDisregardsMaxUncommittedOffsets(spoutConfig); } @@ -146,7 +146,7 @@ public class KafkaSpoutMessagingGuaranteeTest { //When tuple tracking is enabled, the spout must not replay tuples in at-most-once mode KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) - .setForceEnableTupleTracking(true) + .setTupleTrackingEnforced(true) .build(); doTestModeCannotReplayTuples(spoutConfig); } @@ -155,8 +155,8 @@ public class KafkaSpoutMessagingGuaranteeTest { public void testAnyTimesModeCannotReplayTuples() throws Exception { //When tuple tracking is enabled, the spout must not replay tuples in any-times mode KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) - .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES) - .setForceEnableTupleTracking(true) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE) + .setTupleTrackingEnforced(true) .build(); doTestModeCannotReplayTuples(spoutConfig); } @@ -189,7 +189,7 @@ public class KafkaSpoutMessagingGuaranteeTest { //When tuple tracking is enabled, the spout must not 
commit acked tuples in at-most-once mode because they were committed before being emitted KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) - .setForceEnableTupleTracking(true) + .setTupleTrackingEnforced(true) .build(); doTestModeDoesNotCommitAckedTuples(spoutConfig); } @@ -198,8 +198,8 @@ public class KafkaSpoutMessagingGuaranteeTest { public void testAnyTimesModeDoesNotCommitAckedTuples() throws Exception { //When tuple tracking is enabled, the spout must not commit acked tuples in any-times mode because committing is managed by the consumer KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) - .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES) - .setForceEnableTupleTracking(true) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE) + .setTupleTrackingEnforced(true) .build(); doTestModeDoesNotCommitAckedTuples(spoutConfig); }
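
For reference, a minimal sketch of how a topology might configure the spout after this change. Only the `ProcessingGuarantee` values and the builder calls (`setProcessingGuarantee`, `setTupleTrackingEnforced`) come from the diff above; the bootstrap server, topic name, class name, and topology wiring are illustrative placeholders, not part of the commit.

```java
import org.apache.storm.Config;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
import org.apache.storm.topology.TopologyBuilder;

public class ProcessingGuaranteeExample {
    public static void main(String[] args) {
        // AT_MOST_ONCE: offsets are committed right after the poll, before the
        // tuples are emitted, so tuples that fail or time out are not retried.
        KafkaSpoutConfig<String, String> atMostOnceConf = KafkaSpoutConfig
            .builder("localhost:9092", "my-topic")   // placeholder broker and topic
            .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
            // Renamed from setForceEnableTupleTracking by this commit. Tracking is
            // optional outside AT_LEAST_ONCE, but complete-latency metrics and
            // Config.TOPOLOGY_MAX_SPOUT_PENDING only work when it is enforced.
            .setTupleTrackingEnforced(true)
            .build();

        // NONE: the Kafka consumer's auto-commit drives offset commits. Per the
        // Builder changes above, build() sets enable.auto.commit=true for this mode.
        KafkaSpoutConfig<String, String> noneConf = KafkaSpoutConfig
            .builder("localhost:9092", "my-topic")
            .setProcessingGuarantee(ProcessingGuarantee.NONE)
            .build();

        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("kafka-spout", new KafkaSpout<>(atMostOnceConf), 1);

        Config conf = new Config();
        // Backpressure only takes effect because tuple tracking is enforced above.
        conf.setMaxSpoutPending(250);
    }
}
```

Under AT_LEAST_ONCE the `setTupleTrackingEnforced` call would be a no-op, since tracking is required and always enabled in that mode.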