Repository: storm
Updated Branches:
  refs/heads/master 10d381b30 -> 3580dbc80


http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 2d55520..23630a6 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -19,9 +19,10 @@ import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfigu
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.hasKey;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -42,6 +43,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
@@ -74,14 +76,11 @@ public class KafkaSpoutRebalanceTest {
     }
 
     //Returns messageIds in order of emission
-    private List<KafkaSpoutMessageId> 
emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> 
spout, TopicPartition partitionThatWillBeRevoked, TopicPartition 
assignedPartition) {
-        //Setup spout with mock consumer so we can get at the rebalance 
listener
+    private List<KafkaSpoutMessageId> 
emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> 
spout, TopicPartition partitionThatWillBeRevoked, TopicPartition 
assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> 
rebalanceListenerCapture) {
+        //Setup spout with mock consumer so we can get at the rebalance 
listener   
         spout.open(conf, contextMock, collectorMock);
         spout.activate();
 
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = 
ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), 
rebalanceListenerCapture.capture());
-
         //Assign partitions to the spout
         ConsumerRebalanceListener consumerRebalanceListener = 
rebalanceListenerCapture.getValue();
         List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -95,9 +94,9 @@ public class KafkaSpoutRebalanceTest {
         Map<TopicPartition, List<ConsumerRecord<String, String>>> 
secondPartitionRecords = new HashMap<>();
         secondPartitionRecords.put(assignedPartition, 
Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), 
assignedPartition.partition(), 0L, "key", "value")));
         when(consumerMock.poll(anyLong()))
-            .thenReturn(new ConsumerRecords(firstPartitionRecords))
-            .thenReturn(new ConsumerRecords(secondPartitionRecords))
-            .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+            .thenReturn(new ConsumerRecords<>(firstPartitionRecords))
+            .thenReturn(new ConsumerRecords<>(secondPartitionRecords))
+            .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
 
         //Emit the messages
         spout.nextTuple();
@@ -122,7 +121,12 @@ public class KafkaSpoutRebalanceTest {
     public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() 
throws Exception {
         //Acking tuples for partitions that are no longer assigned is useless 
since the spout will not be allowed to commit them
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+            ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture 
= ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+            Subscription subscriptionMock = mock(Subscription.class);
+            doNothing()
+                .when(subscriptionMock)
+                .subscribe(any(), rebalanceListenerCapture.capture(), any());
+            KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1)
                 .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
                 .build(), consumerFactory);
             String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
@@ -130,7 +134,8 @@ public class KafkaSpoutRebalanceTest {
             TopicPartition assignedPartition = new TopicPartition(topic, 2);
 
             //Emit a message on each partition and revoke the first partition
-            List<KafkaSpoutMessageId> emittedMessageIds = 
emitOneMessagePerPartitionThenRevokeOnePartition(spout, 
partitionThatWillBeRevoked, assignedPartition);
+            List<KafkaSpoutMessageId> emittedMessageIds = 
emitOneMessagePerPartitionThenRevokeOnePartition(
+                spout, partitionThatWillBeRevoked, assignedPartition, 
rebalanceListenerCapture);
 
             //Ack both emitted tuples
             spout.ack(emittedMessageIds.get(0));
@@ -152,8 +157,13 @@ public class KafkaSpoutRebalanceTest {
     @Test
     public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() 
throws Exception {
         //Failing tuples for partitions that are no longer assigned is useless 
since the spout will not be allowed to commit them if they later pass
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = 
ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+            Subscription subscriptionMock = mock(Subscription.class);
+            doNothing()
+                .when(subscriptionMock)
+                .subscribe(any(), rebalanceListenerCapture.capture(), any());
         KafkaSpoutRetryService retryServiceMock = 
mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+        KafkaSpout<String, String> spout = new 
KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1)
             .setOffsetCommitPeriodMs(10)
             .setRetry(retryServiceMock)
             .build(), consumerFactory);
@@ -166,7 +176,8 @@ public class KafkaSpoutRebalanceTest {
             .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
         
         //Emit a message on each partition and revoke the first partition
-        List<KafkaSpoutMessageId> emittedMessageIds = 
emitOneMessagePerPartitionThenRevokeOnePartition(spout, 
partitionThatWillBeRevoked, assignedPartition);
+        List<KafkaSpoutMessageId> emittedMessageIds = 
emitOneMessagePerPartitionThenRevokeOnePartition(
+            spout, partitionThatWillBeRevoked, assignedPartition, 
rebalanceListenerCapture);
 
         //Check that only two message ids were generated
         verify(retryServiceMock, times(2)).getMessageId(anyObject());

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index d84f4da..078f7a1 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -30,80 +30,71 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
 
 public class KafkaSpoutRetryLimitTest {
-
+    
     private final long offsetCommitPeriodMs = 2_000;
     private final TopologyContext contextMock = mock(TopologyContext.class);
     private final SpoutOutputCollector collectorMock = 
mock(SpoutOutputCollector.class);
     private final Map<String, Object> conf = new HashMap<>();
     private final TopicPartition partition = new 
TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
     private KafkaConsumer<String, String> consumerMock;
-    private KafkaSpout<String, String> spout;
-    private KafkaSpoutConfig spoutConfig;
-
+    private KafkaSpoutConfig<String, String> spoutConfig;
+    
     public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
-            new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
-                    0, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
-    private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) 
{
+        new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+    
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> 
commitCapture;
+    
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
         spoutConfig = getKafkaSpoutConfigBuilder(-1)
-                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
-                .setRetry(ZERO_RETRIES_RETRY_SERVICE)
-                .build();
-
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
         consumerMock = mock(KafkaConsumer.class);
-        KafkaConsumerFactory<String, String> consumerFactory = 
(kafkaSpoutConfig) -> consumerMock;
-
-        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
-        spout.open(conf, contextMock, collectorMock);
-        spout.activate();
-
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = 
ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), 
rebalanceListenerCapture.capture());
-
-        //Assign partitions to the spout
-        ConsumerRebalanceListener consumerRebalanceListener = 
rebalanceListenerCapture.getValue();
-        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
     }
-
+    
     @Test
     public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
         //Spout should ack failed messages after they hit the retry limit
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpoutWithNoRetry(Collections.singleton(partition));
+            KafkaSpout<String, String> spout = 
SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, 
collectorMock, consumerMock, Collections.singleton(partition));
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records 
= new HashMap<>();
             List<ConsumerRecord<String, String>> recordsForPartition = new 
ArrayList<>();
             int lastOffset = 3;
             for (int i = 0; i <= lastOffset; i++) {
-                recordsForPartition.add(new ConsumerRecord(partition.topic(), 
partition.partition(), i, "key", "value"));
+                recordsForPartition.add(new 
ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
             }
             records.put(partition, recordsForPartition);
-
+            
             when(consumerMock.poll(anyLong()))
-                    .thenReturn(new ConsumerRecords(records));
-
+                .thenReturn(new ConsumerRecords<>(records));
+            
             for (int i = 0; i < recordsForPartition.size(); i++) {
                 spout.nextTuple();
             }
-
+            
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = 
ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
             verify(collectorMock, 
times(recordsForPartition.size())).emit(anyObject(), anyObject(), 
messageIds.capture());
-
+            
             for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
                 spout.fail(messageId);
             }
@@ -111,16 +102,15 @@ public class KafkaSpoutRetryLimitTest {
             // Advance time and then trigger call to kafka consumer commit
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
             spout.nextTuple();
-
-            ArgumentCaptor<Map> 
committedOffsets=ArgumentCaptor.forClass(Map.class);
+            
             InOrder inOrder = inOrder(consumerMock);
-            
inOrder.verify(consumerMock).commitSync(committedOffsets.capture());
+            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
             inOrder.verify(consumerMock).poll(anyLong());
 
             //verify that Offset 3 was committed for the given TopicPartition
-            assertTrue(committedOffsets.getValue().containsKey(partition));
-            assertEquals(lastOffset, ((OffsetAndMetadata) 
(committedOffsets.getValue().get(partition))).offset());
+            assertTrue(commitCapture.getValue().containsKey(partition));
+            assertEquals(lastOffset, ((OffsetAndMetadata) 
(commitCapture.getValue().get(partition))).offset());
         }
     }
-
-}
\ No newline at end of file
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index 9ebdcf7..261c654 100755
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -22,12 +22,15 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.isIn;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,7 +61,7 @@ public class MaxUncommittedOffsetTest {
     private final int maxUncommittedOffsets = 10;
     private final int maxPollRecords = 5;
     private final int initialRetryDelaySecs = 60;
-    private final KafkaSpoutConfig spoutConfig = 
getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+    private final KafkaSpoutConfig<String, String> spoutConfig = 
getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
         .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
         .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
         .setMaxUncommittedOffsets(maxUncommittedOffsets)
@@ -93,6 +96,8 @@ public class MaxUncommittedOffsetTest {
 
     private void initializeSpout(int msgCount) throws Exception {
         populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        when(topologyContext.getThisTaskIndex()).thenReturn(0);
+        
when(topologyContext.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
         spout.open(conf, topologyContext, collector);
         spout.activate();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
deleted file mode 100644
index e97c7e1..0000000
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Before;
-import org.junit.Test;
-
-public class NamedTopicFilterTest {
-
-    private KafkaConsumer<?, ?> consumerMock;
-    
-    @Before
-    public void setUp() {
-        consumerMock = mock(KafkaConsumer.class);
-    }
-    
-    @Test
-    public void testFilter() {
-        String matchingTopicOne = "test-1";
-        String matchingTopicTwo = "test-11";
-        String unmatchedTopic = "unmatched";
-        
-        NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, 
matchingTopicTwo);
-        
-        
when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne,
 0)));
-        List<PartitionInfo> partitionTwoPartitions = new ArrayList<>();
-        partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
-        partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
-        
when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions);
-        
when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic,
 0)));
-        
-        List<TopicPartition> matchedPartitions = 
filter.getFilteredTopicPartitions(consumerMock);
-        
-        assertThat("Expected filter to pass only topics with exact name 
matches", matchedPartitions, 
-            containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new 
TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
-            
-    }
-    
-    private PartitionInfo createPartitionInfo(String topic, int partition) {
-        return new PartitionInfo(topic, partition, null, null, null);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
deleted file mode 100644
index 877efdc..0000000
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Before;
-import org.junit.Test;
-
-public class PatternTopicFilterTest {
-
-    private KafkaConsumer<?, ?> consumerMock;
-    
-    @Before
-    public void setUp(){
-        consumerMock = mock(KafkaConsumer.class);
-    }
-    
-    @Test
-    public void testFilter() {
-        Pattern pattern = Pattern.compile("test-\\d+");
-        PatternTopicFilter filter = new PatternTopicFilter(pattern);
-        
-        String matchingTopicOne = "test-1";
-        String matchingTopicTwo = "test-11";
-        String unmatchedTopic = "unmatched";
-        
-        Map<String, List<PartitionInfo>> allTopics = new HashMap<>();
-        allTopics.put(matchingTopicOne, 
Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
-        List<PartitionInfo> testTwoPartitions = new ArrayList<>();
-        testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
-        testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
-        allTopics.put(matchingTopicTwo, testTwoPartitions);
-        allTopics.put(unmatchedTopic, 
Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
-        
-        when(consumerMock.listTopics()).thenReturn(allTopics);
-        
-        List<TopicPartition> matchedPartitions = 
filter.getFilteredTopicPartitions(consumerMock);
-        
-        assertThat("Expected topic partitions matching the pattern to be 
passed by the filter", matchedPartitions,
-            containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new 
TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
-    }
-    
-    private PartitionInfo createPartitionInfo(String topic, int partition) {
-        return new PartitionInfo(topic, partition, null, null, null);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 7f0973b..6b92de8 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -20,6 +20,7 @@ package org.apache.storm.kafka.spout;
 import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -28,7 +29,9 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -77,12 +80,12 @@ public class SingleTopicKafkaSpoutTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        KafkaSpoutConfig spoutConfig = 
getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+        KafkaSpoutConfig<String, String> spoutConfig = 
getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
             .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
             .setRetry(new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
                 maxRetries, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
             .build();
-        this.consumerSpy = spy(new 
KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
+        this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, 
String>().createConsumer(spoutConfig));
         this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
         this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
     }
@@ -100,6 +103,8 @@ public class SingleTopicKafkaSpoutTest {
 
     private void initializeSpout(int msgCount) throws InterruptedException, 
ExecutionException, TimeoutException {
         populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        when(topologyContext.getThisTaskIndex()).thenReturn(0);
+        
when(topologyContext.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
         spout.open(conf, topologyContext, collector);
         spout.activate();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
new file mode 100644
index 0000000..5f931bb
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+
+public class SpoutWithMockedConsumerSetupHelper {
+    
+    /**
+     * Creates, opens and activates a KafkaSpout using a mocked consumer.
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param spoutConfig The spout config to use
+     * @param topoConf The topo conf to pass to the spout
+     * @param contextMock The topo context to pass to the spout
+     * @param collectorMock The mocked collector to pass to the spout
+     * @param consumerMock The mocked consumer
+     * @param assignedPartitions The partitions to assign to this spout. The 
consumer will act like these partitions are assigned to it.
+     * @return The spout
+     */
+    public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> 
spoutConfig, Map<String, Object> topoConf,
+        TopologyContext contextMock, SpoutOutputCollector collectorMock, 
KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {     
+
+        Map<String, List<PartitionInfo>> partitionInfos = 
assignedPartitions.stream()
+            .map(tp -> new PartitionInfo(tp.topic(), tp.partition(), null, 
null, null))
+            .collect(Collectors.groupingBy(info -> info.topic()));
+        partitionInfos.keySet()
+            .forEach(key -> when(consumerMock.partitionsFor(key))
+                .thenReturn(partitionInfos.get(key)));
+        KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> 
consumerMock;
+
+        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, 
consumerFactory);
+
+        
when(contextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
+        when(contextMock.getThisTaskIndex()).thenReturn(0);
+        
+        spout.open(topoConf, contextMock, collectorMock);
+        spout.activate();
+
+        verify(consumerMock).assign(assignedPartitions);
+        
+        return spout;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 62dbfe5..d2f38b0 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.kafka.spout.builders;
 
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
 import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -24,16 +25,26 @@ import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
 public class SingleTopicKafkaSpoutConfiguration {
+
     public static final String STREAM = "test_stream";
     public static final String TOPIC = "test";
 
+    /**
+     * Retry in a tight loop (keep unit tests fast).
+     */
+    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
+        new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
 KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            DEFAULT_MAX_RETRIES, 
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
     public static Config getConfig() {
         Config config = new Config();
         config.setDebug(true);
@@ -47,20 +58,27 @@ public class SingleTopicKafkaSpoutConfiguration {
         return tp.createTopology();
     }
 
-    public static KafkaSpoutConfig.Builder<String,String> 
getKafkaSpoutConfigBuilder(int port) {
-        return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)
-                .setRecordTranslator((r) -> new Values(r.topic(), r.key(), 
r.value()),
-                        new Fields("topic", "key", "value"), STREAM)
-                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
-                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
-                .setRetry(getRetryService())
-                .setOffsetCommitPeriodMs(10_000)
-                .setFirstPollOffsetStrategy(EARLIEST)
-                .setMaxUncommittedOffsets(250)
-                .setPollTimeoutMs(1000);
+    public static KafkaSpoutConfig.Builder<String, String> 
getKafkaSpoutConfigBuilder(int port) {
+        return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + 
port, TOPIC));
+    }
+
+    public static KafkaSpoutConfig.Builder<String, String> 
getKafkaSpoutConfigBuilder(Subscription subscription, int port) {
+        return setCommonSpoutConfig(new 
KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription));
     }
-        
+
+    private static KafkaSpoutConfig.Builder<String, String> 
setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
+        return config.setRecordTranslator((r) -> new Values(r.topic(), 
r.key(), r.value()),
+            new Fields("topic", "key", "value"), STREAM)
+            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
+            .setRetry(getRetryService())
+            .setOffsetCommitPeriodMs(10_000)
+            .setFirstPollOffsetStrategy(EARLIEST)
+            .setMaxUncommittedOffsets(250)
+            .setPollTimeoutMs(1000);
+    }
+
     protected static KafkaSpoutRetryService getRetryService() {
-        return KafkaSpoutConfig.UNIT_TEST_RETRY_SERVICE;
+        return UNIT_TEST_RETRY_SERVICE;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
new file mode 100644
index 0000000..3985619
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NamedTopicFilterTest {
+
+    private KafkaConsumer<?, ?> consumerMock;
+    
+    @Before
+    public void setUp() {
+        consumerMock = mock(KafkaConsumer.class);
+    }
+    
+    @Test
+    public void testFilter() {
+        String matchingTopicOne = "test-1";
+        String matchingTopicTwo = "test-11";
+        String unmatchedTopic = "unmatched";
+        
+        NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, 
matchingTopicTwo);
+        
+        
when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne,
 0)));
+        List<PartitionInfo> partitionTwoPartitions = new ArrayList<>();
+        partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
+        partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
+        
when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions);
+        
when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic,
 0)));
+        
+        List<TopicPartition> matchedPartitions = 
filter.getFilteredTopicPartitions(consumerMock);
+        
+        assertThat("Expected filter to pass only topics with exact name 
matches", matchedPartitions, 
+            containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new 
TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
+            
+    }
+    
+    private PartitionInfo createPartitionInfo(String topic, int partition) {
+        return new PartitionInfo(topic, partition, null, null, null);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
new file mode 100644
index 0000000..67411e3
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PatternTopicFilterTest {
+
+    private KafkaConsumer<?, ?> consumerMock;
+    
+    @Before
+    public void setUp(){
+        consumerMock = mock(KafkaConsumer.class);
+    }
+    
+    @Test
+    public void testFilter() {
+        Pattern pattern = Pattern.compile("test-\\d+");
+        PatternTopicFilter filter = new PatternTopicFilter(pattern);
+        
+        String matchingTopicOne = "test-1";
+        String matchingTopicTwo = "test-11";
+        String unmatchedTopic = "unmatched";
+        
+        Map<String, List<PartitionInfo>> allTopics = new HashMap<>();
+        allTopics.put(matchingTopicOne, 
Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
+        List<PartitionInfo> testTwoPartitions = new ArrayList<>();
+        testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
+        testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
+        allTopics.put(matchingTopicTwo, testTwoPartitions);
+        allTopics.put(unmatchedTopic, 
Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
+        
+        when(consumerMock.listTopics()).thenReturn(allTopics);
+        
+        List<TopicPartition> matchedPartitions = 
filter.getFilteredTopicPartitions(consumerMock);
+        
+        assertThat("Expected topic partitions matching the pattern to be 
passed by the filter", matchedPartitions,
+            containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new 
TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
+    }
+    
+    private PartitionInfo createPartitionInfo(String topic, int partition) {
+        return new PartitionInfo(topic, partition, null, null, null);
+    }
+}

Reply via email to