Repository: storm Updated Branches: refs/heads/master 10d381b30 -> 3580dbc80
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index 2d55520..23630a6 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -19,9 +19,10 @@ import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfigu import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.hasKey; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -42,6 +43,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.kafka.spout.subscription.Subscription; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; @@ -74,14 +76,11 @@ public class KafkaSpoutRebalanceTest { } //Returns messageIds in order of emission - private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) { - //Setup spout with mock consumer so we can get at the rebalance listener + private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture) { + //Setup spout with mock consumer so we can get at the rebalance listener spout.open(conf, contextMock, collectorMock); spout.activate(); - ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - //Assign partitions to the spout ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); List<TopicPartition> assignedPartitions = new ArrayList<>(); @@ -95,9 +94,9 @@ public class KafkaSpoutRebalanceTest { Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>(); secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value"))); when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(firstPartitionRecords)) - .thenReturn(new ConsumerRecords(secondPartitionRecords)) - .thenReturn(new ConsumerRecords(Collections.emptyMap())); + .thenReturn(new ConsumerRecords<>(firstPartitionRecords)) + .thenReturn(new ConsumerRecords<>(secondPartitionRecords)) + .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); //Emit the messages spout.nextTuple(); @@ -122,7 +121,12 @@ public class KafkaSpoutRebalanceTest { public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them try (SimulatedTime simulatedTime = new SimulatedTime()) { - KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1) + ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + Subscription subscriptionMock = mock(Subscription.class); + doNothing() + .when(subscriptionMock) + .subscribe(any(), rebalanceListenerCapture.capture(), any()); + KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(), consumerFactory); String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; @@ -130,7 +134,8 @@ public class KafkaSpoutRebalanceTest { TopicPartition assignedPartition = new TopicPartition(topic, 2); //Emit a message on each partition and revoke the first partition - List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition( + spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture); //Ack both emitted tuples spout.ack(emittedMessageIds.get(0)); @@ -152,8 +157,13 @@ public class KafkaSpoutRebalanceTest { @Test public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception { //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass + ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); + Subscription subscriptionMock = mock(Subscription.class); + doNothing() + .when(subscriptionMock) + .subscribe(any(), rebalanceListenerCapture.capture(), any()); KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class); - KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1) + KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1) .setOffsetCommitPeriodMs(10) .setRetry(retryServiceMock) .build(), consumerFactory); @@ -166,7 +176,8 @@ public class KafkaSpoutRebalanceTest { .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0)); //Emit a message on each partition and revoke the first partition - List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition); + List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition( + spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture); //Check that only two message ids were generated verify(retryServiceMock, times(2)).getMessageId(anyObject()); http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index d84f4da..078f7a1 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -30,80 +30,71 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; -import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; import org.apache.storm.utils.Time.SimulatedTime; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.InOrder; +import org.mockito.MockitoAnnotations; public class KafkaSpoutRetryLimitTest { - + private final long offsetCommitPeriodMs = 2_000; private final TopologyContext contextMock = mock(TopologyContext.class); private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class); private final Map<String, Object> conf = new HashMap<>(); private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); private KafkaConsumer<String, String> consumerMock; - private KafkaSpout<String, String> spout; - private KafkaSpoutConfig spoutConfig; - + private KafkaSpoutConfig<String, String> spoutConfig; + public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE = - new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), - 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); - - private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) { + new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), + 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); + + @Captor + private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); spoutConfig = getKafkaSpoutConfigBuilder(-1) - .setOffsetCommitPeriodMs(offsetCommitPeriodMs) - .setRetry(ZERO_RETRIES_RETRY_SERVICE) - .build(); - + .setOffsetCommitPeriodMs(offsetCommitPeriodMs) + .setRetry(ZERO_RETRIES_RETRY_SERVICE) + .build(); consumerMock = mock(KafkaConsumer.class); - KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock; - - spout = new KafkaSpout<>(spoutConfig, consumerFactory); - - spout.open(conf, contextMock, collectorMock); - spout.activate(); - - ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class); - verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture()); - - //Assign partitions to the spout - ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue(); - consumerRebalanceListener.onPartitionsAssigned(assignedPartitions); } - + @Test public void testFailingTupleCompletesAckAfterRetryLimitIsMet() { //Spout should ack failed messages after they hit the retry limit try (SimulatedTime simulatedTime = new SimulatedTime()) { - setupSpoutWithNoRetry(Collections.singleton(partition)); + KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition)); Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>(); int lastOffset = 3; for (int i = 0; i <= lastOffset; i++) { - recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value")); + recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value")); } records.put(partition, recordsForPartition); - + when(consumerMock.poll(anyLong())) - .thenReturn(new ConsumerRecords(records)); - + .thenReturn(new ConsumerRecords<>(records)); + for (int i = 0; i < recordsForPartition.size(); i++) { spout.nextTuple(); } - + ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), messageIds.capture()); - + for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { spout.fail(messageId); } @@ -111,16 +102,15 @@ public class KafkaSpoutRetryLimitTest { // Advance time and then trigger call to kafka consumer commit Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs); spout.nextTuple(); - - ArgumentCaptor<Map> committedOffsets=ArgumentCaptor.forClass(Map.class); + InOrder inOrder = inOrder(consumerMock); - inOrder.verify(consumerMock).commitSync(committedOffsets.capture()); + inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(consumerMock).poll(anyLong()); //verify that Offset 3 was committed for the given TopicPartition - assertTrue(committedOffsets.getValue().containsKey(partition)); - assertEquals(lastOffset, ((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset()); + assertTrue(commitCapture.getValue().containsKey(partition)); + assertEquals(lastOffset, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset()); } } - -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java index 9ebdcf7..261c654 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java @@ -22,12 +22,15 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -58,7 +61,7 @@ public class MaxUncommittedOffsetTest { private final int maxUncommittedOffsets = 10; private final int maxPollRecords = 5; private final int initialRetryDelaySecs = 60; - private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) + private final KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) .setMaxUncommittedOffsets(maxUncommittedOffsets) @@ -93,6 +96,8 @@ public class MaxUncommittedOffsetTest { private void initializeSpout(int msgCount) throws Exception { populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); + when(topologyContext.getThisTaskIndex()).thenReturn(0); + when(topologyContext.getComponentTasks(any())).thenReturn(Collections.singletonList(0)); spout.open(conf, topologyContext, collector); spout.activate(); } http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java deleted file mode 100644 index e97c7e1..0000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; - -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.junit.Before; -import org.junit.Test; - -public class NamedTopicFilterTest { - - private KafkaConsumer<?, ?> consumerMock; - - @Before - public void setUp() { - consumerMock = mock(KafkaConsumer.class); - } - - @Test - public void testFilter() { - String matchingTopicOne = "test-1"; - String matchingTopicTwo = "test-11"; - String unmatchedTopic = "unmatched"; - - NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, matchingTopicTwo); - - when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne, 0))); - List<PartitionInfo> partitionTwoPartitions = new ArrayList<>(); - partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0)); - partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1)); - when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions); - when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); - - List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); - - assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions, - containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); - - } - - private PartitionInfo createPartitionInfo(String topic, int partition) { - return new PartitionInfo(topic, partition, null, null, null); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java deleted file mode 100644 index 877efdc..0000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.junit.Before; -import org.junit.Test; - -public class PatternTopicFilterTest { - - private KafkaConsumer<?, ?> consumerMock; - - @Before - public void setUp(){ - consumerMock = mock(KafkaConsumer.class); - } - - @Test - public void testFilter() { - Pattern pattern = Pattern.compile("test-\\d+"); - PatternTopicFilter filter = new PatternTopicFilter(pattern); - - String matchingTopicOne = "test-1"; - String matchingTopicTwo = "test-11"; - String unmatchedTopic = "unmatched"; - - Map<String, List<PartitionInfo>> allTopics = new HashMap<>(); - allTopics.put(matchingTopicOne, Collections.singletonList(createPartitionInfo(matchingTopicOne, 0))); - List<PartitionInfo> testTwoPartitions = new ArrayList<>(); - testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0)); - testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1)); - allTopics.put(matchingTopicTwo, testTwoPartitions); - allTopics.put(unmatchedTopic, Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); - - when(consumerMock.listTopics()).thenReturn(allTopics); - - List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); - - assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions, - containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); - } - - private PartitionInfo createPartitionInfo(String topic, int partition) { - return new PartitionInfo(topic, partition, null, null, null); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index 7f0973b..6b92de8 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -20,6 +20,7 @@ package org.apache.storm.kafka.spout; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -28,7 +29,9 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -77,12 +80,12 @@ public class SingleTopicKafkaSpoutTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) + KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) .build(); - this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); + this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig)); this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy; this.spout = new KafkaSpout<>(spoutConfig, consumerFactory); } @@ -100,6 +103,8 @@ public class SingleTopicKafkaSpoutTest { private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException { populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); + when(topologyContext.getThisTaskIndex()).thenReturn(0); + when(topologyContext.getComponentTasks(any())).thenReturn(Collections.singletonList(0)); spout.open(conf, topologyContext, collector); spout.activate(); } http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java new file mode 100644 index 0000000..5f931bb --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -0,0 +1,74 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; + +public class SpoutWithMockedConsumerSetupHelper { + + /** + * Creates, opens and activates a KafkaSpout using a mocked consumer. + * @param <K> The Kafka key type + * @param <V> The Kafka value type + * @param spoutConfig The spout config to use + * @param topoConf The topo conf to pass to the spout + * @param contextMock The topo context to pass to the spout + * @param collectorMock The mocked collector to pass to the spout + * @param consumerMock The mocked consumer + * @param assignedPartitions The partitions to assign to this spout. The consumer will act like these partitions are assigned to it. + * @return The spout + */ + public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf, + TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) { + + Map<String, List<PartitionInfo>> partitionInfos = assignedPartitions.stream() + .map(tp -> new PartitionInfo(tp.topic(), tp.partition(), null, null, null)) + .collect(Collectors.groupingBy(info -> info.topic())); + partitionInfos.keySet() + .forEach(key -> when(consumerMock.partitionsFor(key)) + .thenReturn(partitionInfos.get(key))); + KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> consumerMock; + + KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory); + + when(contextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0)); + when(contextMock.getThisTaskIndex()).thenReturn(0); + + spout.open(topoConf, contextMock, collectorMock); + spout.activate(); + + verify(consumerMock).assign(assignedPartitions); + + return spout; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java index 62dbfe5..d2f38b0 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java @@ -17,6 +17,7 @@ */ package org.apache.storm.kafka.spout.builders; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -24,16 +25,26 @@ import org.apache.storm.Config; import org.apache.storm.generated.StormTopology; import org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; import org.apache.storm.kafka.spout.KafkaSpoutRetryService; +import org.apache.storm.kafka.spout.subscription.Subscription; import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; public class SingleTopicKafkaSpoutConfiguration { + public static final String STREAM = "test_stream"; public static final String TOPIC = "test"; + /** + * Retry in a tight loop (keep unit tests fasts). + */ + public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = + new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), + DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); + public static Config getConfig() { Config config = new Config(); config.setDebug(true); @@ -47,20 +58,27 @@ public class SingleTopicKafkaSpoutConfiguration { return tp.createTopology(); } - public static KafkaSpoutConfig.Builder<String,String> getKafkaSpoutConfigBuilder(int port) { - return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC) - .setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()), - new Fields("topic", "key", "value"), STREAM) - .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") - .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5) - .setRetry(getRetryService()) - .setOffsetCommitPeriodMs(10_000) - .setFirstPollOffsetStrategy(EARLIEST) - .setMaxUncommittedOffsets(250) - .setPollTimeoutMs(1000); + public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(int port) { + return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)); + } + + public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(Subscription subscription, int port) { + return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription)); } - + + private static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) { + return config.setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()), + new Fields("topic", "key", "value"), STREAM) + .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") + .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5) + .setRetry(getRetryService()) + .setOffsetCommitPeriodMs(10_000) + .setFirstPollOffsetStrategy(EARLIEST) + .setMaxUncommittedOffsets(250) + .setPollTimeoutMs(1000); + } + protected static KafkaSpoutRetryService getRetryService() { - return KafkaSpoutConfig.UNIT_TEST_RETRY_SERVICE; + return UNIT_TEST_RETRY_SERVICE; } } http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java new file mode 100644 index 0000000..3985619 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.subscription; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; + +public class NamedTopicFilterTest { + + private KafkaConsumer<?, ?> consumerMock; + + @Before + public void setUp() { + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testFilter() { + String matchingTopicOne = "test-1"; + String matchingTopicTwo = "test-11"; + String unmatchedTopic = "unmatched"; + + NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, matchingTopicTwo); + + when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne, 0))); + List<PartitionInfo> partitionTwoPartitions = new ArrayList<>(); + partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0)); + partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1)); + when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions); + when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); + + List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); + + assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions, + containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); + + } + + private PartitionInfo createPartitionInfo(String topic, int partition) { + return new PartitionInfo(topic, partition, null, null, null); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java new file mode 100644 index 0000000..67411e3 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.subscription; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.Before; +import org.junit.Test; + +public class PatternTopicFilterTest { + + private KafkaConsumer<?, ?> consumerMock; + + @Before + public void setUp(){ + consumerMock = mock(KafkaConsumer.class); + } + + @Test + public void testFilter() { + Pattern pattern = Pattern.compile("test-\\d+"); + PatternTopicFilter filter = new PatternTopicFilter(pattern); + + String matchingTopicOne = "test-1"; + String matchingTopicTwo = "test-11"; + String unmatchedTopic = "unmatched"; + + Map<String, List<PartitionInfo>> allTopics = new HashMap<>(); + allTopics.put(matchingTopicOne, Collections.singletonList(createPartitionInfo(matchingTopicOne, 0))); + List<PartitionInfo> testTwoPartitions = new ArrayList<>(); + testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0)); + testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1)); + allTopics.put(matchingTopicTwo, testTwoPartitions); + allTopics.put(unmatchedTopic, Collections.singletonList(createPartitionInfo(unmatchedTopic, 0))); + + when(consumerMock.listTopics()).thenReturn(allTopics); + + List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock); + + assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions, + containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1))); + } + + private PartitionInfo createPartitionInfo(String topic, int partition) { + return new PartitionInfo(topic, partition, null, null, null); + } +}
