Repository: storm Updated Branches: refs/heads/master 402a371cc -> 19cdedfc8
STORM-2994: KafkaSpout consumes messages but doesn't commit offsets Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/33961ac7 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/33961ac7 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/33961ac7 Branch: refs/heads/master Commit: 33961ac7b76589a1fc8ecb1d4559517f755bc7d5 Parents: ffa607e Author: Rui Abreu <[email protected]> Authored: Tue Apr 3 10:21:05 2018 +0100 Committer: Rui Abreu <[email protected]> Committed: Tue Apr 3 10:27:30 2018 +0100 ---------------------------------------------------------------------- .../apache/storm/kafka/spout/KafkaSpout.java | 35 +++++--- .../storm/kafka/spout/KafkaSpoutMessageId.java | 28 +++--- .../kafka/spout/KafkaSpoutNullTupleTest.java | 93 ++++++++++++++++++++ .../SingleTopicKafkaSpoutConfiguration.java | 3 + 4 files changed, 131 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/33961ac7/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 9f9f5bb..901e97f 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -24,7 +24,7 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; + import java.util.ArrayList; import java.util.Collection; import 
java.util.Collections; @@ -49,7 +49,6 @@ import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.RetriableException; import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee; -import org.apache.storm.kafka.spout.internal.CommitMetadata; import org.apache.storm.kafka.spout.internal.CommitMetadataManager; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; @@ -280,7 +279,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { if (isAtLeastOnceProcessing()) { commitOffsetsForAckedTuples(kafkaConsumer.assignment()); } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) { - Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = + Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = createFetchedOffsetsMetadata(kafkaConsumer.assignment()); kafkaConsumer.commitAsync(offsetsToCommit, null); LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); @@ -368,7 +367,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { numPolledRecords); if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) { //Commit polled records immediately to ensure delivery is at-most-once. 
- Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = + Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = createFetchedOffsetsMetadata(kafkaConsumer.assignment()); kafkaConsumer.commitSync(offsetsToCommit); LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); @@ -484,8 +483,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout { return true; } } else { + /*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately + * to allow its offset to be committed to Kafka*/ LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record); - msgId.setEmitted(false); + msgId.setNullTuple(true); + offsetManagers.get(tp).addToEmitMsgs(msgId.offset()); ack(msgId); } } @@ -506,7 +508,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } return offsetsToCommit; } - + private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) { // Find offsets that are ready to be committed for every assigned topic partition final Map<TopicPartition, OffsetManager> assignedOffsetManagers = offsetManagers.entrySet().stream() @@ -570,17 +572,22 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Only need to keep track of acked tuples if commits to Kafka are controlled by // tuple acks, which happens only for at-least-once processing semantics final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; + + if (msgId.isNullTuple()) { + //a null tuple should be added to the ack list since by definition it is a direct ack + offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); + LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId); + tupleListener.onAck(msgId); + return; + } + if (!emitted.contains(msgId)) { - if (msgId.isEmitted()) { - LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " - + "came from a topic-partition that this consumer group instance is no longer tracking " - + 
"due to rebalance/partition reassignment. No action taken.", msgId); - } else { - LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId); - } + LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " + + "came from a topic-partition that this consumer group instance is no longer tracking " + + "due to rebalance/partition reassignment. No action taken.", msgId); } else { Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked." - + " This should never occur barring errors in the RetryService implementation or the spout code."); + + " This should never occur barring errors in the RetryService implementation or the spout code."); offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId); emitted.remove(msgId); } http://git-wip-us.apache.org/repos/asf/storm/blob/33961ac7/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java index 1626fee..ddf6391 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java @@ -27,33 +27,33 @@ public class KafkaSpoutMessageId implements Serializable { private final long offset; private int numFails = 0; /** - * true if the record was emitted using a form of collector.emit(...). false + * false if the record was emitted using a form of collector.emit(...). 
true * when skipping null tuples as configured by the user in KafkaSpoutConfig */ - private boolean emitted; + private boolean nullTuple; public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) { - this(consumerRecord, true); + this(consumerRecord, false); } - public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, boolean emitted) { - this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), emitted); + public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, boolean nullTuple) { + this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), nullTuple); } public KafkaSpoutMessageId(TopicPartition topicPart, long offset) { - this(topicPart, offset, true); + this(topicPart, offset, false); } /** * Creates a new KafkaSpoutMessageId. * @param topicPart The topic partition this message belongs to * @param offset The offset of this message - * @param emitted True iff this message is not being skipped as a null tuple + * @param nullTuple True if this message is being skipped as a null tuple */ - public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean emitted) { + public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean nullTuple) { this.topicPart = topicPart; this.offset = offset; - this.emitted = emitted; + this.nullTuple = nullTuple; } public int partition() { @@ -80,12 +80,12 @@ public class KafkaSpoutMessageId implements Serializable { return topicPart; } - public boolean isEmitted() { - return emitted; + public boolean isNullTuple() { + return nullTuple; } - public void setEmitted(boolean emitted) { - this.emitted = emitted; + public void setNullTuple(boolean nullTuple) { + this.nullTuple = nullTuple; } @Override @@ -94,7 +94,7 @@ public class KafkaSpoutMessageId implements Serializable { + "topic-partition=" + topicPart + ", offset=" + offset + ", numFails=" + numFails - + ", emitted=" + emitted + + ", nullTuple=" + 
nullTuple + '}'; } http://git-wip-us.apache.org/repos/asf/storm/blob/33961ac7/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java new file mode 100644 index 0000000..0bbdb55 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutNullTupleTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.spout; + + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Time; +import org.junit.Test; + +import java.util.List; +import java.util.regex.Pattern; + +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +public class KafkaSpoutNullTupleTest extends KafkaSpoutAbstractTest { + + public KafkaSpoutNullTupleTest() { + super(2_000); + } + + + @Override + KafkaSpoutConfig<String, String> createSpoutConfig() { + + return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(), + Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)) + .setOffsetCommitPeriodMs(commitOffsetPeriodMs) + .setRecordTranslator(new NullRecordExtractor()) + .build(); + } + + @Test + public void testShouldCommitAllMessagesIfNotSetToEmitNullTuples() throws Exception { + final int messageCount = 10; + prepareSpout(messageCount); + + //All null tuples should be committed, meaning they were considered to be emitted and acked + for(int i = 0; i < messageCount; i++) { + spout.nextTuple(); + } + + verify(collectorMock,never()).emit( + anyString(), + anyList(), + any()); + + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(messageCount); + } + + private class NullRecordExtractor implements RecordTranslator { + + @Override + public List<Object> apply(ConsumerRecord record) { + return null; + + } + + @Override + public Fields getFieldsFor(String stream) { + return new Fields("topic", "key", "value"); + } + + @Override + public Object apply(Object record) { + return null; + } + } + + +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/33961ac7/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java index 4896267..f7e0a96 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java @@ -29,6 +29,7 @@ import org.apache.storm.kafka.spout.subscription.TopicFilter; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; + public class SingleTopicKafkaSpoutConfiguration { public static final String STREAM = "test_stream"; @@ -54,6 +55,7 @@ public class SingleTopicKafkaSpoutConfiguration { .setPollTimeoutMs(1000); } + protected static KafkaSpoutRetryService getNoDelayRetryService() { /** * Retry in a tight loop (keep unit tests fasts). @@ -61,4 +63,5 @@ public class SingleTopicKafkaSpoutConfiguration { return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); } + }
