Repository: storm

Updated Branches:
  refs/heads/master a894fe63a -> 41d6cdcc9
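
This commit addresses STORM-2869: when the spout commits and finds the consumer position behind the newly committed offset, it previously seeked forward and discarded the entire waiting-to-emit buffer, which could also throw away records not yet covered by the commit. The change keys the buffer by partition and, on that seek, filters out only the records whose offsets fall below the committed offset. A rough, self-contained sketch of that filtering idea (simplified types and hypothetical names such as WaitingToEmitSketch; not the committed code):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    // Sketch only: partitions are plain Strings and records are plain offsets,
    // standing in for TopicPartition and ConsumerRecord<K, V> in the real spout.
    class WaitingToEmitSketch {

        private final Map<String, List<Long>> waitingToEmit = new HashMap<>();

        // Buffer the records polled for one partition.
        void addPolledRecords(String partition, List<Long> offsets) {
            waitingToEmit.put(partition, new ArrayList<>(offsets));
        }

        // Called when the consumer position was behind the offset that was just committed:
        // after seeking forward (not shown), keep only the records that are not yet
        // committed instead of discarding the whole buffer.
        void dropAlreadyCommitted(String partition, long committedOffset) {
            List<Long> pending = waitingToEmit.get(partition);
            if (pending != null) {
                waitingToEmit.put(partition, pending.stream()
                    .filter(offset -> offset >= committedOffset)
                    .collect(Collectors.toList()));
            }
        }
    }

In the diff below, the same filter is applied to waitingToEmit.get(tp) right after kafkaConsumer.seek(tp, committedOffset), replacing the old behavior of setting waitingToEmit to null.
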
STORM-2869: Only discard outdated records when adjusting KafkaConsumer position during commit

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/df5187e8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/df5187e8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/df5187e8

Branch: refs/heads/master
Commit: df5187e803d38f2cbb35ac9f1853ed7f2410ad0e
Parents: f998e26
Author: Stig Rohde Døssing <s...@apache.org>
Authored: Tue Dec 26 20:55:30 2017 +0100
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Tue Dec 26 23:17:13 2017 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  63 +--
 .../kafka/spout/KafkaSpoutAbstractTest.java     |   8 +-
 .../kafka/spout/KafkaSpoutSingleTopicTest.java  |  81 ++--
 ...outTopologyDeployActivateDeactivateTest.java |   4 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 379 -------------------
 .../spout/SingleTopicKafkaUnitSetupHelper.java  |   4 +-
 6 files changed, 86 insertions(+), 453 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/df5187e8/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index b7e3136..9fe654f 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -28,12 +28,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -98,7 +98,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // Always empty if processing guarantee is none or at-most-once
     private transient Set<KafkaSpoutMessageId> emitted;
     // Records that have been polled and are queued to be emitted in the nextTuple() call.
One record is emitted per nextTuple() - private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; + private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit; // Triggers when a subscription should be refreshed private transient Timer refreshSubscriptionTimer; private transient TopologyContext context; @@ -138,7 +138,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { offsetManagers = new HashMap<>(); emitted = new HashSet<>(); - waitingToEmit = Collections.emptyListIterator(); + waitingToEmit = new HashMap<>(); setCommitMetadata(context); tupleListener.open(conf, context); @@ -151,7 +151,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata( context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName())); } catch (JsonProcessingException e) { - LOG.error("Failed to create Kafka commit metadata due to JSON serialization error",e); + LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e); throw new RuntimeException(e); } } @@ -164,7 +164,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { private Collection<TopicPartition> previousAssignment = new HashSet<>(); - + @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { previousAssignment = partitions; @@ -198,6 +198,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { */ emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition())); } + waitingToEmit.keySet().retainAll(partitions); Set<TopicPartition> newPartitions = new HashSet<>(partitions); newPartitions.removeAll(previousAssignment); @@ -252,8 +253,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } /** - * Checks If {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. - * This info is used to decide if {@link FirstPollOffsetStrategy} should be applied + * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. This info is used to decide if + * {@link FirstPollOffsetStrategy} should be applied * * @param tp topic-partition * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka @@ -271,7 +272,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit " + "for this topic-partition was done using an earlier version of Storm. " + "Defaulting to behavior compatible with earlier version", committedOffset); - LOG.trace("",e); + LOG.trace("", e); return false; } } @@ -318,12 +319,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout { LOG.debug("Not polling. 
Tuples waiting to be emitted."); return new PollablePartitionsInfo(Collections.emptySet(), Collections.emptyMap()); } - + Set<TopicPartition> assignment = kafkaConsumer.assignment(); if (!isAtLeastOnceProcessing()) { return new PollablePartitionsInfo(assignment, Collections.emptyMap()); } - + Map<TopicPartition, Long> earliestRetriableOffsets = retryService.earliestRetriableOffsets(); Set<TopicPartition> pollablePartitions = new HashSet<>(); final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets(); @@ -349,15 +350,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } private boolean isWaitingToEmit() { - return waitingToEmit != null && waitingToEmit.hasNext(); + return waitingToEmit.values().stream() + .anyMatch(list -> !list.isEmpty()); } private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) { - List<ConsumerRecord<K, V>> waitingToEmitList = new LinkedList<>(); for (TopicPartition tp : consumerRecords.partitions()) { - waitingToEmitList.addAll(consumerRecords.records(tp)); + waitingToEmit.put(tp, new ArrayList<>(consumerRecords.records(tp))); } - waitingToEmit = waitingToEmitList.iterator(); } // ======== poll ========= @@ -417,12 +417,17 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // ======== emit ========= private void emitIfWaitingNotEmitted() { - while (isWaitingToEmit()) { - final boolean emittedTuple = emitOrRetryTuple(waitingToEmit.next()); - waitingToEmit.remove(); - if (emittedTuple) { - break; + Iterator<List<ConsumerRecord<K, V>>> waitingToEmitIter = waitingToEmit.values().iterator(); + outerLoop: + while (waitingToEmitIter.hasNext()) { + List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmitIter.next(); + while (!waitingToEmitForTp.isEmpty()) { + final boolean emittedTuple = emitOrRetryTuple(waitingToEmitForTp.remove(0)); + if (emittedTuple) { + break outerLoop; + } } + waitingToEmitIter.remove(); } } @@ -443,7 +448,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } else { final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset) - && committedOffset.offset() > kafkaConsumer.position(tp)) { + && committedOffset.offset() > record.offset()) { // Ensures that after a topology with this id is started, the consumer fetch // position never falls behind the committed offset (STORM-2844) throw new IllegalStateException("Attempting to emit a message that has already been committed."); @@ -522,15 +527,21 @@ public class KafkaSpout<K, V> extends BaseRichSpout { * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, * lots of (more than max.poll.records) later messages were acked, and the failed message then gets acked. * The consumer may only be part way through "catching up" to where it was when it went back to retry the failed tuple. - * Skip the consumer forward to the committed offset drop the current waiting to emit list, + * Skip the consumer forward to the committed offset and drop the current waiting to emit list, * since it'll likely contain committed offsets. */ LOG.debug("Consumer fell behind committed offset. Catching up. 
Position was [{}], skipping to [{}]", position, committedOffset); kafkaConsumer.seek(tp, committedOffset); - waitingToEmit = null; + List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp); + if (waitingToEmitForTp != null) { + //Discard the pending records that are already committed + waitingToEmit.put(tp, waitingToEmitForTp.stream() + .filter(record -> record.offset() >= committedOffset) + .collect(Collectors.toList())); + } } - + final OffsetManager offsetManager = assignedOffsetManagers.get(tp); offsetManager.commit(tpOffset.getValue()); LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp); @@ -678,20 +689,20 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private String getTopicsString() { return kafkaSpoutConfig.getSubscription().getTopicsString(); } - + private static class PollablePartitionsInfo { private final Set<TopicPartition> pollablePartitions; //The subset of earliest retriable offsets that are on pollable partitions private final Map<TopicPartition, Long> pollableEarliestRetriableOffsets; - + public PollablePartitionsInfo(Set<TopicPartition> pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) { this.pollablePartitions = pollablePartitions; this.pollableEarliestRetriableOffsets = earliestRetriableOffsets.entrySet().stream() .filter(entry -> pollablePartitions.contains(entry.getKey())) .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())); } - + public boolean shouldPoll() { return !this.pollablePartitions.isEmpty(); } http://git-wip-us.apache.org/repos/asf/storm/blob/df5187e8/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java index 5254320..dd29696 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java @@ -53,7 +53,7 @@ public abstract class KafkaSpoutAbstractTest { final TopologyContext topologyContext = mock(TopologyContext.class); final Map<String, Object> conf = new HashMap<>(); - final SpoutOutputCollector collector = mock(SpoutOutputCollector.class); + final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); final long commitOffsetPeriodMs; KafkaConsumer<String, String> consumerSpy; @@ -109,7 +109,7 @@ public abstract class KafkaSpoutAbstractTest { void prepareSpout(int messageCount) throws Exception { SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount); - SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); + SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock); } /** @@ -131,7 +131,7 @@ public abstract class KafkaSpoutAbstractTest { spout.ack(messageId.getValue()); - reset(collector); + reset(collectorMock); return messageId; } @@ -140,7 +140,7 @@ public abstract class KafkaSpoutAbstractTest { ArgumentCaptor<Object> verifyMessageEmitted(int offset) { final ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class); - verify(collector).emit( + 
verify(collectorMock).emit( eq(SingleTopicKafkaSpoutConfiguration.STREAM), eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC, Integer.toString(offset), http://git-wip-us.apache.org/repos/asf/storm/blob/df5187e8/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java index 33c71bc..1470332 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java @@ -28,13 +28,14 @@ import java.util.Map; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Matchers.anyListOf; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.never; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyListOf; +import static org.mockito.ArgumentMatchers.anyObject; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import java.util.HashSet; import java.util.List; @@ -43,9 +44,9 @@ import java.util.Set; import org.apache.kafka.common.TopicPartition; import org.apache.storm.utils.Time; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyList; -import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -81,7 +82,7 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { spout.nextTuple(); } ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture()); + verify(collectorMock, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture()); List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues(); for (int i = 1; i < messageIds.size(); i++) { spout.ack(messageIds.get(i)); @@ -90,16 +91,17 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { spout.fail(failedTuple); //Advance the time and replay the failed tuple. - reset(collector); + reset(collectorMock); spout.nextTuple(); ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collector).emit(anyString(), anyList(), failedIdReplayCaptor.capture()); + verify(collectorMock).emit(anyString(), anyList(), failedIdReplayCaptor.capture()); assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple)); /* Ack the tuple, and commit. * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll. 
*/ + clearInvocations(collectorMock); Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs); spout.ack(failedIdReplayCaptor.getValue()); spout.nextTuple(); @@ -110,16 +112,15 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp)); assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount)); - /* Verify that the following acked (now committed) tuples are not emitted again - * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened, - * this verifies that the spout keeps the consumer position ahead of the committed offset when committing - */ - reset(collector); + /* Verify that the following acked (now committed) tuples are not emitted again + * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened, + * this verifies that the spout keeps the consumer position ahead of the committed offset when committing + */ //Just do a few polls to check that nothing more is emitted for(int i = 0; i < 3; i++) { spout.nextTuple(); } - verify(collector, never()).emit(anyString(), anyList(), anyObject()); + verify(collectorMock, never()).emit(anyString(), anyList(), anyObject()); } @Test @@ -130,7 +131,7 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { //play 1st tuple ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class); spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), messageIdToDoubleAck.capture()); + verify(collectorMock).emit(anyString(), anyList(), messageIdToDoubleAck.capture()); spout.ack(messageIdToDoubleAck.getValue()); //Emit some more messages @@ -147,7 +148,7 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { //Verify that all messages are emitted, ack all the messages ArgumentCaptor<Object> messageIds = ArgumentCaptor.forClass(Object.class); - verify(collector, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), + verify(collectorMock, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), messageIds.capture()); for(Object id : messageIds.getAllValues()) { @@ -170,14 +171,14 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { for(int i = 0; i < messageCount; i++) { spout.nextTuple(); ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class); - verify(collector).emit( + verify(collectorMock).emit( eq(SingleTopicKafkaSpoutConfiguration.STREAM), eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC, Integer.toString(i), Integer.toString(i))), messageId.capture()); spout.ack(messageId.getValue()); - reset(collector); + reset(collectorMock); } Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); @@ -195,16 +196,16 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { //play and ack 1 tuple ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class); spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), messageIdAcked.capture()); + verify(collectorMock).emit(anyString(), anyList(), messageIdAcked.capture()); spout.ack(messageIdAcked.getValue()); - reset(collector); + reset(collectorMock); //play and fail 1 tuple ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class); spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), 
messageIdFailed.capture()); + verify(collectorMock).emit(anyString(), anyList(), messageIdFailed.capture()); spout.fail(messageIdFailed.getValue()); - reset(collector); + reset(collectorMock); //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait. for(int i = 0; i < messageCount; i++) { @@ -213,7 +214,7 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class); //All messages except the first acked message should have been emitted - verify(collector, times(messageCount - 1)).emit( + verify(collectorMock, times(messageCount - 1)).emit( eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), remainingMessageIds.capture()); @@ -236,14 +237,14 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { //play 1st tuple ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class); spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), messageIdToFail.capture()); - reset(collector); + verify(collectorMock).emit(anyString(), anyList(), messageIdToFail.capture()); + reset(collectorMock); //play 2nd tuple ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class); spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), messageIdToAck.capture()); - reset(collector); + verify(collectorMock).emit(anyString(), anyList(), messageIdToAck.capture()); + reset(collectorMock); //ack 2nd tuple spout.ack(messageIdToAck.getValue()); @@ -257,7 +258,7 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class); //All messages except the first acked message should have been emitted - verify(collector, times(messageCount - 1)).emit( + verify(collectorMock, times(messageCount - 1)).emit( eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), remainingIds.capture()); @@ -284,8 +285,8 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { spout.nextTuple(); } ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIds.capture()); - reset(collector); + verify(collectorMock, times(messageCount)).emit(anyString(), anyList(), messageIds.capture()); + reset(collectorMock); //Fail tuple 5 and 3, call nextTuple, then fail tuple 2 List<KafkaSpoutMessageId> capturedMessageIds = messageIds.getAllValues(); spout.fail(capturedMessageIds.get(5)); @@ -298,7 +299,7 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { for (int i = 0; i < messageCount; i++) { spout.nextTuple(); } - verify(collector, times(3)).emit(anyString(), anyList(), reemittedMessageIds.capture()); + verify(collectorMock, times(3)).emit(anyString(), anyList(), reemittedMessageIds.capture()); Set<KafkaSpoutMessageId> expectedReemitIds = new HashSet<>(); expectedReemitIds.add(capturedMessageIds.get(5)); expectedReemitIds.add(capturedMessageIds.get(3)); @@ -316,31 +317,31 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest { for (int i = 0; i <= maxRetries; i++) { ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); spout.nextTuple(); - verify(collector).emit(anyString(), anyListOf(Object.class), messageIdFailed.capture()); + verify(collectorMock).emit(anyString(), anyListOf(Object.class), 
messageIdFailed.capture()); KafkaSpoutMessageId msgId = messageIdFailed.getValue(); spout.fail(msgId); assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1)); - reset(collector); + reset(collectorMock); } //Verify that the tuple is not emitted again spout.nextTuple(); - verify(collector, never()).emit(anyString(), anyListOf(Object.class), anyObject()); + verify(collectorMock, never()).emit(anyString(), anyListOf(Object.class), anyObject()); } @Test public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception { - SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); + SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock); //Nothing is assigned yet, should emit nothing spout.nextTuple(); - verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class)); + verify(collectorMock, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class)); SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, 1); Time.advanceTime(KafkaSpoutConfig.DEFAULT_PARTITION_REFRESH_PERIOD_MS + KafkaSpout.TIMER_DELAY_MS); //The new partition should be discovered and the message should be emitted spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class)); + verify(collectorMock).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class)); } } http://git-wip-us.apache.org/repos/asf/storm/blob/df5187e8/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java index a96e284..a9d7c75 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java @@ -79,7 +79,7 @@ public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAb // Restart topology with the same topology id, which mimics the behavior of partition reassignment setUp(); // Initialize spout using the same populated data (i.e same kafkaUnitRule) - SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); + SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock); nextTuple_verifyEmitted_ack_resetCollector(1); @@ -104,7 +104,7 @@ public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAb setUp(); when(topologyContext.getStormId()).thenReturn("topology-2"); // Initialize spout using the same populated data (i.e same kafkaUnitRule) - SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); + SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock); //Emit all messages and check that they are emitted. 
Ack the messages too for (int i = 0; i < messageCount; i++) { http://git-wip-us.apache.org/repos/asf/storm/blob/df5187e8/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java deleted file mode 100644 index cdbe05d..0000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ /dev/null @@ -1,379 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka.spout; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.IntStream; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.KafkaUnitRule; -import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Time; -import org.apache.storm.utils.Time.SimulatedTime; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.MockitoAnnotations; - -import java.util.regex.Pattern; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.hamcrest.Matchers; - -public class SingleTopicKafkaSpoutTest { - - @Rule - public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(); - - @Captor - private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; - - private final TopologyContext topologyContext = mock(TopologyContext.class); - private final Map<String, Object> conf = new HashMap<>(); - private final 
SpoutOutputCollector collector = mock(SpoutOutputCollector.class); - private final long commitOffsetPeriodMs = 2_000; - private final int maxRetries = 3; - private KafkaConsumer<String, String> consumerSpy; - private KafkaSpout<String, String> spout; - private final int maxPollRecords = 10; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - KafkaSpoutConfig<String, String> spoutConfig = - SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig( - KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(), - Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))) - .setOffsetCommitPeriodMs(commitOffsetPeriodMs) - .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), - maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) - .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) - .build(); - this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig)); - this.spout = new KafkaSpout<>(spoutConfig, (ignored) -> consumerSpy); - } - - private void prepareSpout(int messageCount) throws Exception { - SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount); - SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); - } - - @Test - public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception { - try (SimulatedTime simulatedTime = new SimulatedTime()) { - int messageCount = maxPollRecords * 2; - prepareSpout(messageCount); - - //Emit all messages and fail the first one while acking the rest - for (int i = 0; i < messageCount; i++) { - spout.nextTuple(); - } - ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture()); - List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues(); - for (int i = 1; i < messageIds.size(); i++) { - spout.ack(messageIds.get(i)); - } - KafkaSpoutMessageId failedTuple = messageIds.get(0); - spout.fail(failedTuple); - - //Advance the time and replay the failed tuple. - reset(collector); - spout.nextTuple(); - ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collector).emit(anyString(), anyList(), failedIdReplayCaptor.capture()); - - assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple)); - - /* Ack the tuple, and commit. - * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll. 
- */ - Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs); - spout.ack(failedIdReplayCaptor.getValue()); - spout.nextTuple(); - verify(consumerSpy).commitSync(commitCapture.capture()); - - Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue(); - TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0); - assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp)); - assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount)); - - /* Verify that the following acked (now committed) tuples are not emitted again - * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened, - * this verifies that the spout keeps the consumer position ahead of the committed offset when committing - */ - reset(collector); - //Just do a few polls to check that nothing more is emitted - for(int i = 0; i < 3; i++) { - spout.nextTuple(); - } - verify(collector, never()).emit(any(), any(), any()); - } - } - - @Test - public void testShouldContinueWithSlowDoubleAcks() throws Exception { - try (SimulatedTime simulatedTime = new SimulatedTime()) { - int messageCount = 20; - prepareSpout(messageCount); - - //play 1st tuple - ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class); - spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), messageIdToDoubleAck.capture()); - spout.ack(messageIdToDoubleAck.getValue()); - - //Emit some more messages - IntStream.range(0, messageCount / 2).forEach(value -> { - spout.nextTuple(); - }); - - spout.ack(messageIdToDoubleAck.getValue()); - - //Emit any remaining messages - IntStream.range(0, messageCount).forEach(value -> { - spout.nextTuple(); - }); - - //Verify that all messages are emitted, ack all the messages - ArgumentCaptor<Object> messageIds = ArgumentCaptor.forClass(Object.class); - verify(collector, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), - anyList(), - messageIds.capture()); - messageIds.getAllValues().iterator().forEachRemaining(spout::ack); - - Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); - //Commit offsets - spout.nextTuple(); - - SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, messageCount); - } - } - - @Test - public void testShouldEmitAllMessages() throws Exception { - try (SimulatedTime simulatedTime = new SimulatedTime()) { - int messageCount = 10; - prepareSpout(messageCount); - - //Emit all messages and check that they are emitted. 
Ack the messages too - IntStream.range(0, messageCount).forEach(value -> { - spout.nextTuple(); - ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class); - verify(collector).emit( - eq(SingleTopicKafkaSpoutConfiguration.STREAM), - eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC, - Integer.toString(value), - Integer.toString(value))), - messageId.capture()); - spout.ack(messageId.getValue()); - reset(collector); - }); - - Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); - //Commit offsets - spout.nextTuple(); - - SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, messageCount); - } - } - - @Test - public void testShouldReplayInOrderFailedMessages() throws Exception { - try (SimulatedTime simulatedTime = new SimulatedTime()) { - int messageCount = 10; - prepareSpout(messageCount); - - //play and ack 1 tuple - ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class); - spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), messageIdAcked.capture()); - spout.ack(messageIdAcked.getValue()); - reset(collector); - - //play and fail 1 tuple - ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class); - spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), messageIdFailed.capture()); - spout.fail(messageIdFailed.getValue()); - reset(collector); - - //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait. - IntStream.range(0, messageCount).forEach(value -> { - spout.nextTuple(); - }); - - ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class); - //All messages except the first acked message should have been emitted - verify(collector, times(messageCount - 1)).emit( - eq(SingleTopicKafkaSpoutConfiguration.STREAM), - anyList(), - remainingMessageIds.capture()); - remainingMessageIds.getAllValues().iterator().forEachRemaining(spout::ack); - - Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); - //Commit offsets - spout.nextTuple(); - - SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, messageCount); - } - } - - @Test - public void testShouldReplayFirstTupleFailedOutOfOrder() throws Exception { - try (SimulatedTime simulatedTime = new SimulatedTime()) { - int messageCount = 10; - prepareSpout(messageCount); - - //play 1st tuple - ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class); - spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), messageIdToFail.capture()); - reset(collector); - - //play 2nd tuple - ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class); - spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), messageIdToAck.capture()); - reset(collector); - - //ack 2nd tuple - spout.ack(messageIdToAck.getValue()); - //fail 1st tuple - spout.fail(messageIdToFail.getValue()); - - //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait. 
- IntStream.range(0, messageCount).forEach(value -> { - spout.nextTuple(); - }); - - ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class); - //All messages except the first acked message should have been emitted - verify(collector, times(messageCount - 1)).emit( - eq(SingleTopicKafkaSpoutConfiguration.STREAM), - anyList(), - remainingIds.capture()); - remainingIds.getAllValues().iterator().forEachRemaining(spout::ack); - - Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); - //Commit offsets - spout.nextTuple(); - - SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, messageCount); - } - } - - @Test - public void testShouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception { - //The spout must reemit retriable tuples, even if they fail out of order. - //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries. - int messageCount = 10; - prepareSpout(messageCount); - - //play all tuples - for (int i = 0; i < messageCount; i++) { - spout.nextTuple(); - } - ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIds.capture()); - reset(collector); - //Fail tuple 5 and 3, call nextTuple, then fail tuple 2 - List<KafkaSpoutMessageId> capturedMessageIds = messageIds.getAllValues(); - spout.fail(capturedMessageIds.get(5)); - spout.fail(capturedMessageIds.get(3)); - spout.nextTuple(); - spout.fail(capturedMessageIds.get(2)); - - //Check that the spout will reemit all 3 failed tuples and no other tuples - ArgumentCaptor<KafkaSpoutMessageId> reemittedMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - for (int i = 0; i < messageCount; i++) { - spout.nextTuple(); - } - verify(collector, times(3)).emit(anyString(), anyList(), reemittedMessageIds.capture()); - Set<KafkaSpoutMessageId> expectedReemitIds = new HashSet<>(); - expectedReemitIds.add(capturedMessageIds.get(5)); - expectedReemitIds.add(capturedMessageIds.get(3)); - expectedReemitIds.add(capturedMessageIds.get(2)); - assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds)); - } - - @Test - public void testShouldDropMessagesAfterMaxRetriesAreReached() throws Exception { - //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted - int messageCount = 1; - prepareSpout(messageCount); - - //Emit and fail the same tuple until we've reached retry limit - for (int i = 0; i <= maxRetries; i++) { - ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), messageIdFailed.capture()); - KafkaSpoutMessageId msgId = messageIdFailed.getValue(); - spout.fail(msgId); - assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1)); - reset(collector); - } - - //Verify that the tuple is not emitted again - spout.nextTuple(); - verify(collector, never()).emit(any(), any(), any()); - } - - @Test - public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception { - try (SimulatedTime time = new SimulatedTime()) { - SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); - - //Nothing is assigned yet, should emit nothing - 
spout.nextTuple(); - verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class)); - - SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, 1); - Time.advanceTime(KafkaSpoutConfig.DEFAULT_PARTITION_REFRESH_PERIOD_MS + KafkaSpout.TIMER_DELAY_MS); - - //The new partition should be discovered and the message should be emitted - spout.nextTuple(); - verify(collector).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class)); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/df5187e8/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java index 3690aef..eba9625 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java @@ -45,7 +45,7 @@ public class SingleTopicKafkaUnitSetupHelper { */ public static void populateTopicData(KafkaUnit kafkaUnit, String topicName, int msgCount) throws Exception { kafkaUnit.createTopic(topicName); - + for (int i = 0; i < msgCount; i++) { ProducerRecord<String, String> producerRecord = new ProducerRecord<>( topicName, Integer.toString(i), @@ -53,7 +53,7 @@ public class SingleTopicKafkaUnitSetupHelper { kafkaUnit.sendMessage(producerRecord); } } - + /* * Asserts that commitSync has been called once, * that there are only commits on one topic,
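
Beyond consolidating the tests (the deleted SingleTopicKafkaSpoutTest above now lives on as KafkaSpoutAbstractTest plus KafkaSpoutSingleTopicTest), the other notable change in KafkaSpout.java is that nextTuple() drains per-partition lists instead of one flat iterator, removing a partition's entry once it has nothing left to emit. A standalone sketch of that iteration pattern (simplified types and hypothetical names such as EmitLoopSketch; not the committed code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    // Sketch of the per-partition emit loop: emit at most one record per call and
    // drop a partition's entry from the buffer once its list is exhausted.
    class EmitLoopSketch {

        private final Map<String, List<String>> waitingToEmit = new HashMap<>();

        EmitLoopSketch() {
            waitingToEmit.put("topic-0", new ArrayList<>(Arrays.asList("r0", "r1")));
            waitingToEmit.put("topic-1", new ArrayList<>(Arrays.asList("r2")));
        }

        String emitNext() {
            Iterator<List<String>> perPartition = waitingToEmit.values().iterator();
            while (perPartition.hasNext()) {
                List<String> pending = perPartition.next();
                while (!pending.isEmpty()) {
                    String record = pending.remove(0);
                    if (shouldEmit(record)) {
                        return record; // one tuple per nextTuple() call
                    }
                }
                perPartition.remove(); // nothing left to emit for this partition
            }
            return null;
        }

        // Placeholder for the ack/retry bookkeeping the real spout does in emitOrRetryTuple().
        private boolean shouldEmit(String record) {
            return true;
        }
    }

Keeping one list per partition is what enables the targeted cleanup elsewhere in the commit: a single partition's pending records can be trimmed after a commit, or dropped on partition reassignment, without touching the records buffered for other partitions.
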