http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
new file mode 100644
index 0000000..681953d
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.junit.Test;
+
+public class DefaultRecordTranslatorTest {
+    @Test
+    public void testBasic() {
+        DefaultRecordTranslator<String, String> trans = new DefaultRecordTranslator<>();
+        assertEquals(Arrays.asList("default"), trans.streams());
+        assertEquals(new Fields("topic", "partition", "offset", "key", "value"), trans.getFieldsFor("default"));
+        ConsumerRecord<String, String> cr = new ConsumerRecord<>("TOPIC", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(Arrays.asList("TOPIC", 100, 100l, "THE KEY", "THE VALUE"), trans.apply(cr));
+    }
+}
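The test above pins down the contract a record translator has to satisfy: streams() names the output streams, getFieldsFor(String) declares the fields emitted on each stream, and apply(ConsumerRecord) turns a consumed record into a tuple. As a rough sketch, not part of this commit, a custom translator could look like the following; it assumes the RecordTranslator<K, V> interface declares exactly those three methods, and the class name ValueOnlyRecordTranslator and its single "value" field are purely illustrative.

import java.util.Arrays;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Sketch only: assumes RecordTranslator<K, V> exposes the three methods exercised in the test above.
public class ValueOnlyRecordTranslator<K, V> implements RecordTranslator<K, V> {

    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        // Emit just the record value instead of the default (topic, partition, offset, key, value) tuple.
        return new Values(record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        // Every stream carries a single "value" field in this sketch.
        return new Fields("value");
    }

    @Override
    public List<String> streams() {
        // Keep everything on the default stream, matching DefaultRecordTranslator's behavior.
        return Arrays.asList("default");
    }
}

A topology would register such a translator through the KafkaSpoutConfig builder in place of the default one; the exact setter is not shown in this diff.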
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java new file mode 100644 index 0000000..0467383 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.KafkaUnitRule; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Time; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.MockitoAnnotations; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +public abstract class KafkaSpoutAbstractTest { + @Rule + public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(); + + final TopologyContext topologyContext = mock(TopologyContext.class); + final Map<String, Object> conf = new HashMap<>(); + final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); + final long commitOffsetPeriodMs; + + KafkaConsumer<String, String> consumerSpy; + KafkaSpout<String, String> spout; + + @Captor + ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; + private Time.SimulatedTime simulatedTime; + private KafkaSpoutConfig<String, String> spoutConfig; + + /** + * This constructor should be called by the subclass' default constructor with the desired value + * @param commitOffsetPeriodMs commit offset period to be used in commit and verification of messages committed + */ + protected KafkaSpoutAbstractTest(long commitOffsetPeriodMs) { + 
this.commitOffsetPeriodMs = commitOffsetPeriodMs; + } + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + spoutConfig = createSpoutConfig(); + + consumerSpy = createConsumerSpy(); + + spout = new KafkaSpout<>(spoutConfig, createConsumerFactory()); + + simulatedTime = new Time.SimulatedTime(); + } + + private KafkaConsumerFactory<String, String> createConsumerFactory() { + + return new KafkaConsumerFactory<String, String>() { + @Override + public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) { + return consumerSpy; + } + + }; + } + + KafkaConsumer<String, String> createConsumerSpy() { + return spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig)); + } + + @After + public void tearDown() throws Exception { + simulatedTime.close(); + } + + abstract KafkaSpoutConfig<String, String> createSpoutConfig(); + + void prepareSpout(int messageCount) throws Exception { + SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount); + SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock); + } + + /** + * Helper method to in sequence do: + * <li> + * <ul>spout.nexTuple()</ul> + * <ul>verify messageId</ul> + * <ul>spout.ack(msgId)</ul> + * <ul>reset(collector) to be able to reuse mock</ul> + * </li> + * + * @param offset offset of message to be verified + * @return {@link ArgumentCaptor} of the messageId verified + */ + ArgumentCaptor<Object> nextTuple_verifyEmitted_ack_resetCollector(int offset) { + spout.nextTuple(); + + ArgumentCaptor<Object> messageId = verifyMessageEmitted(offset); + + spout.ack(messageId.getValue()); + + reset(collectorMock); + + return messageId; + } + + // offset and messageId are used interchangeably + ArgumentCaptor<Object> verifyMessageEmitted(int offset) { + final ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class); + + verify(collectorMock).emit( + eq(SingleTopicKafkaSpoutConfiguration.STREAM), + eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC, + Integer.toString(offset), + Integer.toString(offset))), + messageId.capture()); + + return messageId; + } + + void commitAndVerifyAllMessagesCommitted(long msgCount) { + // reset commit timer such that commit happens on next call to nextTuple() + Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS); + + //Commit offsets + spout.nextTuple(); + + verifyAllMessagesCommitted(msgCount); + } + + /* + * Asserts that commitSync has been called once, + * that there are only commits on one topic, + * and that the committed offset covers messageCount messages + */ + void verifyAllMessagesCommitted(long messageCount) { + verify(consumerSpy).commitSync(commitCapture.capture()); + + final Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue(); + assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1)); + + OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue(); + assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount)); + + reset(consumerSpy); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java ---------------------------------------------------------------------- diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java new file mode 100644 index 0000000..90e906b --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; +import org.hamcrest.CoreMatchers; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class KafkaSpoutConfigTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testBasic() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic").build(); + assertEquals(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST, conf.getFirstPollOffsetStrategy()); + assertNull(conf.getConsumerGroupId()); + assertTrue(conf.getTranslator() instanceof DefaultRecordTranslator); + HashMap<String, Object> expected = new HashMap<>(); + expected.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234"); + expected.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + expected.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + assertEquals(expected, conf.getKafkaProps()); + assertEquals(KafkaSpoutConfig.DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS, conf.getMetricsTimeBucketSizeInSecs()); + } + + @Test + public void testSetEmitNullTuplesToTrue() { + final KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setEmitNullTuples(true) + .build(); + + assertTrue("Failed to set emit null tuples to true", conf.isEmitNullTuples()); + } + + @Test + public void testShouldNotChangeAutoOffsetResetPolicyWhenNotUsingAtLeastOnce() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", 
"topic") + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .build(); + + assertThat("When at-least-once is not specified, the spout should use the Kafka default auto offset reset policy", + conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), nullValue()); + } + + @Test + public void testWillRespectExplicitAutoOffsetResetPolicy() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none") + .build(); + + assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee", + (String)conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("none")); + } + + @Test + public void testCanConfigureWithExplicitTrueBooleanAutoCommitMode() { + /* + * Since adding setProcessingGuarantee to KafkaSpoutConfig we don't want users to set "enable.auto.commit" in the consumer config, + * because setting the processing guarantee will do it automatically. For backward compatibility we need to be able to handle the + * property being set anyway for a few releases, and try to set a processing guarantee that corresponds to the property. + */ + + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) + .build(); + + assertThat("When setting enable auto commit to true explicitly the spout should use the 'none' processing guarantee", + conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)); + } + + @Test + public void testCanConfigureWithExplicitFalseBooleanAutoCommitMode() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) + .build(); + + assertThat("When setting enable auto commit to false explicitly the spout should use the 'at-least-once' processing guarantee", + conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)); + } + + @Test + public void testCanConfigureWithExplicitTrueStringAutoCommitMode() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") + .build(); + + assertThat("When setting enable auto commit to true explicitly the spout should use the 'none' processing guarantee", + conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)); + } + + @Test + public void testCanConfigureWithExplicitFalseStringAutoCommitMode() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + .build(); + + assertThat("When setting enable auto commit explicitly to false the spout should use the 'at-least-once' processing guarantee", + conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)); + } + + @Test + public void testCanGetKeyDeserializerWhenUsingDefaultBuilder() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .build(); + + assertThat("When using the default builder methods, the key deserializer should default to StringDeserializer", + conf.getKeyDeserializer(), instanceOf(StringDeserializer.class)); + } + + @Test + public void testCanGetValueDeserializerWhenUsingDefaultBuilder() { + KafkaSpoutConfig<String, String> conf = 
KafkaSpoutConfig.builder("localhost:1234", "topic") + .build(); + + assertThat("When using the default builder methods, the value deserializer should default to StringDeserializer", + conf.getValueDeserializer(), instanceOf(StringDeserializer.class)); + } + + @Test + public void testCanOverrideDeprecatedDeserializerClassWithKafkaProps() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setKey(StringDeserializer.class) + .setValue(StringDeserializer.class) + .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .build(); + + assertThat("The last set key deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class)); + assertThat("The last set value deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class)); + } + + private static class SerializableStringDeserializer implements SerializableDeserializer { + + private final StringDeserializer delegate = new StringDeserializer(); + + @Override + public void configure(Map configs, boolean isKey) { + delegate.configure(configs, isKey); + } + + @Override + public Object deserialize(String topic, byte[] data) { + return delegate.deserialize(topic, data); + } + + @Override + public void close() { + delegate.close(); + } + } + + @Test + public void testCanOverrideDeprecatedDeserializerInstanceWithKafkaProps() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setKey(new SerializableStringDeserializer()) + .setValue(new SerializableStringDeserializer()) + .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .build(); + + assertThat("The last set key deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class)); + assertThat("The last set value deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class)); + } + + @Test + public void testCanOverrideKafkaPropsWithDeprecatedDeserializerSetter() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .setKey(new SerializableStringDeserializer()) + .setValue(new SerializableStringDeserializer()) + .build(); + + assertThat("The last set key deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class)); + assertThat("The last set value deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class)); + } + + @Test + public void testCanMixOldAndNewDeserializerSetter() { + 
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .setKey(new SerializableStringDeserializer()) + .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .setValue(new SerializableStringDeserializer()) + .build(); + + assertThat("The last set key deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class)); + assertThat("The last set value deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class)); + } + + @Test + public void testMetricsTimeBucketSizeInSecs() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setMetricsTimeBucketSizeInSecs(100) + .build(); + + assertEquals(100, conf.getMetricsTimeBucketSizeInSecs()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java new file mode 100755 index 0000000..dbba04b --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -0,0 +1,214 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.spout; + +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; + +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; +import org.mockito.InOrder; + +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.mockito.Matchers.eq; + +public class KafkaSpoutEmitTest { + + private final long offsetCommitPeriodMs = 2_000; + private final TopologyContext contextMock = mock(TopologyContext.class); + private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); + private final Map<String, Object> conf = new HashMap<>(); + private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); + private KafkaConsumer<String, String> consumerMock; + private KafkaSpoutConfig<String, String> spoutConfig; + + @Before + public void setUp() { + spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .build(); + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testNextTupleEmitsAtMostOneTuple() { + //The spout should emit at most one message per call to nextTuple + //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); + records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 10)); + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(records)); + + spout.nextTuple(); + + verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject()); + } + + @Test + public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() throws IOException { + //The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded + + //Emit maxUncommittedOffsets messages, and fail all of them. 
Then ensure that the spout will retry them when the retry backoff has passed + try (SimulatedTime simulatedTime = new SimulatedTime()) { + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); + int numRecords = spoutConfig.getMaxUncommittedOffsets(); + //This is cheating a bit since maxPollRecords would normally spread this across multiple polls + records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, numRecords)); + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(records)); + + for (int i = 0; i < numRecords; i++) { + spout.nextTuple(); + } + + ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), messageIds.capture()); + + for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { + spout.fail(messageId); + } + + reset(collectorMock); + + Time.advanceTime(50); + //No backoff for test retry service, just check that messages will retry immediately + for (int i = 0; i < numRecords; i++) { + spout.nextTuple(); + } + + ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), retryMessageIds.capture()); + + //Verify that the poll started at the earliest retriable tuple offset + List<Long> failedOffsets = new ArrayList<>(); + for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) { + failedOffsets.add(msgId.offset()); + } + InOrder inOrder = inOrder(consumerMock); + inOrder.verify(consumerMock).seek(partition, failedOffsets.get(0)); + inOrder.verify(consumerMock).poll(anyLong()); + } + } + + @Test + public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() { + //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit. 
+ try (SimulatedTime simulatedTime = new SimulatedTime()) { + TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo); + Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); + //This is cheating a bit since maxPollRecords would normally spread this across multiple polls + records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets())); + records.put(partitionTwo, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partitionTwo, 0, spoutConfig.getMaxUncommittedOffsets() + 1)); + int numMessages = spoutConfig.getMaxUncommittedOffsets()*2 + 1; + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(records)); + + for (int i = 0; i < numMessages; i++) { + spout.nextTuple(); + } + + ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock, times(numMessages)).emit(anyString(), anyList(), messageIds.capture()); + + //Now fail a tuple on partition one and verify that it is allowed to retry, because the failed tuple is below the maxUncommittedOffsets limit + KafkaSpoutMessageId failedMessageIdPartitionOne = null; + for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) { + if (msgId.partition() == partition.partition()) { + failedMessageIdPartitionOne = msgId; + break; + } + } + + spout.fail(failedMessageIdPartitionOne); + + //Also fail the last tuple from partition two. Since the failed tuple is beyond the maxUncommittedOffsets limit, it should not be retried until earlier messages are acked. 
+ KafkaSpoutMessageId failedMessageIdPartitionTwo = null; + for (KafkaSpoutMessageId msgId: messageIds.getAllValues()) { + if (msgId.partition() == partitionTwo.partition()) { + if (failedMessageIdPartitionTwo != null) { + if (msgId.offset() >= failedMessageIdPartitionTwo.offset()) { + failedMessageIdPartitionTwo = msgId; + } + } else { + failedMessageIdPartitionTwo = msgId; + } + } + } + + spout.fail(failedMessageIdPartitionTwo); + + reset(collectorMock); + + Time.advanceTime(50); + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, failedMessageIdPartitionOne.offset(), 1)))); + + spout.nextTuple(); + + verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject()); + + InOrder inOrder = inOrder(consumerMock); + inOrder.verify(consumerMock).seek(partition, failedMessageIdPartitionOne.offset()); + //Should not seek on the paused partition + inOrder.verify(consumerMock, never()).seek(eq(partitionTwo), anyLong()); + inOrder.verify(consumerMock).pause(Collections.singleton(partitionTwo)); + inOrder.verify(consumerMock).poll(anyLong()); + inOrder.verify(consumerMock).resume(Collections.singleton(partitionTwo)); + + reset(collectorMock); + + //Now also check that no more tuples are polled for, since both partitions are at their limits + spout.nextTuple(); + + verify(collectorMock, never()).emit(anyString(), anyList(), anyObject()); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java new file mode 100644 index 0000000..09f7fc5 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java @@ -0,0 +1,223 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.MockitoAnnotations; + +import static org.hamcrest.CoreMatchers.is; + +import static org.hamcrest.Matchers.hasKey; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; + +public class KafkaSpoutLogCompactionSupportTest { + + private final long offsetCommitPeriodMs = 2_000; + private final TopologyContext contextMock = mock(TopologyContext.class); + private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); + private final Map<String, Object> conf = new HashMap<>(); + private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); + private KafkaConsumer<String, String> consumerMock; + private KafkaSpoutConfig<String, String> spoutConfig; + + @Captor + private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .build(); + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testCommitSuccessWithOffsetVoids() { + //Verify that the commit logic can handle offset voids due to log compaction + try (SimulatedTime simulatedTime = new SimulatedTime()) { + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); + List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>(); + // Offsets emitted are 0,1,2,3,4,<void>,8,9 + recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 5)); + recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 8, 2)); + records.put(partition, recordsForPartition); + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(records)); + + for (int i = 0; i < recordsForPartition.size(); i++) { + spout.nextTuple(); + } + + ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock, 
times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); + + for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { + spout.ack(messageId); + } + + // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9 + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<String, String>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap())); + spout.nextTuple(); + + InOrder inOrder = inOrder(consumerMock); + inOrder.verify(consumerMock).commitSync(commitCapture.capture()); + inOrder.verify(consumerMock).poll(anyLong()); + + //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at + Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue(); + assertTrue(commits.containsKey(partition)); + assertEquals(10, commits.get(partition).offset()); + } + } + + @Test + public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAway() { + /* + Verify that failed offsets will only retry if the corresponding message exists. + When log compaction is enabled in Kafka it is possible that a tuple can fail, + and then be impossible to retry because the message in Kafka has been deleted. + The spout needs to quietly ack such tuples to allow commits to progress past the deleted offset. + */ + try (SimulatedTime simulatedTime = new SimulatedTime()) { + TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo); + + List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper + .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2); + reset(collectorMock); + List<KafkaSpoutMessageId> secondPartitionMsgIds = SpoutWithMockedConsumerSetupHelper + .pollAndEmit(spout, consumerMock, 3, collectorMock, partitionTwo, 0, 1, 2); + reset(collectorMock); + + for(int i = 0; i < 3; i++) { + spout.fail(firstPartitionMsgIds.get(i)); + spout.fail(secondPartitionMsgIds.get(i)); + } + + Time.advanceTime(50); + + //The failed tuples are ready for retry. Make it appear like 0 and 1 on the first partition were compacted away. + //In this case the second partition acts as control to verify that we only skip past offsets that are no longer present. 
+ Map<TopicPartition, int[]> retryOffsets = new HashMap<>(); + retryOffsets.put(partition, new int[] {2}); + retryOffsets.put(partitionTwo, new int[] {0, 1, 2}); + int expectedEmits = 4; //2 on first partition, 0-2 on second partition + List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, retryOffsets); + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock).commitSync(commitCapture.capture()); + Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue(); + assertThat(committed.keySet(), is(Collections.singleton(partition))); + assertThat("The first partition should have committed up to the first retriable tuple that is not missing", committed.get(partition).offset(), is(2L)); + + for(KafkaSpoutMessageId msgId : retryMessageIds) { + spout.ack(msgId); + } + + //The spout should now commit all the offsets, since all offsets are either acked or were missing when retrying + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock, times(2)).commitSync(commitCapture.capture()); + committed = commitCapture.getValue(); + assertThat(committed, hasKey(partition)); + assertThat(committed, hasKey(partitionTwo)); + assertThat(committed.get(partition).offset(), is(3L)); + assertThat(committed.get(partitionTwo).offset(), is(3L)); + } + } + + @Test + public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAwayWithoutAckingPendingTuples() { + //Demonstrate that the spout doesn't ack pending tuples when skipping compacted tuples. The pending tuples should be allowed to finish normally. + try (SimulatedTime simulatedTime = new SimulatedTime()) { + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + + List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper + .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2); + reset(collectorMock); + + spout.fail(firstPartitionMsgIds.get(0)); + spout.fail(firstPartitionMsgIds.get(2)); + + Time.advanceTime(50); + + //The failed tuples are ready for retry. Make it appear like 0 and 1 were compacted away. 
+ List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 2); + for(KafkaSpoutMessageId msgId : retryMessageIds) { + spout.ack(msgId); + } + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock).commitSync(commitCapture.capture()); + Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue(); + assertThat(committed.keySet(), is(Collections.singleton(partition))); + assertThat("The first partition should have committed the missing offset, but no further since the next tuple is pending", + committed.get(partition).offset(), is(1L)); + + spout.ack(firstPartitionMsgIds.get(1)); + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); + spout.nextTuple(); + + verify(consumerMock, times(2)).commitSync(commitCapture.capture()); + committed = commitCapture.getValue(); + assertThat(committed.keySet(), is(Collections.singleton(partition))); + assertThat("The first partition should have committed all offsets", committed.get(partition).offset(), is(3L)); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java new file mode 100644 index 0000000..082cc58 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -0,0 +1,259 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.CommitMetadataManager; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaSpoutMessagingGuaranteeTest { + + @Captor + private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; + + private final TopologyContext contextMock = mock(TopologyContext.class); + private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); + private final Map<String, Object> conf = new HashMap<>(); + private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); + private KafkaConsumer<String, String> consumerMock; + + @Before + public void setUp() { + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testAtMostOnceModeCommitsBeforeEmit() throws Exception { + //At-most-once mode must commit tuples before they are emitted to the topology to ensure that a spout crash won't cause replays. 
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .build(); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1)))); + + spout.nextTuple(); + + when(consumerMock.position(partition)).thenReturn(1L); + + //The spout should have emitted the tuple, and must have committed it before emit + InOrder inOrder = inOrder(consumerMock, collectorMock); + inOrder.verify(consumerMock).poll(anyLong()); + inOrder.verify(consumerMock).commitSync(commitCapture.capture()); + inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList()); + + CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE); + Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue(); + assertThat(committedOffsets.get(partition).offset(), is(0L)); + assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata())); + } + + private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) { + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + + when(consumerMock.poll(anyLong())) + .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets())))) + .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, spoutConfig.getMaxUncommittedOffsets() - 1, spoutConfig.getMaxUncommittedOffsets())))); + + for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() * 2; i++) { + spout.nextTuple(); + } + + verify(consumerMock, times(2)).poll(anyLong()); + verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets() * 2)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList()); + } + + @Test + public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception { + //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .build(); + doTestModeDisregardsMaxUncommittedOffsets(spoutConfig); + } + + @Test + public void testNoGuaranteeModeDisregardsMaxUncommittedOffsets() throws Exception { + //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE) + .build(); + doTestModeDisregardsMaxUncommittedOffsets(spoutConfig); + } + + private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) { + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, partition); + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1)))); + + spout.nextTuple(); + + ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture()); + assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue())); + + spout.fail(msgIdCaptor.getValue()); + + reset(consumerMock); + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 1, 1)))); + + spout.nextTuple(); + + //The consumer should not be seeking to retry the failed tuple, it should just be continuing from the current position + verify(consumerMock, never()).seek(eq(partition), anyLong()); + } + + @Test + public void testAtMostOnceModeCannotReplayTuples() throws Exception { + //When tuple tracking is enabled, the spout must not replay tuples in at-most-once mode + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .setTupleTrackingEnforced(true) + .build(); + doTestModeCannotReplayTuples(spoutConfig); + } + + @Test + public void testNoGuaranteeModeCannotReplayTuples() throws Exception { + //When tuple tracking is enabled, the spout must not replay tuples in no guarantee mode + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE) + .setTupleTrackingEnforced(true) + .build(); + doTestModeCannotReplayTuples(spoutConfig); + } + + @Test + public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception { + //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) + .setTupleTrackingEnforced(true) + .build(); + try (SimulatedTime time = new SimulatedTime()) { + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1)))); + + spout.nextTuple(); + reset(consumerMock); + + ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture()); + assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue())); + + spout.ack(msgIdCaptor.getValue()); + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs()); + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap())); + + spout.nextTuple(); + + verify(consumerMock, never()).commitSync(argThat(new 
ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>>() { + @Override + public boolean matches(Object arg) { + Map<TopicPartition, OffsetAndMetadata> castArg = (Map<TopicPartition, OffsetAndMetadata>) arg; + return !castArg.containsKey(partition); + } + })); + } + } + + @Test + public void testNoGuaranteeModeCommitsPolledTuples() throws Exception { + //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1) + .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE) + .setTupleTrackingEnforced(true) + .build(); + + try (SimulatedTime time = new SimulatedTime()) { + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); + + when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, + SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1)))); + + spout.nextTuple(); + + when(consumerMock.position(partition)).thenReturn(1L); + + ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); + verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture()); + assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue())); + + Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs()); + + spout.nextTuple(); + + verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class)); + + CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE); + Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue(); + assertThat(committedOffsets.get(partition).offset(), is(1L)); + assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata())); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java new file mode 100644 index 0000000..c2c46b5 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java @@ -0,0 +1,145 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.KafkaUnitRule; +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.Time; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaSpoutReactivationTest { + + @Rule + public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(); + + @Captor + private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; + + private final TopologyContext topologyContext = mock(TopologyContext.class); + private final Map<String, Object> conf = new HashMap<>(); + private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class); + private final long commitOffsetPeriodMs = 2_000; + private KafkaConsumer<String, String> consumerSpy; + private KafkaConsumer<String, String> postReactivationConsumerSpy; + private KafkaSpout<String, String> spout; + private final int maxPollRecords = 10; + + @Before + public void setUp() { + KafkaSpoutConfig<String, String> spoutConfig = + SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig( + KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(), + SingleTopicKafkaSpoutConfiguration.TOPIC)) + .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST) + .setOffsetCommitPeriodMs(commitOffsetPeriodMs) + .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) + .build(); + KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactoryDefault<>(); + this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig)); + this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig)); + KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class); + when(consumerFactoryMock.createConsumer(any(KafkaSpoutConfig.class))) + .thenReturn(consumerSpy) + .thenReturn(postReactivationConsumerSpy); + this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock); + } + + private void prepareSpout(int messageCount) throws Exception { + SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount); + SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector); + 
+    }
+
+    private KafkaSpoutMessageId emitOne() {
+        ArgumentCaptor<KafkaSpoutMessageId> messageId = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        spout.nextTuple();
+        verify(collector).emit(anyString(), anyList(), messageId.capture());
+        reset(collector);
+        return messageId.getValue();
+    }
+
+    @Test
+    public void testSpoutMustHandleReactivationGracefully() throws Exception {
+        try (Time.SimulatedTime time = new Time.SimulatedTime()) {
+            int messageCount = maxPollRecords * 2;
+            prepareSpout(messageCount);
+
+            //Emit and ack some tuples, ensure that some polled tuples remain cached in the spout by emitting less than maxPollRecords
+            int beforeReactivationEmits = maxPollRecords - 3;
+            for (int i = 0; i < beforeReactivationEmits - 1; i++) {
+                KafkaSpoutMessageId msgId = emitOne();
+                spout.ack(msgId);
+            }
+
+            KafkaSpoutMessageId ackAfterDeactivateMessageId = emitOne();
+
+            //Cycle spout activation
+            spout.deactivate();
+            SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, beforeReactivationEmits - 1);
+            //Tuples may be acked/failed after the spout deactivates, so we have to be able to handle this too
+            spout.ack(ackAfterDeactivateMessageId);
+            spout.activate();
+
+            //Emit and ack the rest
+            for (int i = beforeReactivationEmits; i < messageCount; i++) {
+                KafkaSpoutMessageId msgId = emitOne();
+                spout.ack(msgId);
+            }
+
+            //Commit
+            Time.advanceTime(TIMER_DELAY_MS + commitOffsetPeriodMs);
+            spout.nextTuple();
+
+            //Verify that no more tuples are emitted and all tuples are committed
+            SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(postReactivationConsumerSpy, commitCapture, messageCount);
+
+            reset(collector);
+            spout.nextTuple();
+            verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index a554a3b..29d2a22 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -15,11 +15,26 @@
  */
 package org.apache.storm.kafka.spout;
 
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -27,45 +42,31 @@
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
-
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.when;
-
-import org.junit.Before;
 import org.mockito.Captor;
-
-import static org.mockito.Mockito.reset;
-
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.Matchers.hasKey;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
 
 public class KafkaSpoutRebalanceTest {
 
     @Captor
     private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
 
+    private final long offsetCommitPeriodMs = 2_000;
+    private final Map<String, Object> conf = new HashMap<>();
     private TopologyContext contextMock;
     private SpoutOutputCollector collectorMock;
-    private Map conf;
     private KafkaConsumer<String, String> consumerMock;
     private KafkaConsumerFactory<String, String> consumerFactoryMock;
@@ -74,7 +75,6 @@ public class KafkaSpoutRebalanceTest {
         MockitoAnnotations.initMocks(this);
         contextMock = mock(TopologyContext.class);
         collectorMock = mock(SpoutOutputCollector.class);
-        conf = new HashMap<>();
         consumerMock = mock(KafkaConsumer.class);
         consumerFactoryMock = new KafkaConsumerFactory<String, String>(){
             @Override
@@ -85,44 +85,39 @@ public class KafkaSpoutRebalanceTest {
     }
 
     //Returns messageIds in order of emission
-    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
-        //Setup spout with mock consumer so we can get at the rebalance listener
+    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture) {
+        //Setup spout with mock consumer so we can get at the rebalance listener
         spout.open(conf, contextMock, collectorMock);
         spout.activate();
-
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyList(), rebalanceListenerCapture.capture());
-
         //Assign partitions to the spout
         ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
-        List<TopicPartition> assignedPartitions = new ArrayList<>();
+        Set<TopicPartition> assignedPartitions = new HashSet<>();
         assignedPartitions.add(partitionThatWillBeRevoked);
         assignedPartitions.add(assignedPartition);
         consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+        when(consumerMock.assignment()).thenReturn(assignedPartitions);
 
         //Make the consumer return a single message for each partition
-        Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
-        firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord<>(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
-        Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
-        secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
         when(consumerMock.poll(anyLong()))
-            .thenReturn(new ConsumerRecords(firstPartitionRecords))
-            .thenReturn(new ConsumerRecords(secondPartitionRecords))
-            .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partitionThatWillBeRevoked, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partitionThatWillBeRevoked, 0, 1))))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(assignedPartition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(assignedPartition, 0, 1))))
+            .thenReturn(new ConsumerRecords<>(new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>()));
 
         //Emit the messages
         spout.nextTuple();
         ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collectorMock).emit(anyString(), anyList(), messageIdForRevokedPartition.capture());
+        verify(collectorMock).emit(Mockito.anyString(), Mockito.anyList(), messageIdForRevokedPartition.capture());
         reset(collectorMock);
         spout.nextTuple();
         ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collectorMock).emit(anyString(), anyList(), messageIdForAssignedPartition.capture());
+        verify(collectorMock).emit(Mockito.anyString(), Mockito.anyList(), messageIdForAssignedPartition.capture());
 
         //Now rebalance
         consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
         consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
-        
+        when(consumerMock.assignment()).thenReturn(Collections.singleton(assignedPartition));
+
         List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
         emittedMessageIds.add(messageIdForRevokedPartition.getValue());
         emittedMessageIds.add(messageIdForAssignedPartition.getValue());
@@ -132,49 +127,122 @@ public class KafkaSpoutRebalanceTest {
 
     @Test
     public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
-        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
-        TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
-        TopicPartition assignedPartition = new TopicPartition(topic, 2);
-
-        //Emit a message on each partition and revoke the first partition
-        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
-
-        //Ack both emitted tuples
-        spout.ack(emittedMessageIds.get(0));
-        spout.ack(emittedMessageIds.get(1));
-
-        //Ensure the commit timer has expired
-        Thread.sleep(510);
-
-        //Make the spout commit any acked tuples
-        spout.nextTuple();
-        //Verify that it only committed the message on the assigned partition
-        verify(consumerMock).commitSync(commitCapture.capture());
-
-        Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
-        assertThat(commitCaptureMap, hasKey(assignedPartition));
-        assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+            Subscription subscriptionMock = mock(Subscription.class);
+            doNothing()
+                .when(subscriptionMock)
+                .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
+            KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+                .build(), consumerFactoryMock);
+            String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+            TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+            TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+            //Emit a message on each partition and revoke the first partition
+            List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+                spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
+
+            //Ack both emitted tuples
+            spout.ack(emittedMessageIds.get(0));
+            spout.ack(emittedMessageIds.get(1));
+
+            //Ensure the commit timer has expired
+            Time.advanceTime(offsetCommitPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            //Make the spout commit any acked tuples
+            spout.nextTuple();
+            //Verify that it only committed the message on the assigned partition
+            verify(consumerMock, times(1)).commitSync(commitCapture.capture());
+
+            Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
+            assertThat(commitCaptureMap, hasKey(assignedPartition));
+            assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+        }
     }
-    
+
     @Test
     public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        Subscription subscriptionMock = mock(Subscription.class);
+        doNothing()
+            .when(subscriptionMock)
+            .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
         KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
+        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+            .setOffsetCommitPeriodMs(10)
+            .setRetry(retryServiceMock)
+            .build(), consumerFactoryMock);
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
         TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
         TopicPartition assignedPartition = new TopicPartition(topic, 2);
-        
+
+        when(retryServiceMock.getMessageId(Mockito.any(ConsumerRecord.class)))
+            .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0))
+            .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
+
         //Emit a message on each partition and revoke the first partition
-        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+            spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
+
+        //Check that only two message ids were generated
+        verify(retryServiceMock, times(2)).getMessageId(Mockito.any(ConsumerRecord.class));
 
         //Fail both emitted tuples
         spout.fail(emittedMessageIds.get(0));
         spout.fail(emittedMessageIds.get(1));
-        
+
         //Check that only the tuple on the currently assigned partition is retried
         verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
         verify(retryServiceMock).schedule(emittedMessageIds.get(1));
     }
+
+    @Test
+    public void testReassignPartitionSeeksForOnlyNewPartitions() {
+        /*
+         * When partitions are reassigned, the spout should seek with the first poll offset strategy for new partitions.
+         * Previously assigned partitions should be left alone, since the spout keeps the emitted and acked state for those.
+         */
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        Subscription subscriptionMock = mock(Subscription.class);
+        doNothing()
+            .when(subscriptionMock)
+            .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
+        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
+            .build(), consumerFactoryMock);
+        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+        TopicPartition assignedPartition = new TopicPartition(topic, 1);
+        TopicPartition newPartition = new TopicPartition(topic, 2);
+
+        //Setup spout with mock consumer so we can get at the rebalance listener
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        Set<TopicPartition> assignedPartitions = new HashSet<>();
+        assignedPartitions.add(assignedPartition);
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+        reset(consumerMock);
+
+        //Set up committed so it looks like some messages have been committed on each partition
+        long committedOffset = 500;
+        when(consumerMock.committed(assignedPartition)).thenReturn(new OffsetAndMetadata(committedOffset));
+        when(consumerMock.committed(newPartition)).thenReturn(new OffsetAndMetadata(committedOffset));
+
+        //Now rebalance and add a new partition
+        consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
+        Set<TopicPartition> newAssignedPartitions = new HashSet<>();
+        newAssignedPartitions.add(assignedPartition);
+        newAssignedPartitions.add(newPartition);
+        consumerRebalanceListener.onPartitionsAssigned(newAssignedPartitions);
+
+        //This partition was previously assigned, so the consumer position shouldn't change
+        verify(consumerMock, never()).seek(eq(assignedPartition), anyLong());
+        //This partition is new, and should start at the committed offset
+        verify(consumerMock).seek(newPartition, committedOffset);
+    }
 }
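A note on the timing pattern these rewritten tests share: instead of sleeping until the spout's offset-commit timer fires (the Thread.sleep(510) removed above), they freeze Storm's clock with Time.SimulatedTime and advance it explicitly before calling nextTuple(). A minimal, self-contained sketch of that pattern is shown below; the test class name and assertion are illustrative additions, while Time.SimulatedTime, Time.advanceTime, and KafkaSpout.TIMER_DELAY_MS are the same utilities used in the diffs above.

package org.apache.storm.kafka.spout;

import static org.junit.Assert.assertTrue;

import org.apache.storm.utils.Time;
import org.junit.Test;

public class SimulatedTimeSketchTest {

    private static final long OFFSET_COMMIT_PERIOD_MS = 2_000;

    @Test
    public void commitTimerExpiresDeterministically() throws Exception {
        // SimulatedTime freezes Storm's notion of time for the duration of the try block,
        // so timers only move when the test advances the clock.
        try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
            long start = Time.currentTimeMillis();

            // Jump past the commit period plus the spout's internal timer delay,
            // mirroring what the tests above do before calling spout.nextTuple().
            Time.advanceTime(OFFSET_COMMIT_PERIOD_MS + KafkaSpout.TIMER_DELAY_MS);

            // No wall-clock waiting occurred, but the commit period has elapsed.
            assertTrue(Time.currentTimeMillis() - start >= OFFSET_COMMIT_PERIOD_MS);
        }
    }
}

Driving the clock this way keeps the commit-verification steps deterministic, which is why the sleeps could be dropped from the rebalance test.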