Repository: storm Updated Branches: refs/heads/1.x-branch 620d2be86 -> 609fe10f5
STORM-2409: Storm-Kafka-Client KafkaSpout Support for Failed and Null Tuples - Created config property to make emit null tuples configurable - Ack directly null tuples that are not emitted - Added emit field to KafkaSpoutMessageId Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8670f60d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8670f60d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8670f60d Branch: refs/heads/1.x-branch Commit: 8670f60d459f4c27ea040bd2f401ecd22c4bf497 Parents: 3ebf81b Author: Hugo Louro <[email protected]> Authored: Mon Mar 13 19:29:12 2017 -0700 Committer: Hugo Louro <[email protected]> Committed: Wed Mar 15 17:43:36 2017 -0700 ---------------------------------------------------------------------- .../apache/storm/kafka/spout/KafkaSpout.java | 124 +++++++++++-------- .../storm/kafka/spout/KafkaSpoutConfig.java | 33 +++-- .../storm/kafka/spout/KafkaSpoutMessageId.java | 21 +++- .../storm/kafka/spout/RecordTranslator.java | 9 +- .../storm/kafka/spout/KafkaSpoutConfigTest.java | 19 ++- 5 files changed, 140 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/8670f60d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index bbad9e8..207ba23 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -76,18 +76,18 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private transient boolean initialized; // Flag indicating that the 
spout is still undergoing initialization process. // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() - private transient Map<TopicPartition, OffsetManager> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate + private transient Map<TopicPartition, OffsetManager> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode - private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() + private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. 
Not used if it's AutoCommitMode + private transient Timer refreshSubscriptionTimer; // Triggers when a subscription should be refreshed private transient TopologyContext context; - private transient Timer refreshSubscriptionTimer; // Used to say when a subscription should be refreshed public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) { this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<K, V>()); } - + //This constructor is here for testing KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) { this.kafkaConsumerFactory = kafkaConsumerFactory; @@ -106,7 +106,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Offset management firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); // with AutoCommitMode, offset will be periodically committed in the background by Kafka consumer - consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode(); + consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode(); // Retries management retryService = kafkaSpoutConfig.getRetryService(); @@ -150,9 +150,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } retryService.retainAll(partitions); - - //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted. - Set<TopicPartition> partitionsSet = new HashSet(partitions); + + /* + * Emitted messages for partitions that are no longer assigned to this spout can't + * be acked and should not be retried, hence remove them from emitted collection. 
+ */ + Set<TopicPartition> partitionsSet = new HashSet<>(partitions); Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator(); while (msgIdIterator.hasNext()) { KafkaSpoutMessageId msgId = msgIdIterator.next(); @@ -210,7 +213,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { @Override public void nextTuple() { - try{ + try { if (initialized) { if (commit()) { commitOffsetsForAckedTuples(); @@ -234,7 +237,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { throwKafkaConsumerInterruptedException(); } } - + private void throwKafkaConsumerInterruptedException() { //Kafka throws their own type of exception when interrupted. //Throw a new Java InterruptedException to ensure Storm can recognize the exception as a reaction to an interrupt. @@ -247,8 +250,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private boolean poll() { final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets(); - final boolean poll = !waitingToEmit() - && ( numUncommittedOffsets < maxUncommittedOffsets || consumerAutoCommitMode); + final boolean poll = !waitingToEmit() + && (numUncommittedOffsets < maxUncommittedOffsets || consumerAutoCommitMode); if (!poll) { if (waitingToEmit()) { @@ -301,13 +304,17 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // ======== emit ========= private void emit() { - while(!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext()) { + while (!emitTupleIfNotEmitted(waitingToEmit.next()) && waitingToEmit.hasNext()) { waitingToEmit.remove(); } } - //Emits one tuple per record - //@return true if tuple was emitted + /** + * Creates a tuple from the kafka record and emits it if it was not yet emitted + * + * @param record to be emitted + * @return true if tuple was emitted. 
False if tuple has been acked or has been emitted and is pending ack or fail + */ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) { final TopicPartition tp = new TopicPartition(record.topic(), record.partition()); final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record); @@ -317,37 +324,51 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record); } else { - boolean isScheduled = retryService.isScheduled(msgId); - if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried - final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record); - - if(consumerAutoCommitMode){ - if (tuple instanceof KafkaTuple) { - collector.emit(((KafkaTuple)tuple).getStream(), tuple); + final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record); + if (isEmitTuple(tuple)) { + final boolean isScheduled = retryService.isScheduled(msgId); + // not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried + if (!isScheduled || retryService.isReady(msgId)) { + if (consumerAutoCommitMode) { + if (tuple instanceof KafkaTuple) { + collector.emit(((KafkaTuple) tuple).getStream(), tuple); + } else { + collector.emit(tuple); + } } else { - collector.emit(tuple); - } - }else{ - if (tuple instanceof KafkaTuple) { - collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId); - } else { - collector.emit(tuple, msgId); - } - - emitted.add(msgId); - numUncommittedOffsets++; - - if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule. 
- retryService.remove(msgId); + if (tuple instanceof KafkaTuple) { + collector.emit(((KafkaTuple) tuple).getStream(), tuple, msgId); + } else { + collector.emit(tuple, msgId); + } + + emitted.add(msgId); + + if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule. + retryService.remove(msgId); + } else { //New tuple, hence increment the uncommitted offset counter + numUncommittedOffsets++; + } } + LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId); + return true; } - LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); - return true; + } else { + LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record); + msgId.setEmitted(false); + ack(msgId); } } return false; } + /** + * Emits a tuple if it is not a null tuple, or if the spout is configured to emit null tuples + */ + private boolean isEmitTuple(List<Object> tuple) { + return tuple != null || kafkaSpoutConfig.isEmitNullTuples(); + } + private void commitOffsetsForAckedTuples() { // Find offsets that are ready to be committed for every topic partition final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>(); @@ -383,15 +404,20 @@ public class KafkaSpout<K, V> extends BaseRichSpout { @Override public void ack(Object messageId) { final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; - if(!emitted.contains(msgId)) { - LOG.debug("Received ack for tuple this spout is no longer tracking. Partitions may have been reassigned. 
Ignoring message [{}]", msgId); - return; - } - - if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically - acked.get(msgId.getTopicPartition()).add(msgId); + if (!emitted.contains(msgId)) { + if (msgId.isEmitted()) { + LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " + + "came from a topic-partition that this consumer group instance is no longer tracking " + + "due to rebalance/partition reassignment. No action taken.", msgId); + } else { + LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId); + } + } else { + if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically + acked.get(msgId.getTopicPartition()).add(msgId); + } + emitted.remove(msgId); } - emitted.remove(msgId); } // ======== Fail ======= @@ -399,7 +425,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { @Override public void fail(Object messageId) { final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; - if(!emitted.contains(msgId)) { + if (!emitted.contains(msgId)) { LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. 
Ignoring message [{}]", msgId); return; } @@ -460,7 +486,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator(); - for (String stream: translator.streams()) { + for (String stream : translator.streams()) { declarer.declareStream(stream, translator.getFieldsFor(stream)); } } @@ -474,7 +500,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } @Override - public Map<String, Object> getComponentConfiguration () { + public Map<String, Object> getComponentConfiguration() { Map<String, Object> configuration = super.getComponentConfiguration(); if (configuration == null) { configuration = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/storm/blob/8670f60d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 2b81dea..920dca9 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -18,6 +18,13 @@ package org.apache.storm.kafka.spout; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; +import org.apache.storm.tuple.Fields; + import java.io.Serializable; import java.util.Collection; import java.util.HashMap; @@ -26,13 +33,6 @@ import java.util.Map; import java.util.Properties; import 
java.util.regex.Pattern; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; -import org.apache.storm.tuple.Fields; - /** * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics */ @@ -106,6 +106,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE; private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; + private boolean emitNullTuples = false; public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) { this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); @@ -402,7 +403,17 @@ public class KafkaSpoutConfig<K, V> implements Serializable { this.partitionRefreshPeriodMs = partitionRefreshPeriodMs; return this; } - + + /** + * Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly + * ack them. By default this parameter is set to false, which means that null tuples are not emitted. 
+ * @param emitNullTuples whether null tuples should be emitted downstream + */ + public Builder<K, V> setEmitNullTuples(boolean emitNullTuples) { + this.emitNullTuples = emitNullTuples; + return this; + } + public KafkaSpoutConfig<K,V> build() { return new KafkaSpoutConfig<>(this); } @@ -424,6 +435,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { private final FirstPollOffsetStrategy firstPollOffsetStrategy; private final KafkaSpoutRetryService retryService; private final long partitionRefreshPeriodMs; + private final boolean emitNullTuples; private KafkaSpoutConfig(Builder<K,V> builder) { this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); @@ -439,6 +451,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { this.valueDes = builder.valueDes; this.valueDesClazz = builder.valueDesClazz; this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs; + this.emitNullTuples = builder.emitNullTuples; } public Map<String, Object> getKafkaProps() { @@ -508,6 +521,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return partitionRefreshPeriodMs; } + public boolean isEmitNullTuples() { + return emitNullTuples; + } + @Override public String toString() { return "KafkaSpoutConfig{" + http://git-wip-us.apache.org/repos/asf/storm/blob/8670f60d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java index 3cfad9d..dea57c4 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java @@ -25,14 +25,25 @@ public class KafkaSpoutMessageId { private 
transient TopicPartition topicPart; private transient long offset; private transient int numFails = 0; + private boolean emitted; // true if the record was emitted using a form of collector.emit(...). + // false when skipping null tuples as configured by the user in KafkaSpoutConfig public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) { - this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset()); + this(consumerRecord, true); + } + + public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, boolean emitted) { + this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), emitted); } public KafkaSpoutMessageId(TopicPartition topicPart, long offset) { + this(topicPart, offset, true); + } + + public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean emitted) { this.topicPart = topicPart; this.offset = offset; + this.emitted = emitted; } public int partition() { @@ -59,6 +70,14 @@ public class KafkaSpoutMessageId { return topicPart; } + public boolean isEmitted() { + return emitted; + } + + public void setEmitted(boolean emitted) { + this.emitted = emitted; + } + public String getMetadata(Thread currThread) { return "{" + "topic-partition=" + topicPart + http://git-wip-us.apache.org/repos/asf/storm/blob/8670f60d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java index 2e72c99..71af4d0 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java @@ -17,12 +17,14 @@ */ package org.apache.storm.kafka.spout; 
+import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.tuple.Fields; + import java.io.Serializable; import java.util.Collections; import java.util.List; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.storm.tuple.Fields; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder; /** * Translate a {@link org.apache.kafka.clients.consumer.ConsumerRecord} to a tuple. @@ -34,7 +36,8 @@ public interface RecordTranslator<K, V> extends Serializable, Func<ConsumerRecor * Translate the ConsumerRecord into a list of objects that can be emitted * @param record the record to translate * @return the objects in the tuple. Return a {@link KafkaTuple} - * if you want to route the tuple to a non-default stream + * if you want to route the tuple to a non-default stream. + * Return null to discard an invalid {@link ConsumerRecord} if {@link Builder#setEmitNullTuples(boolean)} is set to false */ List<Object> apply(ConsumerRecord<K,V> record); http://git-wip-us.apache.org/repos/asf/storm/blob/8670f60d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java index 57e0120..9f62b90 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java @@ -17,14 +17,14 @@ */ package org.apache.storm.kafka.spout; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; +import org.junit.Test; import java.util.HashMap; 
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class KafkaSpoutConfigTest { @@ -39,4 +39,13 @@ public class KafkaSpoutConfigTest { expected.put("enable.auto.commit", "false"); assertEquals(expected, conf.getKafkaProps()); } + + @Test + public void test_setEmitNullTuples_true_true() { + final KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setEmitNullTuples(true) + .build(); + + assertTrue("Failed to set emit null tuples to true", conf.isEmitNullTuples()); + } }
