Repository: storm
Updated Branches:
  refs/heads/master 5077228df -> 29f845b4a

STORM-2781: Refactor storm-kafka-client KafkaSpout Processing Guarantees
 - Define processing guarantees as AT_LEAST_ONCE, AT_MOST_ONCE, NONE
 - Refactor method name from setForceEnableTupleTracking to setTupleTrackingEnforced
 - Throw IllegalStateException instead of IllegalArgumentException if spout attempts to emit an already committed message
 - Update documentation to reflect these changes

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/87576118
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/87576118
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/87576118

Branch: refs/heads/master
Commit: 875761189ca387f0575aeaea33a769d70d36ae47
Parents: 7b940ae
Author: Hugo Louro <[email protected]>
Authored: Sun Oct 22 17:44:54 2017 -0700
Committer: Hugo Louro <[email protected]>
Committed: Fri Oct 27 15:42:18 2017 -0700

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      | 37 +++++++++---
 .../apache/storm/kafka/spout/KafkaSpout.java    | 63 +++++++++++---------
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 50 +++++++++-------
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 14 ++---
 4 files changed, 100 insertions(+), 64 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/87576118/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index a24c632..0354a0d 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -266,7 +266,7 @@ use Kafka-clients 0.10.0.0, you would use the following dependency in your `pom.
 You can also override the kafka clients version while building from maven, with parameter `storm.kafka.client.version` e.g.
 `mvn clean install -Dstorm.kafka.client.version=0.10.0.0`
 
-When selecting a kafka client version, you should ensure - 
+When selecting a kafka client version, you should ensure -
 1. kafka api is compatible. storm-kafka-client module only supports **0.10 or newer** kafka client API. For older versions,
 you can use storm-kafka module (https://github.com/apache/storm/tree/master/external/storm-kafka).
 2. The kafka client selected by you should be wire compatible with the broker. e.g. 0.9.x client will not work with
@@ -298,25 +298,46 @@ Currently the Kafka spout has the following default values, which have been
 * max.uncommitted.offsets = 10000000 <br/>
 
-# Messaging reliability modes
+# Processing Guarantees
 
-In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
+The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when an offset is committed to Kafka. This is
+conceptually equivalent to marking the tuple with the `ConsumerRecord` for that offset as being successfully processed
+because the tuple won't get re-emitted in case of failure or time out.
+
+For the AT_LEAST_ONCE and AT_MOST_ONCE processing guarantees the spout controls when the commit happens.
+When the guarantee is NONE Kafka controls when the commit happens.
+
+* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
+  and acked. If a tuple fails or times out it will be re-emitted. A tuple can be processed more than once if for instance
+  the ack gets lost.
+
+* AT_MOST_ONCE - Offsets will be committed to Kafka right after being polled but before being emitted to the downstream
+  components of the topology. Offsets are processed at most once because tuples that fail or timeout won't be retried.
+
+* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
+  "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
+  it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
+  This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.
+
+To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows:
 
-To set the processing guarantee, use the KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g.
 ```java
 KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .builder(String bootstrapServers, String ... topics)
   .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
 ```
 
-The spout will disable tuple tracking for emitted tuples by default when you use at-most-once or any-times. In some cases you may want to enable tracking anyway, because tuple tracking is necessary for some features of Storm, e.g. showing complete latency in Storm UI, or enabling backpressure through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter.
+# Tuple Tracking
+
+By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track
+emitted tuples with other processing guarantees to benefit from Storm features such as showing complete latency in the UI,
+or enabling backpressure with Config.TOPOLOGY_MAX_SPOUT_PENDING.
 
-If you need to enable tracking, use the KafkaSpoutConfig.Builder.setForceEnableTupleTracking method, e.g.
 ```java
 KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .builder(String bootstrapServers, String ... topics)
   .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
-  .setForceEnableTupleTracking(true)
+  .setTupleTrackingEnforced(true)
 ```
-Note that this setting has no effect in at-least-once mode, where tuple tracking is always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/87576118/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 9253a2d..170c025 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -78,17 +78,17 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient KafkaSpoutRetryService retryService;
     // Handles tuple events (emit, ack etc.)
     private transient KafkaTupleListener tupleListener;
-    // timer == null for modes other than at-least-once
+    // timer == null if processing guarantee is none or at-most-once
    private transient Timer commitTimer;
     // Flag indicating that the spout is still undergoing initialization process.
     private transient boolean initialized; // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
     // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
-    //or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once mode.
+    // or after a consumer rebalance, or during close/deactivate. Always empty if processing guarantee is none or at-most-once.
     private transient Map<TopicPartition, OffsetManager> offsetManagers;
     // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed.
-    // Always empty if not using at-least-once mode.
+    // Always empty if processing guarantee is none or at-most-once
     private transient Set<KafkaSpoutMessageId> emitted;
     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
     private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
@@ -125,8 +125,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         tupleListener = kafkaSpoutConfig.getTupleListener();
 
-        if (isAtLeastOnce()) {
-            // Only used if the spout commits offsets for acked tuples
+        if (isAtLeastOnceProcessing()) {
+            // Only used if the spout should commit an offset to Kafka only after the corresponding tuple has been acked.
             commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
         refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
@@ -140,7 +140,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
     }
 
-    private boolean isAtLeastOnce() {
+    private boolean isAtLeastOnceProcessing() {
         return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
     }
 
@@ -154,7 +154,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
                 kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
             previousAssignment = partitions;
-            if (isAtLeastOnce() && initialized) {
+            if (isAtLeastOnceProcessing() && initialized) {
                 initialized = false;
                 commitOffsetsForAckedTuples();
             }
@@ -170,7 +170,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
 
         private void initialize(Collection<TopicPartition> partitions) {
-            if (isAtLeastOnce()) {
+            if (isAtLeastOnceProcessing()) {
                 // remove from acked all partitions that are no longer assigned to this spout
                 offsetManagers.keySet().retainAll(partitions);
                 retryService.retainAll(partitions);
@@ -188,7 +188,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
             final long fetchOffset = doSeek(tp, committedOffset);
             // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
-            if (isAtLeastOnce() && !offsetManagers.containsKey(tp)) {
+            if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(tp)) {
                 offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
             }
         }
@@ -255,18 +255,17 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private boolean commit() {
-        return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+        return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
     }
 
     private boolean poll() {
         final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
         final int readyMessageCount = retryService.readyMessageCount();
         final boolean poll = !waitingToEmit()
-            //Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
-            //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis,
-            //and prevents locking up the spout when there are too many retriable tuples
-            && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
-            || !isAtLeastOnce());
+            // Check that the number of uncommitted, non-retriable tuples is less than the maxUncommittedOffsets limit.
+            // Accounting for retriable tuples in this way still guarantees that the limit is followed on a per partition basis,
+            // and prevents locking up the spout when there are too many retriable tuples
+            && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets || !isAtLeastOnceProcessing());
 
         if (!poll) {
             if (waitingToEmit()) {
@@ -274,7 +273,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     + " [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
             }
 
-            if (numUncommittedOffsets >= maxUncommittedOffsets && isAtLeastOnce()) {
+            if (numUncommittedOffsets >= maxUncommittedOffsets && isAtLeastOnceProcessing()) {
                 LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]",
                     numUncommittedOffsets, maxUncommittedOffsets);
             }
@@ -336,22 +335,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
         final KafkaSpoutMessageId msgId = retryService.getMessageId(tp, record.offset());
+
         if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
-        } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
+        } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
         } else {
-            Validate.isTrue(kafkaConsumer.committed(tp) == null || kafkaConsumer.committed(tp).offset() < kafkaConsumer.position(tp),
-                "The spout is about to emit a message that has already been committed."
-                + " This should never occur, and indicates a bug in the spout");
+            if (kafkaConsumer.committed(tp) != null && (kafkaConsumer.committed(tp).offset() >= kafkaConsumer.position(tp))) {
+                throw new IllegalStateException("Attempting to emit a message that has already been committed.");
+            }
+
             final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
             if (isEmitTuple(tuple)) {
                 final boolean isScheduled = retryService.isScheduled(msgId);
                 // not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried
                 if (!isScheduled || retryService.isReady(msgId)) {
-                    String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
-                    if (!isAtLeastOnce()) {
-                        if (kafkaSpoutConfig.getForceEnableTupleTracking()) {
+                    final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
+
+                    if (!isAtLeastOnceProcessing()) {
+                        if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
                             collector.emit(stream, tuple, msgId);
                             LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                         } else {
@@ -438,11 +440,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // ======== Ack =======
     @Override
     public void ack(Object messageId) {
-        if (!isAtLeastOnce()) {
-            // Only need to keep track of acked tuples if commits are done based on acks
+        if (!isAtLeastOnceProcessing()) {
             return;
         }
 
+        // Only need to keep track of acked tuples if commits to Kafka are controlled by
+        // tuple acks, which happens only for at-least-once processing semantics
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
             if (msgId.isEmitted()) {
@@ -464,11 +467,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // ======== Fail =======
     @Override
     public void fail(Object messageId) {
-        if (!isAtLeastOnce()) {
-            // Only need to keep track of failed tuples if commits are done based on acks
+        if (!isAtLeastOnceProcessing()) {
             return;
         }
-
+        // Only need to keep track of failed tuples if commits to Kafka are controlled by
+        // tuple acks, which happens only for at-least-once processing semantics
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
             LOG.debug("Received fail for tuple this spout is no longer tracking."
@@ -477,7 +480,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
         Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed."
             + " This should never occur barring errors in the RetryService implementation or the spout code.");
+
         msgId.incrementNumFails();
+
         if (!retryService.schedule(msgId)) {
             LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
             // this tuple should be removed from emitted only inside the ack() method. This is to ensure
@@ -526,7 +531,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void shutdown() {
         try {
-            if (isAtLeastOnce()) {
+            if (isAtLeastOnceProcessing()) {
                 commitOffsetsForAckedTuples();
             }
         } finally {

http://git-wip-us.apache.org/repos/asf/storm/blob/87576118/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index f211697..6a693fe 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -55,10 +55,13 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
     // 2s
     public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
+
     public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+
     public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0),
         TimeInterval.milliSeconds(2), DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+
     public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = ProcessingGuarantee.AT_LEAST_ONCE;
     public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener();
@@ -78,7 +81,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     private final long partitionRefreshPeriodMs;
     private final boolean emitNullTuples;
     private final ProcessingGuarantee processingGuarantee;
-    private final boolean forceEnableTupleTracking;
+    private final boolean tupleTrackingEnforced;
 
     /**
      * Creates a new KafkaSpoutConfig using a Builder.
@@ -99,7 +102,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
         this.emitNullTuples = builder.emitNullTuples;
         this.processingGuarantee = builder.processingGuarantee;
-        this.forceEnableTupleTracking = builder.forceEnableTupleTracking;
+        this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
     }
 
     /**
@@ -126,22 +129,29 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     }
 
     /**
-     * The processing guarantee supported by the spout. This parameter affects when the spout commits offsets to Kafka, marking them as
-     * processed.
+     * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
+     * i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE the spout controls when
+     * the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
     *
     * <ul>
-     * <li>AT_LEAST_ONCE means that the Kafka spout considers an offset ready for commit once a tuple corresponding to that offset has been
-     * acked on the spout. This corresponds to an at-least-once guarantee.</li>
-     * <li>ANY_TIMES means that the Kafka spout may commit polled offsets at any time. This means the message may be processed any number of
-     * times (including 0), and causes the spout to enable auto offset committing on the underlying consumer.</li>
-     * <li>AT_MOST_ONCE means that the spout will commit polled offsets before emitting them to the topology. This guarantees at-most-once
-     * processing.</li>
+     * <li>AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
+     * and acked. If a tuple fails or times out it will be re-emitted. A tuple can be processed more than once if for instance
+     * the ack gets lost.</li>
+     * <br/>
+     * <li>AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
+     * to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
+     * won't retry tuples that fail or timeout after the commit to Kafka has been done.</li>
+     * <br/>
+     * <li>NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
+     * "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
+     * it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
+     * This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.</li>
     * </ul>
     */
-    public static enum ProcessingGuarantee {
+    public enum ProcessingGuarantee {
         AT_LEAST_ONCE,
-        ANY_TIMES,
-        AT_MOST_ONCE
+        AT_MOST_ONCE,
+        NONE,
     }
 
     public static class Builder<K, V> {
@@ -158,7 +168,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
         private boolean emitNullTuples = false;
         private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
-        private boolean forceEnableTupleTracking = false;
+        private boolean tupleTrackingEnforced = false;
 
         public Builder(String bootstrapServers, String... topics) {
             this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
@@ -369,10 +379,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         * {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} to have an effect, and enables some spout metrics (e.g. complete-latency) that would
         * otherwise be disabled.
         *
-        * @param forceEnableTupleTracking true if Storm should track emitted tuples, false otherwise
+        * @param tupleTrackingEnforced true if Storm should track emitted tuples, false otherwise
         */
-        public Builder<K, V> setForceEnableTupleTracking(boolean forceEnableTupleTracking) {
-            this.forceEnableTupleTracking = forceEnableTupleTracking;
+        public Builder<K, V> setTupleTrackingEnforced(boolean tupleTrackingEnforced) {
+            this.tupleTrackingEnforced = tupleTrackingEnforced;
             return this;
         }
 
@@ -425,7 +435,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
                 + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
         }
-        if (builder.processingGuarantee == ProcessingGuarantee.ANY_TIMES) {
+        if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
             builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         } else {
             builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -461,8 +471,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return processingGuarantee;
     }
 
-    public boolean getForceEnableTupleTracking() {
-        return forceEnableTupleTracking;
+    public boolean isTupleTrackingEnforced() {
+        return tupleTrackingEnforced;
     }
 
     public String getConsumerGroupId() {

http://git-wip-us.apache.org/repos/asf/storm/blob/87576118/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index cc24261..07ee2dc 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -111,7 +111,7 @@ public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws Exception {
         //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
             .build();
         doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
     }
@@ -146,7 +146,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
         //When tuple tracking is enabled, the spout must not replay tuples in at-most-once mode
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
-            .setForceEnableTupleTracking(true)
+            .setTupleTrackingEnforced(true)
             .build();
         doTestModeCannotReplayTuples(spoutConfig);
     }
@@ -155,8 +155,8 @@ public class KafkaSpoutMessagingGuaranteeTest {
     public void testAnyTimesModeCannotReplayTuples() throws Exception {
         //When tuple tracking is enabled, the spout must not replay tuples in any-times mode
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES)
-            .setForceEnableTupleTracking(true)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setTupleTrackingEnforced(true)
             .build();
         doTestModeCannotReplayTuples(spoutConfig);
     }
@@ -189,7 +189,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
         //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
-            .setForceEnableTupleTracking(true)
+            .setTupleTrackingEnforced(true)
             .build();
         doTestModeDoesNotCommitAckedTuples(spoutConfig);
     }
@@ -198,8 +198,8 @@ public class KafkaSpoutMessagingGuaranteeTest {
     public void testAnyTimesModeDoesNotCommitAckedTuples() throws Exception {
         //When tuple tracking is enabled, the spout must not commit acked tuples in any-times mode because committing is managed by the consumer
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES)
-            .setForceEnableTupleTracking(true)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setTupleTrackingEnforced(true)
             .build();
         doTestModeDoesNotCommitAckedTuples(spoutConfig);
     }
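Taken together, the three guarantees in this patch reduce to two decisions: whether `enable.auto.commit` is turned on (only for NONE, as the `KafkaSpoutConfig` builder now enforces) and whether emitted tuples are tracked (always for AT_LEAST_ONCE, opt-in via `setTupleTrackingEnforced(true)` otherwise). A minimal self-contained sketch of that decision table follows; `GuaranteeSketch` is a hypothetical illustration, not a class in storm-kafka-client:

```java
// Hypothetical sketch of the STORM-2781 mode logic; not actual Storm code.
public class GuaranteeSketch {

    enum ProcessingGuarantee { AT_LEAST_ONCE, AT_MOST_ONCE, NONE }

    // NONE is the only mode that delegates committing to the Kafka consumer,
    // so it is the only mode for which enable.auto.commit is set to true.
    static boolean autoCommitEnabled(ProcessingGuarantee guarantee) {
        return guarantee == ProcessingGuarantee.NONE;
    }

    // Tuple tracking is mandatory for AT_LEAST_ONCE because acks drive commits;
    // for the other two modes it is opt-in via setTupleTrackingEnforced(true).
    static boolean tuplesTracked(ProcessingGuarantee guarantee, boolean tupleTrackingEnforced) {
        return guarantee == ProcessingGuarantee.AT_LEAST_ONCE || tupleTrackingEnforced;
    }

    public static void main(String[] args) {
        System.out.println(autoCommitEnabled(ProcessingGuarantee.NONE));             // true
        System.out.println(autoCommitEnabled(ProcessingGuarantee.AT_MOST_ONCE));     // false
        System.out.println(tuplesTracked(ProcessingGuarantee.AT_LEAST_ONCE, false)); // true
        System.out.println(tuplesTracked(ProcessingGuarantee.AT_MOST_ONCE, true));   // true
    }
}
```

This framing also motivates the exception change in `emitTupleIfNotEmitted`: when the spout finds the committed offset at or ahead of the consumer position while emitting, the spout's own state (not a caller-supplied argument) is inconsistent, hence IllegalStateException rather than IllegalArgumentException.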
