Repository: storm Updated Branches: refs/heads/master a1ef0f616 -> 3ee5543f0
STORM-2546: Fix storm-kafka-client spout getting stuck when retriable offsets were deleted from the Kafka log due to topic compaction Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88583fea Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88583fea Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88583fea Branch: refs/heads/master Commit: 88583feabf6e39c6bc7c4e657c989ca5ba2671d2 Parents: a1ef0f6 Author: Stig Rohde Døssing <[email protected]> Authored: Sat Sep 2 23:50:33 2017 +0200 Committer: Stig Rohde Døssing <[email protected]> Committed: Wed Nov 15 08:20:20 2017 +0100 ---------------------------------------------------------------------- .../apache/storm/kafka/spout/KafkaSpout.java | 48 +++- .../storm/kafka/spout/KafkaSpoutConfig.java | 25 +++ .../storm/kafka/spout/KafkaSpoutCommitTest.java | 119 ---------- .../storm/kafka/spout/KafkaSpoutConfigTest.java | 24 ++ .../storm/kafka/spout/KafkaSpoutEmitTest.java | 9 +- .../KafkaSpoutLogCompactionSupportTest.java | 221 +++++++++++++++++++ .../spout/KafkaSpoutMessagingGuaranteeTest.java | 8 +- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 3 +- .../SpoutWithMockedConsumerSetupHelper.java | 68 +++++- .../kafka/spout/internal/OffsetManagerTest.java | 1 - 10 files changed, 382 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 3364fb0..425681d 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -91,7 +91,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Always empty if processing guarantee is none or at-most-once private transient Set<KafkaSpoutMessageId> emitted; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() - private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; + private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Triggers when a subscription should be refreshed private transient Timer refreshSubscriptionTimer; private transient TopologyContext context; @@ -152,7 +152,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { previousAssignment = partitions; LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); if (isAtLeastOnceProcessing()) { commitOffsetsForAckedTuples(); @@ -265,7 +265,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private Set<TopicPartition> poll() { final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets(); - + if (waitingToEmit()) { LOG.debug("Not polling. 
Tuples waiting to be emitted."); return Collections.emptySet(); @@ -311,12 +311,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // ======== poll ========= private ConsumerRecords<K, V> pollKafkaBroker(Set<TopicPartition> pollablePartitions) { - doSeekRetriableTopicPartitions(pollablePartitions); + final Map<TopicPartition, Long> retriableOffsets = doSeekRetriableTopicPartitions(pollablePartitions); Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment()); pausedPartitions.removeIf(pollablePartitions::contains); try { kafkaConsumer.pause(pausedPartitions); final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); + ackRetriableOffsetsIfCompactedAway(retriableOffsets, consumerRecords); final int numPolledRecords = consumerRecords.count(); LOG.debug("Polled [{}] records from Kafka.", numPolledRecords); @@ -330,13 +331,42 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } } - private void doSeekRetriableTopicPartitions(Set<TopicPartition> pollablePartitions) { + private Map<TopicPartition, Long> doSeekRetriableTopicPartitions(Set<TopicPartition> pollablePartitions) { final Map<TopicPartition, Long> retriableTopicPartitions = retryService.earliestRetriableOffsets(); - + for (TopicPartition tp : retriableTopicPartitions.keySet()) { + if (!pollablePartitions.contains(tp)) { + retriableTopicPartitions.remove(tp); + } + } for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : retriableTopicPartitions.entrySet()) { - if (pollablePartitions.contains(retriableTopicPartitionAndOffset.getKey())) { - //Seek directly to the earliest retriable message for each retriable topic partition - kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue()); + //Seek directly to the earliest retriable message for each retriable topic partition + kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), 
retriableTopicPartitionAndOffset.getValue()); + } + return retriableTopicPartitions; + } + + private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets, + ConsumerRecords<K, V> consumerRecords) { + for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) { + TopicPartition tp = entry.getKey(); + List<ConsumerRecord<K, V>> records = consumerRecords.records(tp); + if (!records.isEmpty()) { + ConsumerRecord<K, V> record = records.get(0); + long seekOffset = entry.getValue(); + long earliestReceivedOffset = record.offset(); + if (seekOffset < earliestReceivedOffset) { + //Since we asked for tuples starting at seekOffset, some retriable records must have been compacted away. + //Ack up to the first offset received if the record is not already acked or currently in the topology + for (long i = seekOffset; i < earliestReceivedOffset; i++) { + KafkaSpoutMessageId msgId = retryService.getMessageId(tp, i); + if (!offsetManagers.get(tp).contains(msgId) && !emitted.contains(msgId)) { + LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", i, tp); + retryService.remove(msgId); + emitted.add(msgId); + ack(msgId); + } + } + } } } } http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index d5fceb4..ca8dcee 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.HashMap; import java.util.List; 
import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; @@ -38,6 +39,8 @@ import org.apache.storm.kafka.spout.subscription.PatternTopicFilter; import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner; import org.apache.storm.kafka.spout.subscription.Subscription; import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics. @@ -65,6 +68,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = ProcessingGuarantee.AT_LEAST_ONCE; public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener(); + public static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class); // Kafka consumer configuration private final Map<String, Object> kafkaProps; @@ -439,6 +443,27 @@ public class KafkaSpoutConfig<K, V> implements Serializable { if (builder.processingGuarantee == ProcessingGuarantee.NONE) { builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); } else { + String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) { + if (autoOffsetResetPolicy == null) { + /* + If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured + for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range + error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer + requests an offset that was deleted. 
+ */ + builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) { + LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'." + + " Some messages may be skipped."); + } + } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) { + if (autoOffsetResetPolicy != null + && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) { + LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'." + + " Some messages may be processed more than once."); + } + } builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); } } http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java deleted file mode 100644 index 441e649..0000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka.spout; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.utils.Time; -import org.apache.storm.utils.Time.SimulatedTime; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.InOrder; -import org.mockito.MockitoAnnotations; - -import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; -import static org.mockito.ArgumentMatchers.anyList; - -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class KafkaSpoutCommitTest { - - private final long offsetCommitPeriodMs = 2_000; - private final TopologyContext contextMock = mock(TopologyContext.class); - private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); - private final Map<String, Object> conf = new HashMap<>(); - private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); - private KafkaConsumer<String, String> consumerMock; - private KafkaSpoutConfig<String, String> spoutConfig; - - 
@Captor - private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - spoutConfig = createKafkaSpoutConfigBuilder(-1) - .setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .build(); - consumerMock = mock(KafkaConsumer.class); - } - - @Test - public void testCommitSuccessWithOffsetVoids() { - //Verify that the commit logic can handle offset voids - try (SimulatedTime simulatedTime = new SimulatedTime()) { - Set<TopicPartition> assignedPartitions = Collections.singleton(partition); - KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, assignedPartitions); - Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); - List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>(); - // Offsets emitted are 0,1,2,3,4,<void>,8,9 - recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 5)); - recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 8, 2)); - records.put(partition, recordsForPartition); - - when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords<>(records)); - - for (int i = 0; i < recordsForPartition.size(); i++) { - spout.nextTuple(); - } - - ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); - - for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { - spout.ack(messageId); - } - - // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9 - Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); - when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); - spout.nextTuple(); - - InOrder inOrder 
= inOrder(consumerMock); - inOrder.verify(consumerMock).commitSync(commitCapture.capture()); - inOrder.verify(consumerMock).poll(anyLong()); - - //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at - Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue(); - assertTrue(commits.containsKey(partition)); - assertEquals(10, commits.get(partition).offset()); - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java index 051d212..e42719b 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java @@ -17,8 +17,11 @@ */ package org.apache.storm.kafka.spout; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.util.HashMap; @@ -38,6 +41,7 @@ public class KafkaSpoutConfigTest { HashMap<String, Object> expected = new HashMap<>(); expected.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234"); expected.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + expected.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
assertEquals(expected, conf.getKafkaProps()); @@ -51,4 +55,24 @@ public class KafkaSpoutConfigTest { assertTrue("Failed to set emit null tuples to true", conf.isEmitNullTuples()); } + + @Test + public void testShouldNotChangeAutoOffsetResetPolicyWhenNotUsingAtLeastOnce() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .build(); + + assertThat("When at-least-once is not specified, the spout should use the Kafka default auto offset reset policy", + conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), nullValue()); + } + + @Test + public void testWillRespectExplicitAutoOffsetResetPolicy() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") + .build(); + + assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee", + conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("none")); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 7cfd6b7..91831e4 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -74,7 +74,7 @@ public class KafkaSpoutEmitTest { public void testNextTupleEmitsAtMostOneTuple() { //The spout should emit at most one message per call to nextTuple //This is necessary for Storm 
to be able to throttle the spout according to maxSpoutPending - KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 10)); @@ -92,7 +92,7 @@ public class KafkaSpoutEmitTest { //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed try (SimulatedTime simulatedTime = new SimulatedTime()) { - KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); int numRecords = spoutConfig.getMaxUncommittedOffsets(); //This is cheating a bit since maxPollRecords would normally spread this across multiple polls @@ -139,10 +139,7 @@ public class KafkaSpoutEmitTest { //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit. 
try (SimulatedTime simulatedTime = new SimulatedTime()) { TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2); - Set<TopicPartition> partitions = new HashSet<>(); - partitions.add(partition); - partitions.add(partitionTwo); - KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partitions); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo); Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); //This is cheating a bit since maxPollRecords would normally spread this across multiple polls records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets())); http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java new file mode 100644 index 0000000..b824d07 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java @@ -0,0 +1,221 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.MockitoAnnotations; + +import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.hamcrest.CoreMatchers.is; + +import static org.hamcrest.Matchers.hasKey; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class KafkaSpoutLogCompactionSupportTest { + + private final 
long offsetCommitPeriodMs = 2_000; + private final TopologyContext contextMock = mock(TopologyContext.class); + private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); + private final Map<String, Object> conf = new HashMap<>(); + private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); + private KafkaConsumer<String, String> consumerMock; + private KafkaSpoutConfig<String, String> spoutConfig; + + @Captor + private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + spoutConfig = createKafkaSpoutConfigBuilder(-1) + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .build(); + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testCommitSuccessWithOffsetVoids() { + //Verify that the commit logic can handle offset voids due to log compaction + try (SimulatedTime simulatedTime = new SimulatedTime()) { + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); + List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>(); + // Offsets emitted are 0,1,2,3,4,<void>,8,9 + recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 5)); + recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.createRecords(partition, 8, 2)); + records.put(partition, recordsForPartition); + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(records)); + + for (int i = 0; i < recordsForPartition.size(); i++) { + spout.nextTuple(); + } + + ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); + + 
for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { + spout.ack(messageId); + } + + // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9 + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + spout.nextTuple(); + + InOrder inOrder = inOrder(consumerMock); + inOrder.verify(consumerMock).commitSync(commitCapture.capture()); + inOrder.verify(consumerMock).poll(anyLong()); + + //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at + Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue(); + assertTrue(commits.containsKey(partition)); + assertEquals(10, commits.get(partition).offset()); + } + } + + @Test + public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAway() { + /* + Verify that failed offsets will only retry if the corresponding message exists. + When log compaction is enabled in Kafka it is possible that a tuple can fail, + and then be impossible to retry because the message in Kafka has been deleted. + The spout needs to quietly ack such tuples to allow commits to progress past the deleted offset. 
+ */ + try (SimulatedTime simulatedTime = new SimulatedTime()) { + TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo); + + List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper + .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2); + reset(collectorMock); + List<KafkaSpoutMessageId> secondPartitionMsgIds = SpoutWithMockedConsumerSetupHelper + .pollAndEmit(spout, consumerMock, 3, collectorMock, partitionTwo, 0, 1, 2); + reset(collectorMock); + + for(int i = 0; i < 3; i++) { + spout.fail(firstPartitionMsgIds.get(i)); + spout.fail(secondPartitionMsgIds.get(i)); + } + + Time.advanceTime(50); + + //The failed tuples are ready for retry. Make it appear like 0 and 1 on the first partition were compacted away. + //In this case the second partition acts as control to verify that we only skip past offsets that are no longer present. 
+ Map<TopicPartition, int[]> retryOffsets = new HashMap<>(); + retryOffsets.put(partition, new int[] {2}); + retryOffsets.put(partitionTwo, new int[] {0, 1, 2}); + int expectedEmits = 4; //2 on first partition, 0-2 on second partition + List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, retryOffsets); + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock).commitSync(commitCapture.capture()); + Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue(); + assertThat(committed.keySet(), is(Collections.singleton(partition))); + assertThat("The first partition should have committed up to the first retriable tuple that is not missing", committed.get(partition).offset(), is(2L)); + + for(KafkaSpoutMessageId msgId : retryMessageIds) { + spout.ack(msgId); + } + + //The spout should now commit all the offsets, since all offsets are either acked or were missing when retrying + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock, times(2)).commitSync(commitCapture.capture()); + committed = commitCapture.getValue(); + assertThat(committed, hasKey(partition)); + assertThat(committed, hasKey(partitionTwo)); + assertThat(committed.get(partition).offset(), is(3L)); + assertThat(committed.get(partitionTwo).offset(), is(3L)); + } + } + + @Test + public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAwayWithoutAckingPendingTuples() { + //Demonstrate that the spout doesn't ack pending tuples when skipping compacted tuples. The pending tuples should be allowed to finish normally. 
+ try (SimulatedTime simulatedTime = new SimulatedTime()) { + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + + List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper + .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2); + reset(collectorMock); + + spout.fail(firstPartitionMsgIds.get(0)); + spout.fail(firstPartitionMsgIds.get(2)); + + Time.advanceTime(50); + + //The failed tuples are ready for retry. Make it appear like 0 and 1 were compacted away. + List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 2); + for(KafkaSpoutMessageId msgId : retryMessageIds) { + spout.ack(msgId); + } + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock).commitSync(commitCapture.capture()); + Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue(); + assertThat(committed.keySet(), is(Collections.singleton(partition))); + assertThat("The first partition should have committed the missing offset, but no further since the next tuple is pending", + committed.get(partition).offset(), is(1L)); + + spout.ack(firstPartitionMsgIds.get(1)); + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock, times(2)).commitSync(commitCapture.capture()); + committed = commitCapture.getValue(); + assertThat(committed.keySet(), is(Collections.singleton(partition))); + assertThat("The first partition should have committed all offsets", committed.get(partition).offset(), is(3L)); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java 
---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java index 07ee2dc..0948957 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -67,7 +67,7 @@ public class KafkaSpoutMessagingGuaranteeTest { KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1) .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) .build(); - KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); @@ -82,7 +82,7 @@ public class KafkaSpoutMessagingGuaranteeTest { } private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) { - KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); when(consumerMock.poll(anyLong())) .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, @@ -117,7 +117,7 @@ public class KafkaSpoutMessagingGuaranteeTest { } private void 
doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) { - KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); @@ -163,7 +163,7 @@ public class KafkaSpoutMessagingGuaranteeTest { private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig<String, String> spoutConfig) { try (SimulatedTime time = new SimulatedTime()) { - KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1)))); http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index 1cad6c2..2f06db3 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -79,7 +78,7 @@ public class KafkaSpoutRetryLimitTest { public void testFailingTupleCompletesAckAfterRetryLimitIsMet() { //Spout should ack failed messages after they hit the retry limit try (SimulatedTime simulatedTime = new SimulatedTime()) { - KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); int lastOffset = 3; int numRecords = lastOffset + 1; http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java index ad9bd75..407ab84 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -17,22 +17,32 @@ package org.apache.storm.kafka.spout; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static 
org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; +import org.mockito.ArgumentCaptor; public class SpoutWithMockedConsumerSetupHelper { @@ -50,16 +60,17 @@ public class SpoutWithMockedConsumerSetupHelper { * @return The spout */ public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf, - TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) { + TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, TopicPartition... 
assignedPartitions) { + Set<TopicPartition> assignedPartitionsSet = new HashSet<>(Arrays.asList(assignedPartitions)); - stubAssignment(contextMock, consumerMock, assignedPartitions); + stubAssignment(contextMock, consumerMock, assignedPartitionsSet); KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> consumerMock; KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory); spout.open(topoConf, contextMock, collectorMock); spout.activate(); - verify(consumerMock).assign(assignedPartitions); + verify(consumerMock).assign(assignedPartitionsSet); return spout; } @@ -104,4 +115,55 @@ public class SpoutWithMockedConsumerSetupHelper { return recordsForPartition; } + /** + * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids + * @param <K> The Kafka key type + * @param <V> The Kafka value type + * @param spout The spout + * @param consumerMock The consumer used by the spout + * @param expectedEmits The number of expected emits + * @param collectorMock The collector used by the spout + * @param partition The partition to emit messages on + * @param offsetsToEmit The offsets to emit + * @return The message ids emitted by the spout during the nextTuple calls + */ + public static <K, V> List<KafkaSpoutMessageId> pollAndEmit(KafkaSpout<K, V> spout, KafkaConsumer<K, V> consumerMock, int expectedEmits, SpoutOutputCollector collectorMock, TopicPartition partition, int... 
offsetsToEmit) { + return pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, Collections.singletonMap(partition, offsetsToEmit)); + } + + /** + * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids + * @param <K> The Kafka key type + * @param <V> The Kafka value type + * @param spout The spout + * @param consumerMock The consumer used by the spout + * @param collectorMock The collector used by the spout + * @param offsetsToEmit The offsets to emit per partition + * @return The message ids emitted by the spout during the nextTuple calls + */ + public static <K, V> List<KafkaSpoutMessageId> pollAndEmit(KafkaSpout<K, V> spout, KafkaConsumer<K, V> consumerMock, int expectedEmits, SpoutOutputCollector collectorMock, Map<TopicPartition, int[]> offsetsToEmit) { + int totalOffsets = 0; + Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>(); + for (Entry<TopicPartition, int[]> entry : offsetsToEmit.entrySet()) { + TopicPartition tp = entry.getKey(); + List<ConsumerRecord<K, V>> tpRecords = new ArrayList<>(); + for (Integer offset : entry.getValue()) { + tpRecords.add(new ConsumerRecord<>(tp.topic(), tp.partition(), offset, null, null)); + totalOffsets++; + } + records.put(tp, tpRecords); + } + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(records)); + + for (int i = 0; i < totalOffsets; i++) { + spout.nextTuple(); + } + + ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock, times(expectedEmits)).emit(anyString(), anyList(), messageIds.capture()); + return messageIds.getAllValues(); + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/88583fea/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java ---------------------------------------------------------------------- diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java index 3acc252..46b3c9b 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java @@ -21,7 +21,6 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; import java.util.NoSuchElementException; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
