Repository: kafka
Updated Branches:
  refs/heads/trunk ca06862a7 -> 9e65b25e9
KAFKA-4652: Added tests for KStreamBuilder Author: bbejeck <[email protected]> Reviewers: Damian Guy, Guozhang Wang Closes #2597 from bbejeck/KAFKA-4652_improve_kstream_builder_test_coverage Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e65b25e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e65b25e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e65b25e Branch: refs/heads/trunk Commit: 9e65b25e9fca6c26cef3498ff747879d4f527700 Parents: ca06862 Author: Bill Bejeck <[email protected]> Authored: Thu Mar 2 13:19:08 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Mar 2 13:19:08 2017 -0800 ---------------------------------------------------------------------- .../streams/kstream/KStreamBuilderTest.java | 92 ++++++++++++++++++++ 1 file changed, 92 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9e65b25e/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index 3c66dc9..7ce0b54 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -16,11 +16,13 @@ */ package org.apache.kafka.streams.kstream; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TopologyBuilder; import 
org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockKeyValueMapper; @@ -35,8 +37,10 @@ import java.util.List; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class KStreamBuilderTest { @@ -232,4 +236,92 @@ public class KStreamBuilderTest { assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store")); assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count")); } + + @Test + public void shouldAddTopicToEarliestAutoOffsetResetList() { + final String topicName = "topic-1"; + + builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName); + + assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches()); + assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches()); + } + + @Test + public void shouldAddTopicToLatestAutoOffsetResetList() { + final String topicName = "topic-1"; + + builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, topicName); + + assertTrue(builder.latestResetTopicsPattern().matcher(topicName).matches()); + assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches()); + } + + @Test + public void shouldAddTableToEarliestAutoOffsetResetList() { + final String topicName = "topic-1"; + final String storeName = "test-store"; + + builder.table(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName, storeName); + + assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches()); + assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches()); + } + + @Test + public void shouldAddTableToLatestAutoOffsetResetList() { + final String topicName = "topic-1"; + 
final String storeName = "test-store"; + + builder.table(TopologyBuilder.AutoOffsetReset.LATEST, topicName, storeName); + + assertTrue(builder.latestResetTopicsPattern().matcher(topicName).matches()); + assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches()); + } + + @Test + public void shouldNotAddTableToOffsetResetLists() { + final String topicName = "topic-1"; + final String storeName = "test-store"; + final Serde<String> stringSerde = Serdes.String(); + + builder.table(stringSerde, stringSerde, topicName, storeName); + + assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches()); + assertFalse(builder.earliestResetTopicsPattern().matcher(topicName).matches()); + } + + @Test + public void shouldNotAddRegexTopicsToOffsetResetLists() { + final Pattern topicPattern = Pattern.compile("topic-\\d"); + final String topic = "topic-5"; + + builder.stream(topicPattern); + + assertFalse(builder.latestResetTopicsPattern().matcher(topic).matches()); + assertFalse(builder.earliestResetTopicsPattern().matcher(topic).matches()); + + } + + @Test + public void shouldAddRegexTopicToEarliestAutoOffsetResetList() { + final Pattern topicPattern = Pattern.compile("topic-\\d+"); + final String topicTwo = "topic-500000"; + + builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicPattern); + + assertTrue(builder.earliestResetTopicsPattern().matcher(topicTwo).matches()); + assertFalse(builder.latestResetTopicsPattern().matcher(topicTwo).matches()); + } + + @Test + public void shouldAddRegexTopicToLatestAutoOffsetResetList() { + final Pattern topicPattern = Pattern.compile("topic-\\d+"); + final String topicTwo = "topic-1000000"; + + builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, topicPattern); + + assertTrue(builder.latestResetTopicsPattern().matcher(topicTwo).matches()); + assertFalse(builder.earliestResetTopicsPattern().matcher(topicTwo).matches()); + } }
