http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java
new file mode 100644
index 0000000..f6de6a8
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoffTest.java
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+
+public class KafkaSpoutRetryExponentialBackoffTest {
+
+    private final TopicPartition testTopic = new TopicPartition("topic", 0);
+    private final TopicPartition testTopic2 = new TopicPartition("other-topic", 0);
+
+    private KafkaSpoutRetryExponentialBackoff createNoWaitRetryService() {
+        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), 1, TimeInterval.seconds(0));
+    }
+
+    private KafkaSpoutRetryExponentialBackoff createOneSecondWaitRetryService() {
+        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(1), TimeInterval.seconds(0), 1, TimeInterval.seconds(1));
+    }
+
+    private ConsumerRecord<String, String> createRecord(TopicPartition tp, long offset) {
+        return new ConsumerRecord<>(tp.topic(), tp.partition(), offset, null, null);
+    }
+
+    @Test
+    public void testCanScheduleRetry() {
+        KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService();
+        long offset = 0;
+        KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+        msgId.incrementNumFails();
+
+        boolean scheduled = retryService.schedule(msgId);
+
+        assertThat("The service must schedule the message for retry", scheduled, is(true));
+        KafkaSpoutMessageId retrievedMessageId = retryService.getMessageId(createRecord(testTopic, offset));
+        assertThat("The service should return the original message id when asked for the same tp/offset twice", retrievedMessageId, sameInstance(msgId));
+        assertThat(retryService.isScheduled(msgId), is(true));
+        assertThat(retryService.isReady(msgId), is(true));
+        assertThat(retryService.readyMessageCount(), is(1));
+        assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgId.offset())));
+    }
+
+    @Test
+    public void testCanRescheduleRetry() {
+        try (SimulatedTime time = new SimulatedTime()) {
+
+            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
+            long offset = 0;
+            KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+            msgId.incrementNumFails();
+
+            retryService.schedule(msgId);
+            Time.advanceTime(500);
+            boolean scheduled = retryService.schedule(msgId);
+
+            assertThat("The service must be able to reschedule an already scheduled id", scheduled, is(true));
+            Time.advanceTime(500);
+            assertThat("The message should not be ready for retry yet since it was rescheduled", retryService.isReady(msgId), is(false));
+            assertThat(retryService.isScheduled(msgId), is(true));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.<TopicPartition, Long>emptyMap()));
+            assertThat(retryService.readyMessageCount(), is(0));
+            Time.advanceTime(500);
+            assertThat("The message should be ready for retry once the full delay has passed", retryService.isReady(msgId), is(true));
+            assertThat(retryService.isScheduled(msgId), is(true));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgId.offset())));
+            assertThat(retryService.readyMessageCount(), is(1));
+        }
+    }
+
+    @Test
+    public void testCannotContainMultipleSchedulesForId() {
+        try (SimulatedTime time = new SimulatedTime()) {
+
+            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
+            long offset = 0;
+            KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+            msgId.incrementNumFails();
+
+            retryService.schedule(msgId);
+            Time.advanceTime(500);
+            retryService.schedule(msgId);
+
+            retryService.remove(msgId);
+            assertThat("The message should no longer be scheduled", retryService.isScheduled(msgId), is(false));
+            Time.advanceTime(500);
+            assertThat("The message should not be ready for retry because it isn't scheduled", retryService.isReady(msgId), is(false));
+        }
+    }
+
+    @Test
+    public void testCanRemoveRetry() {
+        KafkaSpoutRetryExponentialBackoff retryService = createNoWaitRetryService();
+        long offset = 0;
+        KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+        msgId.incrementNumFails();
+
+        retryService.schedule(msgId);
+        boolean removed = retryService.remove(msgId);
+
+        assertThat(removed, is(true));
+        assertThat(retryService.isScheduled(msgId), is(false));
+        assertThat(retryService.isReady(msgId), is(false));
+        assertThat(retryService.earliestRetriableOffsets(), is(Collections.<TopicPartition, Long>emptyMap()));
+        assertThat(retryService.readyMessageCount(), is(0));
+    }
+
+    @Test
+    public void testCanHandleMultipleTopics() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            //Tests that isScheduled, isReady and earliestRetriableOffsets are mutually consistent when there are messages from multiple partitions scheduled
+            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
+            long offset = 0;
+
+            KafkaSpoutMessageId msgIdTp1 = retryService.getMessageId(createRecord(testTopic, offset));
+            KafkaSpoutMessageId msgIdTp2 = retryService.getMessageId(createRecord(testTopic2, offset));
+            msgIdTp1.incrementNumFails();
+            msgIdTp2.incrementNumFails();
+
+            boolean scheduledOne = retryService.schedule(msgIdTp1);
+            Time.advanceTime(500);
+            boolean scheduledTwo = retryService.schedule(msgIdTp2);
+
+            //The retry schedules for two messages should be unrelated
+            assertThat(scheduledOne, is(true));
+            assertThat(retryService.isScheduled(msgIdTp1), is(true));
+            assertThat(scheduledTwo, is(true));
+            assertThat(retryService.isScheduled(msgIdTp2), is(true));
+            assertThat(retryService.isReady(msgIdTp1), is(false));
+            assertThat(retryService.isReady(msgIdTp2), is(false));
+
+            Time.advanceTime(500);
+            assertThat(retryService.isReady(msgIdTp1), is(true));
+            assertThat(retryService.isReady(msgIdTp2), is(false));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, offset)));
+
+            Time.advanceTime(500);
+            assertThat(retryService.isReady(msgIdTp2), is(true));
+            Map<TopicPartition, Long> earliestOffsets = new HashMap<>();
+            earliestOffsets.put(testTopic, offset);
+            earliestOffsets.put(testTopic2, offset);
+            assertThat(retryService.earliestRetriableOffsets(), is(earliestOffsets));
+
+            //The service must be able to remove retry schedules for unnecessary partitions
+            retryService.retainAll(Collections.singleton(testTopic2));
+            assertThat(retryService.isScheduled(msgIdTp1), is(false));
+            assertThat(retryService.isScheduled(msgIdTp2), is(true));
+            assertThat(retryService.isReady(msgIdTp1), is(false));
+            assertThat(retryService.isReady(msgIdTp2), is(true));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic2, offset)));
+        }
+    }
+
+    @Test
+    public void testCanHandleMultipleMessagesOnPartition() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            //Tests that isScheduled, isReady and earliestRetriableOffsets are mutually consistent when there are multiple messages scheduled on a partition
+            KafkaSpoutRetryExponentialBackoff retryService = createOneSecondWaitRetryService();
+            long offset = 0;
+
+            KafkaSpoutMessageId msgIdEarliest = retryService.getMessageId(createRecord(testTopic, offset));
+            KafkaSpoutMessageId msgIdLatest = retryService.getMessageId(createRecord(testTopic, offset + 1));
+            msgIdEarliest.incrementNumFails();
+            msgIdLatest.incrementNumFails();
+
+            retryService.schedule(msgIdEarliest);
+            Time.advanceTime(500);
+            retryService.schedule(msgIdLatest);
+
+            assertThat(retryService.isScheduled(msgIdEarliest), is(true));
+            assertThat(retryService.isScheduled(msgIdLatest), is(true));
+
+            Time.advanceTime(500);
+            assertThat(retryService.isReady(msgIdEarliest), is(true));
+            assertThat(retryService.isReady(msgIdLatest), is(false));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdEarliest.offset())));
+
+            Time.advanceTime(500);
+            assertThat(retryService.isReady(msgIdEarliest), is(true));
+            assertThat(retryService.isReady(msgIdLatest), is(true));
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdEarliest.offset())));
+
+            retryService.remove(msgIdEarliest);
+            assertThat(retryService.earliestRetriableOffsets(), is(Collections.singletonMap(testTopic, msgIdLatest.offset())));
+        }
+    }
+
+    @Test
+    public void testMaxRetries() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            int maxRetries = 3;
+            KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(0), maxRetries, TimeInterval.seconds(0));
+            long offset = 0;
+
+            KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+            for (int i = 0; i < maxRetries; i++) {
+                msgId.incrementNumFails();
+            }
+
+            //Should be allowed to retry 3 times, in addition to original try
+            boolean scheduled = retryService.schedule(msgId);
+
+            assertThat(scheduled, is(true));
+            assertThat(retryService.isScheduled(msgId), is(true));
+
+            retryService.remove(msgId);
+            msgId.incrementNumFails();
+            boolean rescheduled = retryService.schedule(msgId);
+
+            assertThat("The message should not be allowed to retry once the limit is reached", rescheduled, is(false));
+            assertThat(retryService.isScheduled(msgId), is(false));
+        }
+    }
+
+    @Test
+    public void testMaxDelay() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            int maxDelaySecs = 2;
+            KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(500), TimeInterval.seconds(0), 1, TimeInterval.seconds(maxDelaySecs));
+            long offset = 0;
+
+            KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+            msgId.incrementNumFails();
+
+            retryService.schedule(msgId);
+
+            Time.advanceTimeSecs(maxDelaySecs);
+            assertThat("The message should be ready for retry after the max delay", retryService.isReady(msgId), is(true));
+        }
+    }
+
+    private void validateBackoff(int expectedBackoffSeconds, KafkaSpoutMessageId msgId, KafkaSpoutRetryExponentialBackoff retryService) {
+        Time.advanceTimeSecs(expectedBackoffSeconds - 1);
+        assertThat("The message should not be ready for retry until the backoff has expired", retryService.isReady(msgId), is(false));
+        Time.advanceTimeSecs(1);
+        assertThat(retryService.isReady(msgId), is(true));
+    }
+
+    @Test
+    public void testExponentialBackoff() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.seconds(4), Integer.MAX_VALUE, TimeInterval.seconds(Integer.MAX_VALUE));
+            long offset = 0;
+
+            KafkaSpoutMessageId msgId = retryService.getMessageId(createRecord(testTopic, offset));
+            msgId.incrementNumFails();
+            msgId.incrementNumFails(); //First failure is the initial delay, so not interesting
+
+            //Expecting 4*2^(failCount-1)
+            List<Integer> expectedBackoffsSecs = Arrays.asList(8, 16, 32);
+
+            for (Integer expectedBackoffSecs : expectedBackoffsSecs) {
+                retryService.schedule(msgId);
+
+                Time.advanceTimeSecs(expectedBackoffSecs - 1);
+                assertThat("The message should not be ready for retry until backoff " + expectedBackoffSecs + " has expired", retryService.isReady(msgId), is(false));
+                Time.advanceTimeSecs(1);
+                assertThat("The message should be ready for retry once backoff " + expectedBackoffSecs + " has expired", retryService.isReady(msgId), is(true));
+
+                msgId.incrementNumFails();
+                retryService.remove(msgId);
+            }
+        }
+    }
+
+}
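[Editor's note] The tests above exercise the full retry-service contract: fail a tuple, schedule it, wait out the backoff, then seek back to the earliest retriable offset. As a rough illustration of that life cycle, here is a minimal standalone sketch; the topic name and delay values are placeholders, and this is not spout-internal code:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;

    public class RetryServiceSketch {
        public static void main(String[] args) {
            //500ms initial delay, delay period doubling per failure, at most 3 retries, capped at 10s
            KafkaSpoutRetryExponentialBackoff retryService = new KafkaSpoutRetryExponentialBackoff(
                TimeInterval.milliSeconds(500), TimeInterval.milliSeconds(500), 3, TimeInterval.seconds(10));

            //"some-topic" is a placeholder; in the spout this is the record that failed
            ConsumerRecord<String, String> failedRecord = new ConsumerRecord<>("some-topic", 0, 0L, null, null);
            KafkaSpoutMessageId msgId = retryService.getMessageId(failedRecord);
            msgId.incrementNumFails();    //record one failed attempt
            retryService.schedule(msgId); //returns false once the retry limit is hit

            //When retrying, the spout seeks each partition back to its earliest retriable offset
            Map<TopicPartition, Long> earliest = retryService.earliestRetriableOffsets();
            earliest.forEach((tp, offset) -> System.out.println("seek " + tp + " to " + offset));
        }
    }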
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
new file mode 100644
index 0000000..569becf
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+
+public class KafkaSpoutRetryLimitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
+        consumerMock = mock(KafkaConsumer.class);
+    }
+
+    @Test
+    public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
+        //Spout should ack failed messages after they hit the retry limit
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            int lastOffset = 3;
+            int numRecords = lastOffset + 1;
+            records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, numRecords));
+
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords<>(records));
+
+            for (int i = 0; i < numRecords; i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), messageIds.capture());
+
+            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+                spout.fail(messageId);
+            }
+
+            // Advance time and then trigger call to kafka consumer commit
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+
+            InOrder inOrder = inOrder(consumerMock);
+            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+            inOrder.verify(consumerMock).poll(anyLong());
+
+            //verify that offset 4 was committed for the given TopicPartition, since processing should resume at 4.
+            assertTrue(commitCapture.getValue().containsKey(partition));
+            assertEquals(lastOffset + 1, commitCapture.getValue().get(partition).offset());
+        }
+    }
+
+}
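[Editor's note] The final assertion relies on Kafka's commit semantics: the committed offset is the offset of the *next* record to consume, not the last consumed one, which is why offsets 0..3 result in a commit of 4. A minimal sketch of the map the spout is expected to hand to commitSync in this scenario (the topic name is a placeholder):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommitSemanticsSketch {
        public static void main(String[] args) {
            TopicPartition partition = new TopicPartition("test", 1);
            long lastProcessedOffset = 3;
            //Offsets 0..3 are complete (acked or retry-limited), so the committed offset is 4,
            //i.e. the offset processing resumes from after a restart
            Map<TopicPartition, OffsetAndMetadata> commit =
                Collections.singletonMap(partition, new OffsetAndMetadata(lastProcessedOffset + 1));
            System.out.println(commit);
        }
    }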
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
new file mode 100644
index 0000000..0bf9219
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.utils.Time;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.junit.Assert.assertEquals;
+
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.hamcrest.Matchers;
+
+public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
+    private final int maxPollRecords = 10;
+    private final int maxRetries = 3;
+
+    public KafkaSpoutSingleTopicTest() {
+        super(2_000);
+    }
+
+    @Override
+    KafkaSpoutConfig<String, String> createSpoutConfig() {
+        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+            KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
+            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+            .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+                maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+            .build();
+    }
+
+    @Test
+    public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception {
+        final int messageCount = maxPollRecords * 2;
+        prepareSpout(messageCount);
+
+        //Emit all messages and fail the first one while acking the rest
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture());
+        List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues();
+        for (int i = 1; i < messageIds.size(); i++) {
+            spout.ack(messageIds.get(i));
+        }
+        KafkaSpoutMessageId failedTuple = messageIds.get(0);
+        spout.fail(failedTuple);
+
+        //Advance the time and replay the failed tuple.
+        reset(collectorMock);
+        spout.nextTuple();
+        ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(anyString(), anyList(), failedIdReplayCaptor.capture());
+
+        assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple));
+
+        /* Ack the tuple, and commit.
+         * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll.
+         */
+        reset(collectorMock);
+        Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
+        spout.ack(failedIdReplayCaptor.getValue());
+        spout.nextTuple();
+        verify(consumerSpy).commitSync(commitCapture.capture());
+
+        Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
+        TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+        assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp));
+        assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long) messageCount));
+
+        /* Verify that the following acked (now committed) tuples are not emitted again
+         * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened,
+         * this verifies that the spout keeps the consumer position ahead of the committed offset when committing
+         */
+        //Just do a few polls to check that nothing more is emitted
+        for (int i = 0; i < 3; i++) {
+            spout.nextTuple();
+        }
+        verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
+    }
+
+    @Test
+    public void testShouldContinueWithSlowDoubleAcks() throws Exception {
+        final int messageCount = 20;
+        prepareSpout(messageCount);
+
+        //play 1st tuple
+        ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdToDoubleAck.capture());
+        spout.ack(messageIdToDoubleAck.getValue());
+
+        //Emit some more messages
+        for (int i = 0; i < messageCount / 2; i++) {
+            spout.nextTuple();
+        }
+
+        spout.ack(messageIdToDoubleAck.getValue());
+
+        //Emit any remaining messages
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+
+        //Verify that all messages are emitted, ack all the messages
+        ArgumentCaptor<Object> messageIds = ArgumentCaptor.forClass(Object.class);
+        verify(collectorMock, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+            anyList(),
+            messageIds.capture());
+        for (Object id : messageIds.getAllValues()) {
+            spout.ack(id);
+        }
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void testShouldEmitAllMessages() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //Emit all messages and check that they are emitted. Ack the messages too
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+            ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
+            verify(collectorMock).emit(
+                eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+                eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
+                    Integer.toString(i),
+                    Integer.toString(i))),
+                messageId.capture());
+            spout.ack(messageId.getValue());
+            reset(collectorMock);
+        }
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void testShouldReplayInOrderFailedMessages() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //play and ack 1 tuple
+        ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdAcked.capture());
+        spout.ack(messageIdAcked.getValue());
+        reset(collectorMock);
+
+        //play and fail 1 tuple
+        ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdFailed.capture());
+        spout.fail(messageIdFailed.getValue());
+        reset(collectorMock);
+
+        //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+
+        ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
+        //All messages except the first acked message should have been emitted
+        verify(collectorMock, times(messageCount - 1)).emit(
+            eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+            anyList(),
+            remainingMessageIds.capture());
+        for (Object id : remainingMessageIds.getAllValues()) {
+            spout.ack(id);
+        }
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void testShouldReplayFirstTupleFailedOutOfOrder() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //play 1st tuple
+        ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdToFail.capture());
+        reset(collectorMock);
+
+        //play 2nd tuple
+        ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdToAck.capture());
+        reset(collectorMock);
+
+        //ack 2nd tuple
+        spout.ack(messageIdToAck.getValue());
+        //fail 1st tuple
+        spout.fail(messageIdToFail.getValue());
+
+        //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+
+        ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
+        //All messages except the first acked message should have been emitted
+        verify(collectorMock, times(messageCount - 1)).emit(
+            eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+            anyList(),
+            remainingIds.capture());
+        for (Object id : remainingIds.getAllValues()) {
+            spout.ack(id);
+        }
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void testShouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
+        //The spout must reemit retriable tuples, even if they fail out of order.
+        //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries.
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //play all tuples
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock, times(messageCount)).emit(anyString(), anyList(), messageIds.capture());
+        reset(collectorMock);
+        //Fail tuple 5 and 3, call nextTuple, then fail tuple 2
+        List<KafkaSpoutMessageId> capturedMessageIds = messageIds.getAllValues();
+        spout.fail(capturedMessageIds.get(5));
+        spout.fail(capturedMessageIds.get(3));
+        spout.nextTuple();
+        spout.fail(capturedMessageIds.get(2));
+
+        //Check that the spout will reemit all 3 failed tuples and no other tuples
+        ArgumentCaptor<KafkaSpoutMessageId> reemittedMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        verify(collectorMock, times(3)).emit(anyString(), anyList(), reemittedMessageIds.capture());
+        Set<KafkaSpoutMessageId> expectedReemitIds = new HashSet<>();
+        expectedReemitIds.add(capturedMessageIds.get(5));
+        expectedReemitIds.add(capturedMessageIds.get(3));
+        expectedReemitIds.add(capturedMessageIds.get(2));
+        assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds));
+    }
+
+    @Test
+    public void testShouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
+        //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
+        final int messageCount = 1;
+        prepareSpout(messageCount);
+
+        //Emit and fail the same tuple until we've reached retry limit
+        for (int i = 0; i <= maxRetries; i++) {
+            ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            spout.nextTuple();
+            verify(collectorMock).emit(anyString(), anyListOf(Object.class), messageIdFailed.capture());
+            KafkaSpoutMessageId msgId = messageIdFailed.getValue();
+            spout.fail(msgId);
+            assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1));
+            reset(collectorMock);
+        }
+
+        //Verify that the tuple is not emitted again
+        spout.nextTuple();
+        verify(collectorMock, never()).emit(anyString(), anyListOf(Object.class), anyObject());
+    }
+
+    @Test
+    public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception {
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+
+        //Nothing is assigned yet, should emit nothing
+        spout.nextTuple();
+        verify(collectorMock, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+        Time.advanceTime(KafkaSpoutConfig.DEFAULT_PARTITION_REFRESH_PERIOD_MS + KafkaSpout.TIMER_DELAY_MS);
+
+        //The new partition should be discovered and the message should be emitted
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+    }
+
+    @Test
+    public void testOffsetMetrics() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        Map<String, Long> offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalEarliestTimeOffset").longValue());
+        // the offset of the last available message + 1.
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalLatestTimeOffset").longValue());
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalRecordsInPartitions").longValue());
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalLatestEmittedOffset").longValue());
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalLatestCompletedOffset").longValue());
+        //totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalSpoutLag").longValue());
+
+        //Emit all messages and check that they are emitted. Ack the messages too
+        for (int i = 0; i < messageCount; i++) {
+            nextTuple_verifyEmitted_ack_resetCollector(i);
+        }
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+
+        offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalEarliestTimeOffset").longValue());
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalLatestTimeOffset").longValue());
+        //latest offset
+        assertEquals(9, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalLatestEmittedOffset").longValue());
+        // offset where processing will resume upon spout restart
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalLatestCompletedOffset").longValue());
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalSpoutLag").longValue());
+    }
+}
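[Editor's note] For readers unfamiliar with the metric map asserted in testOffsetMetrics: each key is the topic name plus "/" plus a metric name. A minimal sketch of the map's shape, using the values asserted above for a fully processed 10-message topic ("test" stands in for the configured topic name, which is an assumption here):

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetMetricSketch {
        public static void main(String[] args) {
            //Shape of the map returned by spout.getKafkaOffsetMetric().getValueAndReset()
            Map<String, Long> offsetMetric = new HashMap<>();
            offsetMetric.put("test/totalEarliestTimeOffset", 0L);
            offsetMetric.put("test/totalLatestTimeOffset", 10L);      //last available offset + 1
            offsetMetric.put("test/totalLatestEmittedOffset", 9L);    //latest emitted offset
            offsetMetric.put("test/totalLatestCompletedOffset", 10L); //where processing resumes on restart
            offsetMetric.put("test/totalSpoutLag", 0L);               //latestTime - latestCompleted

            long lag = offsetMetric.get("test/totalLatestTimeOffset")
                - offsetMetric.get("test/totalLatestCompletedOffset");
            System.out.println("spout lag: " + lag);
        }
    }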
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
new file mode 100644
index 0000000..a860cef
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.junit.Test;
+
+import java.util.regex.Pattern;
+
+import static org.mockito.Mockito.when;
+
+public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAbstractTest {
+
+    public KafkaSpoutTopologyDeployActivateDeactivateTest() {
+        super(2_000);
+    }
+
+    @Override
+    KafkaSpoutConfig<String, String> createSpoutConfig() {
+        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+            KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
+            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+            .build();
+    }
+
+    @Test
+    public void test_FirstPollStrategy_Earliest_NotEnforced_OnTopologyActivateDeactivate() throws Exception {
+        final int messageCount = 2;
+        prepareSpout(messageCount);
+
+        nextTuple_verifyEmitted_ack_resetCollector(0);
+
+        //Commits offsets during deactivation
+        spout.deactivate();
+
+        verifyAllMessagesCommitted(1);
+
+        consumerSpy = createConsumerSpy();
+
+        spout.activate();
+
+        nextTuple_verifyEmitted_ack_resetCollector(1);
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void test_FirstPollStrategy_Earliest_NotEnforced_OnPartitionReassignment() throws Exception {
+        when(topologyContext.getStormId()).thenReturn("topology-1");
+
+        final int messageCount = 2;
+        prepareSpout(messageCount);
+
+        nextTuple_verifyEmitted_ack_resetCollector(0);
+
+        //Commits offsets during deactivation
+        spout.deactivate();
+
+        verifyAllMessagesCommitted(1);
+
+        // Restart topology with the same topology id, which mimics the behavior of partition reassignment
+        setUp();
+        // Initialize spout using the same populated data (i.e same kafkaUnitRule)
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+
+        nextTuple_verifyEmitted_ack_resetCollector(1);
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void test_FirstPollStrategy_Earliest_Enforced_OnlyOnTopologyDeployment() throws Exception {
+        when(topologyContext.getStormId()).thenReturn("topology-1");
+
+        final int messageCount = 2;
+        prepareSpout(messageCount);
+
+        nextTuple_verifyEmitted_ack_resetCollector(0);
+
+        //Commits offsets during deactivation
+        spout.deactivate();
+
+        verifyAllMessagesCommitted(1);
+
+        // Restart topology with a different topology id
+        setUp();
+        when(topologyContext.getStormId()).thenReturn("topology-2");
+        // Initialize spout using the same populated data (i.e same kafkaUnitRule)
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+
+        //Emit all messages and check that they are emitted. Ack the messages too
+        for (int i = 0; i < messageCount; i++) {
+            nextTuple_verifyEmitted_ack_resetCollector(i);
+        }
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+    }
+}
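[Editor's note] The three tests above pin down a subtle point: with FirstPollOffsetStrategy.EARLIEST, the spout rewinds to the beginning of the partition only on a fresh topology deployment (a new storm id); activate/deactivate cycles and partition reassignment within the same deployment resume from the committed offset. A heavily hedged sketch of that decision, using illustrative names rather than the spout's actual internals:

    //Illustrative only: sketches when an EARLIEST strategy forces a rewind.
    public class FirstPollRewindSketch {
        enum FirstPollOffsetStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

        static boolean shouldRewindToEarliest(FirstPollOffsetStrategy strategy,
                                              String committedTopologyId, String currentTopologyId) {
            boolean firstDeployment = !currentTopologyId.equals(committedTopologyId);
            //EARLIEST is only enforced when the topology id changes, i.e. on a fresh deployment;
            //reactivation and partition reassignment keep the committed position
            return strategy == FirstPollOffsetStrategy.EARLIEST && firstDeployment;
        }

        public static void main(String[] args) {
            System.out.println(shouldRewindToEarliest(FirstPollOffsetStrategy.EARLIEST, "topology-1", "topology-1")); //false
            System.out.println(shouldRewindToEarliest(FirstPollOffsetStrategy.EARLIEST, "topology-1", "topology-2")); //true
        }
    }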
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
new file mode 100755
index 0000000..b90a49d
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -0,0 +1,293 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.CoreMatchers.either;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockitoAnnotations;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+
+public class MaxUncommittedOffsetTest {
+
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+    private final TopologyContext topologyContext = mock(TopologyContext.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+    private final long commitOffsetPeriodMs = 2_000;
+    private final int numMessages = 100;
+    private final int maxUncommittedOffsets = 10;
+    private final int maxPollRecords = 5;
+    private final int initialRetryDelaySecs = 60;
+    private final KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+        .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+        .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+        .setMaxUncommittedOffsets(maxUncommittedOffsets)
+        .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+            1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute
+        .build();
+    private KafkaSpout<String, String> spout;
+
+    @Before
+    public void setUp() {
+        //This is because the tests are checking that a hard cap of maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets exists,
+        //so Kafka must be able to return more messages than that in order for the tests to be meaningful
+        assertThat("Current tests require numMessages >= 2*maxUncommittedOffsets", numMessages, greaterThanOrEqualTo(maxUncommittedOffsets * 2));
+        //This is to verify that a low maxPollRecords does not interfere with reemitting failed tuples.
+        //The spout must be able to reemit all retriable tuples, even if maxPollRecords is set to a low value compared to maxUncommittedOffsets.
+        assertThat("Current tests require maxPollRecords <= maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets));
+        MockitoAnnotations.initMocks(this);
+        spout = new KafkaSpout<>(spoutConfig);
+    }
+
+    private void prepareSpout(int msgCount) throws Exception {
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
+    }
+
+    private ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) throws Exception {
+        assertThat("The message count is less than maxUncommittedOffsets. This test is not meaningful with this configuration.", messageCount, greaterThanOrEqualTo(maxUncommittedOffsets));
+        //The spout must respect maxUncommittedOffsets when requesting/emitting tuples
+        prepareSpout(messageCount);
+
+        //Try to emit all messages. Ensure only maxUncommittedOffsets are emitted
+        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        verify(collector, times(maxUncommittedOffsets)).emit(
+            anyString(),
+            anyList(),
+            messageIds.capture());
+        return messageIds;
+    }
+
+    @Test
+    public void testNextTupleCanEmitMoreMessagesWhenDroppingBelowMaxUncommittedOffsetsDueToCommit() throws Exception {
+        //The spout must respect maxUncommittedOffsets after committing a set of records
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            //Ack all emitted messages and commit them
+            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+                spout.ack(messageId);
+            }
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+
+            spout.nextTuple();
+
+            //Now check that the spout will emit another maxUncommittedOffsets messages
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+            verify(collector, times(maxUncommittedOffsets)).emit(
+                anyString(),
+                anyList(),
+                anyObject());
+        }
+    }
+
+    @Test
+    public void testNextTupleWillRespectMaxUncommittedOffsetsWhenThereAreAckedUncommittedTuples() throws Exception {
+        //The spout must respect maxUncommittedOffsets even if some tuples have been acked but not committed
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            //Fail all emitted messages except the last one. Try to commit.
+            List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
+            for (int i = 0; i < messageIdList.size() - 1; i++) {
+                spout.fail(messageIdList.get(i));
+            }
+            spout.ack(messageIdList.get(messageIdList.size() - 1));
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            spout.nextTuple();
+
+            //Now check that the spout will not emit anything else since nothing has been committed
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+
+            verify(collector, times(0)).emit(
+                anyString(),
+                anyList(),
+                anyObject());
+        }
+    }
+
+    private void failAllExceptTheFirstMessageThenCommit(ArgumentCaptor<KafkaSpoutMessageId> messageIds) {
+        //Fail all emitted messages except the first. Commit the first.
+        List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
+        for (int i = 1; i < messageIdList.size(); i++) {
+            spout.fail(messageIdList.get(i));
+        }
+        spout.ack(messageIdList.get(0));
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        spout.nextTuple();
+    }
+
+    @Test
+    public void testNextTupleWillNotEmitMoreThanMaxUncommittedOffsetsPlusMaxPollRecordsMessages() throws Exception {
+        /*
+         * For each partition the spout is allowed to retry all tuples between the committed offset, and maxUncommittedOffsets ahead.
+         * It is not allowed to retry tuples past that limit.
+         * This makes the actual limit per partition maxUncommittedOffsets + maxPollRecords - 1,
+         * reached if the tuple at the maxUncommittedOffsets limit is the earliest retriable tuple,
+         * or if the spout is 1 tuple below the limit, and receives a full maxPollRecords tuples in the poll.
+         */
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            //Fail only the last tuple
+            List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
+            KafkaSpoutMessageId failedMessageId = messageIdList.get(messageIdList.size() - 1);
+            spout.fail(failedMessageId);
+
+            //Offset 0 to maxUncommittedOffsets - 2 are pending, maxUncommittedOffsets - 1 is failed but not retriable.
+            //The spout should not emit any more tuples.
+            spout.nextTuple();
+            verify(collector, never()).emit(
+                anyString(),
+                anyList(),
+                any(KafkaSpoutMessageId.class));
+
+            //Allow the failed record to retry
+            Time.advanceTimeSecs(initialRetryDelaySecs);
+            for (int i = 0; i < maxPollRecords; i++) {
+                spout.nextTuple();
+            }
+            ArgumentCaptor<KafkaSpoutMessageId> secondRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector, times(maxPollRecords)).emit(
+                anyString(),
+                anyList(),
+                secondRunMessageIds.capture());
+            reset(collector);
+            assertThat(secondRunMessageIds.getAllValues().get(0), is(failedMessageId));
+
+            //There should now be maxUncommittedOffsets + maxPollRecords emitted in all.
+            //Fail the last emitted tuple and verify that the spout won't retry it because it's above the emit limit.
+            spout.fail(secondRunMessageIds.getAllValues().get(secondRunMessageIds.getAllValues().size() - 1));
+            Time.advanceTimeSecs(initialRetryDelaySecs);
+            spout.nextTuple();
+            verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+        }
+    }
+
+    @Test
+    public void testNextTupleWillAllowRetryForTuplesBelowEmitLimit() throws Exception {
+        /*
+         * For each partition the spout is allowed to retry all tuples between the committed offset, and maxUncommittedOffsets ahead.
+         * It must retry tuples within that limit, even if more tuples were emitted.
+         */
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            failAllExceptTheFirstMessageThenCommit(messageIds);
+
+            //Offset 0 is committed, 1 to maxUncommittedOffsets - 1 are failed but not retriable.
+            //The spout should now emit another maxPollRecords messages.
+            //This is allowed because the committed message brings the numUncommittedOffsets below the cap.
+            for (int i = 0; i < maxUncommittedOffsets; i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> secondRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector, times(maxPollRecords)).emit(
+                anyString(),
+                anyList(),
+                secondRunMessageIds.capture());
+            reset(collector);
+
+            List<Long> firstRunOffsets = new ArrayList<>();
+            for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+                firstRunOffsets.add(msgId.offset());
+            }
+            List<Long> secondRunOffsets = new ArrayList<>();
+            for (KafkaSpoutMessageId msgId : secondRunMessageIds.getAllValues()) {
+                secondRunOffsets.add(msgId.offset());
+            }
+            assertThat("Expected the newly emitted messages to have no overlap with the first batch", secondRunOffsets.removeAll(firstRunOffsets), is(false));
+
+            //Offset 0 is committed, 1 to maxUncommittedOffsets - 1 are failed, maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords - 1 are emitted.
+            //Fail the last tuples so only offset 0 is not failed.
+            //Advance time so the failed tuples become ready for retry, and check that the spout will emit retriable tuples
+            //for all the failed tuples that are within maxUncommittedOffsets tuples of the committed offset.
+            //This means 1 to maxUncommittedOffsets, but not maxUncommittedOffsets+1...maxUncommittedOffsets+maxPollRecords-1.
+            for (KafkaSpoutMessageId msgId : secondRunMessageIds.getAllValues()) {
+                spout.fail(msgId);
+            }
+            Time.advanceTimeSecs(initialRetryDelaySecs);
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+            ArgumentCaptor<KafkaSpoutMessageId> thirdRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector, times(maxUncommittedOffsets)).emit(
+                anyString(),
+                anyList(),
+                thirdRunMessageIds.capture());
+            reset(collector);
+
+            List<Long> thirdRunOffsets = new ArrayList<>();
+            for (KafkaSpoutMessageId msgId : thirdRunMessageIds.getAllValues()) {
+                thirdRunOffsets.add(msgId.offset());
+            }
+
+            assertThat("Expected the emitted messages to be retries of the failed tuples from the first batch, plus the first failed tuple from the second batch", thirdRunOffsets, everyItem(either(isIn(firstRunOffsets)).or(is(secondRunMessageIds.getAllValues().get(0).offset()))));
+        }
+    }
+}
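[Editor's note] The limit described in the test comments above is worth making concrete. With this test's configuration (maxUncommittedOffsets = 10, maxPollRecords = 5), the hard cap per partition is 10 + 5 - 1 = 14 uncommitted offsets. A minimal sketch of the arithmetic:

    public class UncommittedOffsetCapSketch {
        public static void main(String[] args) {
            int maxUncommittedOffsets = 10;
            int maxPollRecords = 5;

            //The spout only polls while it is below maxUncommittedOffsets. In the worst case it is
            //one tuple below the limit (or is retrying the tuple sitting exactly at the limit) and
            //the next poll returns a full maxPollRecords batch, so:
            int hardCap = maxUncommittedOffsets + maxPollRecords - 1;
            System.out.println("hard cap on uncommitted offsets per partition: " + hardCap); //14
        }
    }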
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedSubscriptionTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedSubscriptionTest.java
new file mode 100644
index 0000000..56bfddf
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedSubscriptionTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+public class NamedSubscriptionTest {
+
+    private NamedSubscription namedSubscription;
+
+    @Test
+    public void testGetTopicsStringWithOneTopic() throws Exception {
+        Collection<String> topics = new ArrayList<>();
+        topics.add("test-topic1");
+
+        namedSubscription = new NamedSubscription(topics);
+
+        Assert.assertEquals("test-topic1", namedSubscription.getTopicsString());
+    }
+
+    @Test
+    public void testGetTopicsStringWithManyTopics() throws Exception {
+        Collection<String> topics = new ArrayList<>();
+        topics.add("test-topic1");
+        topics.add("test-topic2");
+        topics.add("test-topic3");
+
+        namedSubscription = new NamedSubscription(topics);
+
+        Assert.assertEquals("test-topic1,test-topic2,test-topic3", namedSubscription.getTopicsString());
+    }
+
+}
\ No newline at end of file
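[Editor's note] The expected strings above are a plain comma join of the topic names with no spaces. A sketch of the equivalent operation (assuming, without looking at NamedSubscription's internals, that getTopicsString behaves like this):

    import java.util.Arrays;
    import java.util.List;

    public class TopicsStringSketch {
        public static void main(String[] args) {
            List<String> topics = Arrays.asList("test-topic1", "test-topic2", "test-topic3");
            //Comma join with no spaces, matching the assertions in NamedSubscriptionTest
            System.out.println(String.join(",", topics)); //test-topic1,test-topic2,test-topic3
        }
    }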
+ */ + +package org.apache.storm.kafka.spout; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; + +public class NamedTopicFilterTest { + + private KafkaConsumer<?, ?> consumerMock; + + @Before + public void setUp() { + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testFilter() { + String matchingTopicOne = "test-1"; + String matchingTopicTwo = "test-11"; + String unmatchedTopic = "unmatched"; + + NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, matchingTopicTwo); + + when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne, 0))); + List<PartitionInfo> partitionTwoPartitions = new ArrayList<>(); + partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0)); + partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1)); + when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions); + when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); + + List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); + + assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions, + containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); + } + + private PartitionInfo createPartitionInfo(String topic, int partition) { + return new PartitionInfo(topic, partition, null, null, null); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java new file mode 100644 index 0000000..335ab31 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java @@ -0,0 +1,75 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.storm.kafka.spout; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; + +public class PatternTopicFilterTest { + + private KafkaConsumer<?, ?> consumerMock; + + @Before + public void setUp() { + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testFilter() { + Pattern pattern = Pattern.compile("test-\\d+"); + PatternTopicFilter filter = new PatternTopicFilter(pattern); + + String matchingTopicOne = "test-1"; + String matchingTopicTwo = "test-11"; + String unmatchedTopic = "unmatched"; + + Map<String, List<PartitionInfo>> allTopics = new HashMap<>(); + allTopics.put(matchingTopicOne, Collections.singletonList(createPartitionInfo(matchingTopicOne, 0))); + List<PartitionInfo> testTwoPartitions = new ArrayList<>(); + testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0)); + testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1)); + allTopics.put(matchingTopicTwo, testTwoPartitions); + allTopics.put(unmatchedTopic, Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); + + when(consumerMock.listTopics()).thenReturn(allTopics); + + List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); + + assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions, + containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); + } + + private PartitionInfo createPartitionInfo(String topic, int partition) { + return new PartitionInfo(topic, partition, null, null, null); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java new file mode 100644 index 0000000..f5b9423 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java @@ -0,0 +1,89 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.storm.kafka.spout; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.Map; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.KafkaUnit; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.mockito.ArgumentCaptor; + +public class SingleTopicKafkaUnitSetupHelper { + + /** + * Using the given KafkaUnit instance, put some messages in the specified topic. + * + * @param kafkaUnit The KafkaUnit instance to use + * @param topicName The topic to produce messages for + * @param msgCount The number of messages to produce + */ + public static void populateTopicData(KafkaUnit kafkaUnit, String topicName, int msgCount) throws Exception { + kafkaUnit.createTopic(topicName); + + for (int i = 0; i < msgCount; i++) { + ProducerRecord<String, String> producerRecord = new ProducerRecord<>( + topicName, Integer.toString(i), + Integer.toString(i)); + kafkaUnit.sendMessage(producerRecord); + } + } + + /** + * Asserts that commitSync was called exactly once, + * that all commits are on a single topic partition, + * and that the committed offset covers messageCount messages. + */ + public static <K, V> void verifyAllMessagesCommitted(KafkaConsumer<K, V> consumerSpy, + ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture, long messageCount) { + verify(consumerSpy, times(1)).commitSync(commitCapture.capture()); + Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue(); + assertThat("Expected commits for only one topic partition", commits.size(), is(1)); + OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue(); + assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount)); + } + + /** + * Open and activate a KafkaSpout that acts as a single-task/executor spout.
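+ * <p> + * A minimal sketch of how a test might call this helper; the topic name, broker address and config below are illustrative assumptions, not part of this class: + * <pre>{@code + * // Illustrative values only; any spout config and mocks will do. + * TopologyContext contextMock = mock(TopologyContext.class); + * SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); + * KafkaSpout<String, String> spout = new KafkaSpout<>( + *     KafkaSpoutConfig.builder("localhost:9092", "test-topic").build()); + * SingleTopicKafkaUnitSetupHelper.initializeSpout( + *     spout, new HashMap<String, Object>(), contextMock, collectorMock); + * }</pre>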
+ * + * @param <K> Kafka key type + * @param <V> Kafka value type + * @param spout The spout to prepare + * @param topoConf The topology configuration to pass to the spout + * @param topoContextMock The TopologyContext mock + * @param collectorMock The output collector mock + */ + public static <K, V> void initializeSpout(KafkaSpout<K, V> spout, Map<String, Object> topoConf, TopologyContext topoContextMock, + SpoutOutputCollector collectorMock) throws Exception { + when(topoContextMock.getThisTaskIndex()).thenReturn(0); + when(topoContextMock.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0)); + spout.open(topoConf, topoContextMock, collectorMock); + spout.activate(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java new file mode 100644 index 0000000..3aad61e --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -0,0 +1,171 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mockingDetails; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class SpoutWithMockedConsumerSetupHelper { + + /** + * Creates, opens and activates a KafkaSpout using a mocked consumer.
The subscription should be a mock object, since this method skips + * the normal subscription process and instead just configures the mocked consumer to act as if the specified partitions are assigned to it. + * + * @param <K> The Kafka key type + * @param <V> The Kafka value type + * @param spoutConfig The spout config to use + * @param topoConf The topology configuration to pass to the spout + * @param contextMock The topology context to pass to the spout + * @param collectorMock The mocked collector to pass to the spout + * @param consumerMock The mocked consumer + * @param assignedPartitions The partitions to assign to this spout. The consumer will act like these partitions are assigned to it. + * @return The spout + */ + public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf, + TopologyContext contextMock, SpoutOutputCollector collectorMock, final KafkaConsumer<K, V> consumerMock, TopicPartition... assignedPartitions) { + Subscription subscriptionMock = spoutConfig.getSubscription(); + if (!mockingDetails(subscriptionMock).isMock()) { + throw new IllegalStateException("Use a mocked subscription with this method; it avoids complex stubbing"); + } + + final Set<TopicPartition> assignedPartitionsSet = new HashSet<>(Arrays.asList(assignedPartitions)); + + when(consumerMock.assignment()).thenReturn(assignedPartitionsSet); + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + ConsumerRebalanceListener listener = (ConsumerRebalanceListener) invocation.getArguments()[1]; + listener.onPartitionsAssigned(assignedPartitionsSet); + return null; + } + }).when(subscriptionMock).subscribe(any(KafkaConsumer.class), any(ConsumerRebalanceListener.class), any(TopologyContext.class)); + + KafkaConsumerFactory<K, V> consumerFactory = new KafkaConsumerFactory<K, V>() { + @Override + public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) { + return consumerMock; + } + }; + KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory); + + spout.open(topoConf, contextMock, collectorMock); + spout.activate(); + + return spout; + } + + /** + * Creates sequential dummy records. + * + * @param <K> The Kafka key type + * @param <V> The Kafka value type + * @param topic The topic partition to create records for + * @param startingOffset The starting offset of the records + * @param numRecords The number of records to create + * @return The dummy records + */ + public static <K, V> List<ConsumerRecord<K, V>> createRecords(TopicPartition topic, long startingOffset, int numRecords) { + List<ConsumerRecord<K, V>> recordsForPartition = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + recordsForPartition.add(new ConsumerRecord<K, V>(topic.topic(), topic.partition(), startingOffset + i, null, null)); + } + return recordsForPartition; + } + + /** + * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids. + * + * @param <K> The Kafka key type + * @param <V> The Kafka value type + * @param spout The spout + * @param consumerMock The consumer used by the spout + * @param expectedEmits The number of expected emits + * @param collectorMock The collector used by the spout + * @param partition The partition to emit messages on + * @param offsetsToEmit The offsets to emit + * @return The message ids emitted by the spout during the nextTuple calls + */ + public static <K, V> List<KafkaSpoutMessageId> pollAndEmit(KafkaSpout<K, V> spout, KafkaConsumer<K, V> consumerMock, int expectedEmits, SpoutOutputCollector collectorMock, TopicPartition partition, int... offsetsToEmit) { + return pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, Collections.singletonMap(partition, offsetsToEmit)); + } + + /** + * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids. + * + * @param <K> The Kafka key type + * @param <V> The Kafka value type + * @param spout The spout + * @param consumerMock The consumer used by the spout + * @param expectedEmits The number of expected emits + * @param collectorMock The collector used by the spout + * @param offsetsToEmit The offsets to emit per partition + * @return The message ids emitted by the spout during the nextTuple calls + */ + public static <K, V> List<KafkaSpoutMessageId> pollAndEmit(KafkaSpout<K, V> spout, KafkaConsumer<K, V> consumerMock, int expectedEmits, SpoutOutputCollector collectorMock, Map<TopicPartition, int[]> offsetsToEmit) { + int totalOffsets = 0; + Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>(); + for (Entry<TopicPartition, int[]> entry : offsetsToEmit.entrySet()) { + TopicPartition tp = entry.getKey(); + List<ConsumerRecord<K, V>> tpRecords = new ArrayList<>(); + for (int offset : entry.getValue()) { + tpRecords.add(new ConsumerRecord<K, V>(tp.topic(), tp.partition(), offset, null, null)); + totalOffsets++; + } + records.put(tp, tpRecords); + } + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(records)); + + for (int i = 0; i < totalOffsets; i++) { + spout.nextTuple(); + } + + ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock, times(expectedEmits)).emit(anyString(), anyList(), messageIds.capture()); + return messageIds.getAllValues(); + } + +}
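Taken together, setupSpout and pollAndEmit let a test drive a KafkaSpout against a fully mocked consumer, with no broker involved. The following is a minimal sketch of a test built on these helpers; the bootstrap address, topic, offsets, and the use of a KafkaSpoutConfig.Builder(String, Subscription) constructor are illustrative assumptions, not taken from the patch above.

@Test
public void sketchOfPollAndEmitUsage() {
    // Illustrative sketch only; all names and values below are assumptions.
    TopicPartition partition = new TopicPartition("test", 0);
    Subscription subscriptionMock = mock(Subscription.class);
    @SuppressWarnings("unchecked")
    KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
    TopologyContext contextMock = mock(TopologyContext.class);
    SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
    KafkaSpoutConfig<String, String> spoutConfig =
        new KafkaSpoutConfig.Builder<String, String>("localhost:9092", subscriptionMock).build();

    // The helper stubs the consumer assignment and opens/activates the spout.
    KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(
        spoutConfig, new HashMap<String, Object>(), contextMock, collectorMock, consumerMock, partition);

    // Stub a poll of offsets 0..2 and expect exactly three emits, one per offset.
    List<KafkaSpoutMessageId> emitted = SpoutWithMockedConsumerSetupHelper.pollAndEmit(
        spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);

    assertThat(emitted.size(), is(3));
}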