Repository: storm Updated Branches: refs/heads/1.x-branch fca692da3 -> cb2d7e8be
STORM-2648/STORM-2357: Add storm-kafka-client support for at-most-once processing and a toggle for whether messages should be emitted with a message id when not using at-least-once * Minor refactor of emit statements * Add tests for at-most-once and any-times mode, deduplicate some test code in other tests * Fix rebase conflicts and fix leaking state through unit test retry service * Update storm-kafka-client doc Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11a7a157 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11a7a157 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11a7a157 Branch: refs/heads/1.x-branch Commit: 11a7a15746010e0baaa5a4db70374dff1d4ab800 Parents: fca692d Author: Stig Rohde Døssing <[email protected]> Authored: Mon Jul 31 20:26:55 2017 +0200 Committer: Stig Rohde Døssing <[email protected]> Committed: Mon Oct 2 18:08:12 2017 +0200 ---------------------------------------------------------------------- docs/storm-kafka-client.md | 31 ++- .../apache/storm/kafka/spout/KafkaSpout.java | 175 +++++++++------- .../storm/kafka/spout/KafkaSpoutConfig.java | 99 ++++++++- .../storm/kafka/spout/KafkaSpoutCommitTest.java | 5 +- .../storm/kafka/spout/KafkaSpoutEmitTest.java | 8 +- .../spout/KafkaSpoutMessagingGuaranteeTest.java | 207 +++++++++++++++++++ .../kafka/spout/KafkaSpoutRebalanceTest.java | 8 +- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 5 +- .../kafka/spout/MaxUncommittedOffsetTest.java | 32 +-- .../kafka/spout/SingleTopicKafkaSpoutTest.java | 47 ++--- .../spout/SingleTopicKafkaUnitSetupHelper.java | 67 ++++++ .../SingleTopicKafkaSpoutConfiguration.java | 23 +-- 12 files changed, 522 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/docs/storm-kafka-client.md ---------------------------------------------------------------------- diff --git 
a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md index 93d622e..841e8ef 100644 --- a/docs/storm-kafka-client.md +++ b/docs/storm-kafka-client.md @@ -341,33 +341,32 @@ Depending on the structure of your Kafka cluster, distribution of the data, and ### Default values -Currently the Kafka spout has has the following default values, which have shown to give good performance in the test environment as described in this [blog post] (https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/) +Currently the Kafka spout has the following default values, which have been shown to give good performance in the test environment as described in this [blog post](https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/) * poll.timeout.ms = 200 * offset.commit.period.ms = 30000 (30s) * max.uncommitted.offsets = 10000000 <br/> -# Kafka AutoCommitMode +# Messaging reliability modes -If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations --, and want to remove the overhead of tuple tracking, then you can run a KafkaSpout with AutoCommitMode. - -To enable it, you need to: - -* set Config.TOPOLOGY_ACKERS to 0; -* enable *AutoCommitMode* in Kafka consumer configuration; - -Here's one example to set AutoCommitMode in KafkaSpout: +In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed. +To set the processing guarantee, use the KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g. ```java KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig - .builder(String bootstrapServers, String ... 
topics) - .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") - .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST) - .build(); + .builder(String bootstrapServers, String ... topics) + .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE) ``` -*Note that it's not exactly At-Most-Once in Storm, as offset is committed periodically by Kafka consumer, some tuples could be replayed when KafkaSpout is crashed.* - +The spout will disable tuple tracking for emitted tuples by default when you use at-most-once or any-times. In some cases you may want to enable tracking anyway, because tuple tracking is necessary for some features of Storm, e.g. showing complete latency in Storm UI, or enabling backpressure through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter. +If you need to enable tracking, use the KafkaSpoutConfig.Builder.setForceEnableTupleTracking method, e.g. +```java +KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig + .builder(String bootstrapServers, String ... topics) + .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE) + .setForceEnableTupleTracking(true) +``` +Note that this setting has no effect in at-least-once mode, where tuple tracking is always enabled. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 9f806b5..fbd869c 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -52,10 +52,12 @@ import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaSpout<K, V> extends BaseRichSpout { + private static final long serialVersionUID = 4151921085047987154L; //Initial delay for the commit and subscription refresh timers public static final long TIMER_DELAY_MS = 500; @@ -66,26 +68,36 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Kafka private final KafkaSpoutConfig<K, V> kafkaSpoutConfig; - private KafkaConsumerFactory kafkaConsumerFactory; + private KafkaConsumerFactory<K, V> kafkaConsumerFactory; private transient KafkaConsumer<K, V> kafkaConsumer; - private transient boolean consumerAutoCommitMode; // Bookkeeping - private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation - private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure - private transient KafkaTupleListener tupleListener; // Handles tuple events (emit, ack etc.) 
- private transient Timer commitTimer; // timer == null for auto commit mode - private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. + // Strategy to determine the fetch offset of the first realized by the spout upon activation + private transient FirstPollOffsetStrategy firstPollOffsetStrategy; + // Class that has the logic to handle tuple failure. + private transient KafkaSpoutRetryService retryService; + // Handles tuple events (emit, ack etc.) + private transient KafkaTupleListener tupleListener; + // timer == null for modes other than at-least-once + private transient Timer commitTimer; + // Flag indicating that the spout is still undergoing initialization process. + private transient boolean initialized; // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() - private transient Map<TopicPartition, OffsetManager> offsetManagers;// Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate - private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode - private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() - private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled. - private transient Timer refreshSubscriptionTimer; // Triggers when a subscription should be refreshed + // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires, + //or after a consumer rebalance, or during close/deactivate. 
Always empty if not using at-least-once mode. + private transient Map<TopicPartition, OffsetManager> offsetManagers; + // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. + // Always empty if not using at-least-once mode. + private transient Set<KafkaSpoutMessageId> emitted; + // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() + private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; + // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled. + private transient long numUncommittedOffsets; + // Triggers when a subscription should be refreshed + private transient Timer refreshSubscriptionTimer; private transient TopologyContext context; - public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) { this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<K, V>()); } @@ -107,9 +119,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Offset management firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); - // with AutoCommitMode, offset will be periodically committed in the background by Kafka consumer - consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode(); - + // Retries management retryService = kafkaSpoutConfig.getRetryService(); @@ -117,7 +127,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { tupleListener.open(conf, context); - if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually + if (isAtLeastOnce()) { + // Only used if the spout commits offsets for acked tuples commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); } refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); @@ -129,14 +140,18 @@ public class KafkaSpout<K, V> extends BaseRichSpout { LOG.info("Kafka 
Spout opened with the following configuration: {}", kafkaSpoutConfig); } - // =========== Consumer Rebalance Listener - On the same thread as the caller =========== + private boolean isAtLeastOnce() { + return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE; + } + // =========== Consumer Rebalance Listener - On the same thread as the caller =========== private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { + @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); - if (!consumerAutoCommitMode && initialized) { + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + if (isAtLeastOnce() && initialized) { initialized = false; commitOffsetsForAckedTuples(); } @@ -145,36 +160,39 @@ public class KafkaSpout<K, V> extends BaseRichSpout { @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]", - context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); tupleListener.onPartitionsReassigned(partitions); initialize(partitions); } private void initialize(Collection<TopicPartition> partitions) { - if (!consumerAutoCommitMode) { + if (isAtLeastOnce()) { offsetManagers.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout - } - - retryService.retainAll(partitions); - - /* - * Emitted messages for partitions that are no longer assigned to this spout can't - * be acked and should not be retried, hence remove them from emitted collection. 
- */ - Set<TopicPartition> partitionsSet = new HashSet<>(partitions); - Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator(); - while (msgIdIterator.hasNext()) { - KafkaSpoutMessageId msgId = msgIdIterator.next(); - if (!partitionsSet.contains(msgId.getTopicPartition())) { - msgIdIterator.remove(); + retryService.retainAll(partitions); + + /* + * Emitted messages for partitions that are no longer assigned to this spout can't + * be acked and should not be retried, hence remove them from emitted collection. + */ + Set<TopicPartition> partitionsSet = new HashSet<>(partitions); + Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator(); + while (msgIdIterator.hasNext()) { + KafkaSpoutMessageId msgId = msgIdIterator.next(); + if (!partitionsSet.contains(msgId.getTopicPartition())) { + msgIdIterator.remove(); + } } } for (TopicPartition tp : partitions) { final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); final long fetchOffset = doSeek(tp, committedOffset); - setAcked(tp, fetchOffset); + // Add offset managers for the new partitions. 
+ // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off + if (isAtLeastOnce() && !offsetManagers.containsKey(tp)) { + offsetManagers.put(tp, new OffsetManager(tp, fetchOffset)); + } } initialized = true; LOG.info("Initialization complete"); @@ -209,15 +227,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } } - private void setAcked(TopicPartition tp, long fetchOffset) { - // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off - if (!consumerAutoCommitMode && !offsetManagers.containsKey(tp)) { - offsetManagers.put(tp, new OffsetManager(tp, fetchOffset)); - } - } - // ======== Next Tuple ======= - @Override public void nextTuple() { try { @@ -252,24 +262,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } private boolean commit() { - return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode + return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode } private boolean poll() { final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets(); final int readyMessageCount = retryService.readyMessageCount(); - final boolean poll = !waitingToEmit() && + final boolean poll = !waitingToEmit() //Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit - //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis, and prevents locking up the spout when there are too many retriable tuples - (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets || - consumerAutoCommitMode); - + //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis, + //and prevents locking up the spout when there are too many retriable tuples + && (numUncommittedOffsets - 
readyMessageCount < maxUncommittedOffsets + || !isAtLeastOnce()); + if (!poll) { if (waitingToEmit()) { LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets); } - if (numUncommittedOffsets >= maxUncommittedOffsets && !consumerAutoCommitMode) { + if (numUncommittedOffsets >= maxUncommittedOffsets && isAtLeastOnce()) { LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", numUncommittedOffsets, maxUncommittedOffsets); } } @@ -297,6 +308,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); final int numPolledRecords = consumerRecords.count(); LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets); + if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) { + //Commit polled records immediately to ensure delivery is at-most-once. + kafkaConsumer.commitSync(); + } return consumerRecords; } @@ -335,11 +350,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout { final boolean isScheduled = retryService.isScheduled(msgId); // not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried if (!isScheduled || retryService.isReady(msgId)) { - if (consumerAutoCommitMode) { - if (tuple instanceof KafkaTuple) { - collector.emit(((KafkaTuple) tuple).getStream(), tuple); + String stream = tuple instanceof KafkaTuple ? 
((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID; + if (!isAtLeastOnce()) { + if (kafkaSpoutConfig.getForceEnableTupleTracking()) { + collector.emit(stream, tuple, msgId); + LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId); } else { - collector.emit(tuple); + collector.emit(stream, tuple); + LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); } } else { emitted.add(msgId); @@ -349,15 +367,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } else { //New tuple, hence increment the uncommitted offset counter numUncommittedOffsets++; } - - if (tuple instanceof KafkaTuple) { - collector.emit(((KafkaTuple) tuple).getStream(), tuple, msgId); - } else { - collector.emit(tuple, msgId); - } + collector.emit(stream, tuple, msgId); tupleListener.onEmit(tuple, msgId); + LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId); } - LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId); return true; } } else { @@ -407,31 +420,37 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } // ======== Ack ======= - @Override public void ack(Object messageId) { + if (!isAtLeastOnce()) { + // Only need to keep track of acked tuples if commits are done based on acks + return; + } + final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; if (!emitted.contains(msgId)) { if (msgId.isEmitted()) { - LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " + - "came from a topic-partition that this consumer group instance is no longer tracking " + - "due to rebalance/partition reassignment. No action taken.", msgId); + LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " + + "came from a topic-partition that this consumer group instance is no longer tracking " + + "due to rebalance/partition reassignment. 
No action taken.", msgId); } else { LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId); } } else { - if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically - offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); - } + offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); emitted.remove(msgId); } tupleListener.onAck(msgId); } // ======== Fail ======= - @Override public void fail(Object messageId) { + if (!isAtLeastOnce()) { + // Only need to keep track of failed tuples if commits are done based on acks + return; + } + final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; if (!emitted.contains(msgId)) { LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId); @@ -451,7 +470,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } // ======== Activate / Deactivate / Close / Declare Outputs ======= - @Override public void activate() { try { @@ -487,7 +505,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private void shutdown() { try { - if (!consumerAutoCommitMode) { + if (isAtLeastOnce()) { commitOffsetsForAckedTuples(); } } finally { @@ -506,10 +524,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { @Override public String toString() { - return "KafkaSpout{" + - "offsetManagers =" + offsetManagers + - ", emitted=" + emitted + - "}"; + return "KafkaSpout{" + + "offsetManagers =" + offsetManagers + + ", emitted=" + emitted + + "}"; } @Override @@ -532,6 +550,3 @@ public class KafkaSpout<K, V> extends BaseRichSpout { return kafkaSpoutConfig.getSubscription().getTopicsString(); } } - - - http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 5cad0f4..b7de812 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -33,14 +33,18 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.storm.Config; import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics */ public class KafkaSpoutConfig<K, V> implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class); private static final long serialVersionUID = 141902646130682494L; // 200ms public static final long DEFAULT_POLL_TIMEOUT_MS = 200; @@ -56,6 +60,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)); + public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = ProcessingGuarantee.AT_LEAST_ONCE; public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener(); @@ -77,6 +82,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable { private final Class<? 
extends Deserializer<K>> keyDesClazz; private final SerializableDeserializer<V> valueDes; private final Class<? extends Deserializer<V>> valueDesClazz; + private final ProcessingGuarantee processingGuarantee; + private final boolean forceEnableTupleTracking; /** * Creates a new KafkaSpoutConfig using a Builder. @@ -84,7 +91,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable { * @param builder The Builder to construct the KafkaSpoutConfig from */ public KafkaSpoutConfig(Builder<K, V> builder) { - this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); + setAutoCommitMode(builder); + this.kafkaProps = builder.kafkaProps; this.subscription = builder.subscription; this.translator = builder.translator; this.pollTimeoutMs = builder.pollTimeoutMs; @@ -99,6 +107,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable { this.keyDesClazz = builder.keyDesClazz; this.valueDes = builder.valueDes; this.valueDesClazz = builder.valueDesClazz; + this.processingGuarantee = builder.processingGuarantee; + this.forceEnableTupleTracking = builder.forceEnableTupleTracking; } /** @@ -124,6 +134,25 @@ public class KafkaSpoutConfig<K, V> implements Serializable { UNCOMMITTED_LATEST } + /** + * The processing guarantee supported by the spout. This parameter affects when the spout commits offsets to Kafka, marking them as + * processed. + * + * <ul> + * <li>AT_LEAST_ONCE means that the Kafka spout considers an offset ready for commit once a tuple corresponding to that offset has been + * acked on the spout. This corresponds to an at-least-once guarantee.</li> + * <li>ANY_TIMES means that the Kafka spout may commit polled offsets at any time. This means the message may be processed any number of + * times (including 0), and causes the spout to enable auto offset committing on the underlying consumer.</li> + * <li>AT_MOST_ONCE means that the spout will commit polled offsets before emitting them to the topology. 
This guarantees at-most-once + * processing.</li> + * </ul> + */ + public static enum ProcessingGuarantee { + AT_LEAST_ONCE, + ANY_TIMES, + AT_MOST_ONCE + } + public static class Builder<K, V> { private final Map<String, Object> kafkaProps; @@ -141,6 +170,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable { private KafkaTupleListener tupleListener = DEFAULT_TUPLE_LISTENER; private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; private boolean emitNullTuples = false; + private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE; + private boolean forceEnableTupleTracking = false; public Builder(String bootstrapServers, String... topics) { this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); @@ -482,6 +513,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable { /** * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s. * + * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}. + * * @param offsetCommitPeriodMs time in ms */ public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) { @@ -495,6 +528,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable { * below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. Note that this limit can in some cases be exceeded, * but no partition will exceed this limit by more than maxPollRecords - 1. * + * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}. 
+ * * @param maxUncommittedOffsets max number of records that can be be pending commit */ public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) { @@ -516,6 +551,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable { /** * Sets the retry service for the spout to use. * + * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}. + * * @param retryService the new retry service * @return the builder (this). */ @@ -593,6 +630,32 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return this; } + /** + * Specifies which processing guarantee the spout should offer. Refer to the documentation for {@link ProcessingGuarantee}. + * + * @param processingGuarantee The processing guarantee the spout should offer. + */ + public Builder<K, V> setProcessingGuarantee(ProcessingGuarantee processingGuarantee) { + this.processingGuarantee = processingGuarantee; + return this; + } + + /** + * Specifies whether the spout should require Storm to track emitted tuples when using a {@link ProcessingGuarantee} other than + * {@link ProcessingGuarantee#AT_LEAST_ONCE}. The spout will always track emitted tuples when offering at-least-once guarantees + * regardless of this setting. This setting is false by default. + * + * <p>Enabling tracking can be useful even in cases where reliability is not a concern, because it allows + * {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} to have an effect, and enables some spout metrics (e.g. complete-latency) that would + * otherwise be disabled. 
+ * + * @param forceEnableTupleTracking true if Storm should track emitted tuples, false otherwise + */ + public Builder<K, V> setForceEnableTupleTracking(boolean forceEnableTupleTracking) { + this.forceEnableTupleTracking = forceEnableTupleTracking; + return this; + } + public KafkaSpoutConfig<K, V> build() { return new KafkaSpoutConfig<>(this); } @@ -637,12 +700,24 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return builder; } - private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) { - // set defaults for properties not specified - if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { - kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + private static void setAutoCommitMode(Builder<?, ?> builder) { + if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { + LOG.warn("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually." + + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee." + + " This will be treated as an error in the next major release." + + " For now the spout will be configured to behave like it would have in pre-1.2.0 releases."); + boolean enableAutoCommit = (boolean)builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); + if(enableAutoCommit) { + builder.processingGuarantee = ProcessingGuarantee.ANY_TIMES; + } else { + builder.processingGuarantee = ProcessingGuarantee.AT_LEAST_ONCE; + } + } + if (builder.processingGuarantee == ProcessingGuarantee.ANY_TIMES) { + builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + } else { + builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); } - return kafkaProps; } /** @@ -700,10 +775,22 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return offsetCommitPeriodMs; } + /** + * @deprecated Use {@link #getProcessingGuarantee()} instead. 
+ */ + @Deprecated public boolean isConsumerAutoCommitMode() { return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false || Boolean.valueOf((String) kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); } + + public ProcessingGuarantee getProcessingGuarantee() { + return processingGuarantee; + } + + public boolean getForceEnableTupleTracking() { + return forceEnableTupleTracking; + } public String getConsumerGroupId() { return (String) kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG); http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java index 17ba378..afc9b82 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java @@ -15,7 +15,6 @@ */ package org.apache.storm.kafka.spout; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.*; @@ -41,6 +40,8 @@ import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.MockitoAnnotations; +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + public class KafkaSpoutCommitTest { private final long offsetCommitPeriodMs = 2_000; @@ -57,7 +58,7 @@ public class KafkaSpoutCommitTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - spoutConfig = getKafkaSpoutConfigBuilder(-1) + spoutConfig 
= createKafkaSpoutConfigBuilder(-1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(); consumerMock = mock(KafkaConsumer.class); http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index e8e93b0..3b1ce2d 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -15,8 +15,6 @@ */ package org.apache.storm.kafka.spout; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; @@ -40,18 +38,18 @@ import org.apache.storm.task.TopologyContext; import org.junit.Test; import org.mockito.ArgumentCaptor; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.never; -import java.util.HashSet; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; import org.junit.Before; import org.mockito.InOrder; +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + public class KafkaSpoutEmitTest { private final long offsetCommitPeriodMs = 2_000; @@ -64,7 +62,7 @@ public class KafkaSpoutEmitTest { @Before public void setUp() { - spoutConfig = getKafkaSpoutConfigBuilder(-1) + 
spoutConfig = createKafkaSpoutConfigBuilder(-1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(); consumerMock = mock(KafkaConsumer.class); http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java new file mode 100644 index 0000000..1f23cc5 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -0,0 +1,207 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; + +public class KafkaSpoutMessagingGuaranteeTest { + + private final TopologyContext contextMock = mock(TopologyContext.class); + private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); + private final Map<String, Object> conf = new HashMap<>(); + private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); + private KafkaConsumer<String, String> consumerMock; + + @Before + public void setUp() { + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testAtMostOnceModeCommitsBeforeEmit() throws 
Exception { + //At-most-once mode must commit tuples before they are emitted to the topology to ensure that a spout crash won't cause replays. + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .build(); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1)))); + + spout.nextTuple(); + + //The spout should have emitted the tuple, and must have committed it before emit + InOrder inOrder = inOrder(consumerMock, collectorMock); + inOrder.verify(consumerMock).poll(anyLong()); + inOrder.verify(consumerMock).commitSync(); + inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList()); + } + + private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) { + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets())))) + .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, spoutConfig.getMaxUncommittedOffsets() - 1, spoutConfig.getMaxUncommittedOffsets())))); + + for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() * 2; i++) { + spout.nextTuple(); + } + + verify(consumerMock, times(2)).poll(anyLong()); + verify(collectorMock, 
times(spoutConfig.getMaxUncommittedOffsets() * 2)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList()); + } + + @Test + public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception { + //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .build(); + doTestModeDisregardsMaxUncommittedOffsets(spoutConfig); + } + + @Test + public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws Exception { + //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES) + .build(); + doTestModeDisregardsMaxUncommittedOffsets(spoutConfig); + } + + private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) { + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1)))); + + spout.nextTuple(); + + ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture()); + assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue())); + + spout.fail(msgIdCaptor.getValue()); + + reset(consumerMock); + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + 
SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 1, 1)))); + + spout.nextTuple(); + + //The consumer should not be seeking to retry the failed tuple, it should just be continuing from the current position + verify(consumerMock, never()).seek(eq(partition), anyLong()); + } + + @Test + public void testAtMostOnceModeCannotReplayTuples() throws Exception { + //When tuple tracking is enabled, the spout must not replay tuples in at-most-once mode + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .setForceEnableTupleTracking(true) + .build(); + doTestModeCannotReplayTuples(spoutConfig); + } + + @Test + public void testAnyTimesModeCannotReplayTuples() throws Exception { + //When tuple tracking is enabled, the spout must not replay tuples in any-times mode + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES) + .setForceEnableTupleTracking(true) + .build(); + doTestModeCannotReplayTuples(spoutConfig); + } + + private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig<String, String> spoutConfig) { + try (SimulatedTime time = new SimulatedTime()) { + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1)))); + + spout.nextTuple(); + + ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture()); + assertThat("Should have captured a message id", msgIdCaptor.getValue(), 
not(nullValue())); + + spout.ack(msgIdCaptor.getValue()); + + Time.advanceTime(spoutConfig.getOffsetsCommitPeriodMs()); + + spout.nextTuple(); + + verify(consumerMock, never()).commitSync(any(Map.class)); + } + } + + @Test + public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception { + //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .setForceEnableTupleTracking(true) + .build(); + doTestModeDoesNotCommitAckedTuples(spoutConfig); + } + + @Test + public void testAnyTimesModeDoesNotCommitAckedTuples() throws Exception { + //When tuple tracking is enabled, the spout must not commit acked tuples in any-times mode because committing is managed by the consumer + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES) + .setForceEnableTupleTracking(true) + .build(); + doTestModeDoesNotCommitAckedTuples(spoutConfig); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index 8996190..ab57052 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -15,7 +15,6 @@ */ package org.apache.storm.kafka.spout; -import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.hasKey; import static org.junit.Assert.assertThat; @@ -54,6 +53,8 @@ import org.mockito.Captor; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + public class KafkaSpoutRebalanceTest { @Captor @@ -131,7 +133,7 @@ public class KafkaSpoutRebalanceTest { doNothing() .when(subscriptionMock) .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class)); - KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1) + KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(), consumerFactoryMock); String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; @@ -168,7 +170,7 @@ public class KafkaSpoutRebalanceTest { .when(subscriptionMock) .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class)); KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class); - KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1) + KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1) .setOffsetCommitPeriodMs(10) .setRetry(retryServiceMock) .build(), consumerFactoryMock); http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---------------------------------------------------------------------- diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index 79f7398..39fa42c 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -15,7 +15,6 @@ */ package org.apache.storm.kafka.spout; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.*; @@ -45,6 +44,8 @@ import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.MockitoAnnotations; +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + public class KafkaSpoutRetryLimitTest { private final long offsetCommitPeriodMs = 2_000; @@ -65,7 +66,7 @@ public class KafkaSpoutRetryLimitTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - spoutConfig = getKafkaSpoutConfigBuilder(-1) + spoutConfig = createKafkaSpoutConfigBuilder(-1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .setRetry(ZERO_RETRIES_RETRY_SERVICE) .build(); http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java index ccb2a6c..80a8e1d 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java +++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java @@ -15,14 +15,12 @@ */ package org.apache.storm.kafka.spout; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.everyItem; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; @@ -30,15 +28,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.storm.kafka.KafkaUnitRule; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.spout.SpoutOutputCollector; @@ -50,6 +45,8 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.MockitoAnnotations; +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + public class MaxUncommittedOffsetTest { @Rule @@ -63,7 +60,7 @@ public class MaxUncommittedOffsetTest { private final int maxUncommittedOffsets = 10; private final int maxPollRecords = 5; private final int initialRetryDelaySecs = 60; - private final KafkaSpoutConfig<String, String> spoutConfig = 
getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) + private final KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) .setMaxUncommittedOffsets(maxUncommittedOffsets) @@ -84,30 +81,15 @@ public class MaxUncommittedOffsetTest { this.spout = new KafkaSpout<>(spoutConfig); } - private void populateTopicData(String topicName, int msgCount) throws Exception { - kafkaUnitRule.getKafkaUnit().createTopic(topicName); - - for (int i = 0; i < msgCount; i++) { - ProducerRecord<String, String> producerRecord = new ProducerRecord<>( - topicName, Integer.toString(i), - Integer.toString(i)); - - kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord); - } - } - - private void initializeSpout(int msgCount) throws Exception { - populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); - when(topologyContext.getThisTaskIndex()).thenReturn(0); - when(topologyContext.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0)); - spout.open(conf, topologyContext, collector); - spout.activate(); + private void prepareSpout(int msgCount) throws Exception { + SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); + SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); } private ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) throws Exception { assertThat("The message count is less than maxUncommittedOffsets. This test is not meaningful with this configuration.", messageCount, greaterThanOrEqualTo(maxUncommittedOffsets)); //The spout must respect maxUncommittedOffsets when requesting/emitting tuples - initializeSpout(messageCount); + prepareSpout(messageCount); //Try to emit all messages. 
Ensure only maxUncommittedOffsets are emitted ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index 7759b3c..cbbb391 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -17,10 +17,8 @@ */ package org.apache.storm.kafka.spout; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.storm.kafka.KafkaUnitRule; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.spout.SpoutOutputCollector; @@ -31,12 +29,9 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyObject; @@ -48,9 +43,7 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static 
org.mockito.Mockito.when; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -65,6 +58,8 @@ import org.junit.Before; import org.mockito.Captor; import org.mockito.MockitoAnnotations; +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + public class SingleTopicKafkaSpoutTest { @Rule @@ -77,15 +72,15 @@ public class SingleTopicKafkaSpoutTest { private final Map<String, Object> conf = new HashMap<>(); private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class); private final long commitOffsetPeriodMs = 2_000; + private final int maxRetries = 3; private KafkaConsumer<String, String> consumerSpy; private KafkaConsumerFactory<String, String> consumerFactory; private KafkaSpout<String, String> spout; - private int maxRetries = 3; @Before public void setUp() { MockitoAnnotations.initMocks(this); - KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) @@ -101,25 +96,11 @@ public class SingleTopicKafkaSpoutTest { this.spout = new KafkaSpout<>(spoutConfig, consumerFactory); } - void populateTopicData(String topicName, int msgCount) throws InterruptedException, ExecutionException, TimeoutException { - kafkaUnitRule.getKafkaUnit().createTopic(topicName); - - for (int i = 0; i < msgCount; i++) { - ProducerRecord<String, String> producerRecord = new ProducerRecord<>( - topicName, Integer.toString(i), - Integer.toString(i)); - 
kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord); - } + private void prepareSpout(int messageCount) throws Exception { + SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount); + SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); } - - private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException { - populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); - when(topologyContext.getThisTaskIndex()).thenReturn(0); - when(topologyContext.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0)); - spout.open(conf, topologyContext, collector); - spout.activate(); - } - + /* * Asserts that commitSync has been called once, * that there are only commits on one topic, @@ -137,7 +118,7 @@ public class SingleTopicKafkaSpoutTest { public void shouldContinueWithSlowDoubleAcks() throws Exception { try (SimulatedTime simulatedTime = new SimulatedTime()) { int messageCount = 20; - initializeSpout(messageCount); + prepareSpout(messageCount); //play 1st tuple ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class); @@ -178,7 +159,7 @@ public class SingleTopicKafkaSpoutTest { public void shouldEmitAllMessages() throws Exception { try (SimulatedTime simulatedTime = new SimulatedTime()) { int messageCount = 10; - initializeSpout(messageCount); + prepareSpout(messageCount); //Emit all messages and check that they are emitted. 
Ack the messages too for(int i = 0; i < messageCount; i++) { @@ -206,7 +187,7 @@ public class SingleTopicKafkaSpoutTest { public void shouldReplayInOrderFailedMessages() throws Exception { try (SimulatedTime simulatedTime = new SimulatedTime()) { int messageCount = 10; - initializeSpout(messageCount); + prepareSpout(messageCount); //play and ack 1 tuple ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class); @@ -249,7 +230,7 @@ public class SingleTopicKafkaSpoutTest { public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception { try (SimulatedTime simulatedTime = new SimulatedTime()) { int messageCount = 10; - initializeSpout(messageCount); + prepareSpout(messageCount); //play 1st tuple ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class); @@ -296,7 +277,7 @@ public class SingleTopicKafkaSpoutTest { //The spout must reemit retriable tuples, even if they fail out of order. //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries. 
int messageCount = 10; - initializeSpout(messageCount); + prepareSpout(messageCount); //play all tuples for (int i = 0; i < messageCount; i++) { @@ -329,7 +310,7 @@ public class SingleTopicKafkaSpoutTest { public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception { //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted int messageCount = 1; - initializeSpout(messageCount); + prepareSpout(messageCount); //Emit and fail the same tuple until we've reached retry limit for (int i = 0; i <= maxRetries; i++) { http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java new file mode 100644 index 0000000..81fe362 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java @@ -0,0 +1,67 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.Map; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.storm.kafka.KafkaUnit; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; + +public class SingleTopicKafkaUnitSetupHelper { + + /** + * Using the given KafkaUnit instance, put some messages in the specified topic. + * + * @param kafkaUnit The KafkaUnit instance to use + * @param topicName The topic to produce messages for + * @param msgCount The number of messages to produce + */ + public static void populateTopicData(KafkaUnit kafkaUnit, String topicName, int msgCount) throws Exception { + kafkaUnit.createTopic(topicName); + + for (int i = 0; i < msgCount; i++) { + ProducerRecord<String, String> producerRecord = new ProducerRecord<>( + topicName, Integer.toString(i), + Integer.toString(i)); + kafkaUnit.sendMessage(producerRecord); + } + } + + /** + * Open and activate a KafkaSpout that acts as a single-task/executor spout. 
+ * + * @param <K> Kafka key type + * @param <V> Kafka value type + * @param spout The spout to prepare + * @param topoConf The topology configuration to open the spout with + * @param topoContextMock The TopologyContext mock + * @param collectorMock The output collector mock + */ + public static <K, V> void initializeSpout(KafkaSpout<K, V> spout, Map<String, Object> topoConf, TopologyContext topoContextMock, + SpoutOutputCollector collectorMock) throws Exception { + when(topoContextMock.getThisTaskIndex()).thenReturn(0); + when(topoContextMock.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0)); + spout.open(topoConf, topoContextMock, collectorMock); + spout.activate(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/11a7a157/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java index 1ab4966..f4cb833 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java @@ -41,13 +41,6 @@ public class SingleTopicKafkaSpoutConfiguration { public static final String STREAM = "test_stream"; public static final String TOPIC = "test"; - /** - * Retry in a tight loop (keep unit tests fasts).
-     */
-    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
-        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
-            DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
     public static Config getConfig() {
         Config config = new Config();
         config.setDebug(true);
@@ -56,7 +49,7 @@ public class SingleTopicKafkaSpoutConfiguration {
 
     public static StormTopology getTopologyKafkaSpout(int port) {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfigBuilder(port).build()), 1);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(port).build()), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
         return tp.createTopology();
     }
@@ -68,11 +61,11 @@ public class SingleTopicKafkaSpoutConfiguration {
         }
     };
 
-    public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(int port) {
+    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(int port) {
         return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
     }
 
-    public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(Subscription subscription, int port) {
+    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(Subscription subscription, int port) {
         return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<String, String>("127.0.0.1:" + port, subscription));
     }
 
@@ -82,14 +75,18 @@ public class SingleTopicKafkaSpoutConfiguration {
             new Fields("topic", "key", "value"), STREAM)
             .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
             .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
-            .setRetry(getRetryService())
+            .setRetry(getNoDelayRetryService())
             .setOffsetCommitPeriodMs(10_000)
             .setFirstPollOffsetStrategy(EARLIEST)
             .setMaxUncommittedOffsets(250)
             .setPollTimeoutMs(1000);
     }
 
-    protected static KafkaSpoutRetryService getRetryService() {
-        return UNIT_TEST_RETRY_SERVICE;
+    protected static KafkaSpoutRetryService getNoDelayRetryService() {
+        /**
+         * Retry in a tight loop (keep unit tests fast).
+         */
+        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
     }
 }
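The hunk above addresses the "leaking state through unit test retry service" item from the commit message: the shared static `UNIT_TEST_RETRY_SERVICE` constant is replaced by `getNoDelayRetryService()`, which constructs a fresh retry service per call, so retry bookkeeping from one test can no longer bleed into the next. A minimal self-contained sketch of why that matters, using a hypothetical `FakeRetryService` class (not Storm's actual `KafkaSpoutRetryService` API) whose only state is a set of scheduled offsets:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a retry service that accumulates scheduled retries.
class FakeRetryService {
    private final Set<Long> scheduled = new HashSet<>();

    void schedule(long offset) {
        scheduled.add(offset);
    }

    int pendingRetries() {
        return scheduled.size();
    }
}

public class RetryStateLeakSketch {
    // Anti-pattern removed by the commit: one instance shared by every test.
    static final FakeRetryService SHARED = new FakeRetryService();

    // Pattern introduced by the commit: each call returns a fresh instance.
    static FakeRetryService createNoDelayRetryService() {
        return new FakeRetryService();
    }

    public static void main(String[] args) {
        // "Test 1" schedules a retry on the shared instance...
        SHARED.schedule(42L);
        // ...and "test 2" still sees it: state has leaked between tests.
        System.out.println(SHARED.pendingRetries()); // prints 1

        // With the factory method, each test starts from a clean instance.
        FakeRetryService fresh = createNoDelayRetryService();
        System.out.println(fresh.pendingRetries()); // prints 0
    }
}
```

The same reasoning explains the `getKafkaSpoutConfigBuilder` to `createKafkaSpoutConfigBuilder` rename: each call now visibly creates a new builder rather than appearing to hand out a shared one.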
