Repository: kafka
Updated Branches:
  refs/heads/trunk bc5051565 -> 1b764c5e8
MINOR: add unit test for KGroupedTable.count

Author: Damian Guy <[email protected]>

Reviewers: Guozhang Wang <[email protected]>, Michael G. Noll <[email protected]>

Closes #1255 from dguy/kgroupedtable-count-test

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1b764c5e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1b764c5e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1b764c5e

Branch: refs/heads/trunk
Commit: 1b764c5e834c0d03f3c7107a58f21ad3bbb98ac3
Parents: bc50515
Author: Damian Guy <[email protected]>
Authored: Mon Apr 25 11:18:28 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Mon Apr 25 11:18:28 2016 -0700

----------------------------------------------------------------------
 .../internals/KGroupedTableImplTest.java        | 86 ++++++++++++++++++++
 1 file changed, 86 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b764c5e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
new file mode 100644
index 0000000..9eeea20
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class KGroupedTableImplTest {
+
+    private File stateDir;
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = Files.createTempDirectory("test").toFile();
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        Utils.delete(stateDir);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testGroupedCountOccurences() throws IOException {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String input = "count-test-input";
+        final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>();
+
+        builder.table(Serdes.String(), Serdes.String(), input)
+            .groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+                @Override
+                public KeyValue<String, String> apply(final String key, final String value) {
+                    return new KeyValue<>(value, value);
+                }
+            }, Serdes.String(), Serdes.String())
+            .count("count")
+            .toStream()
+            .process(processorSupplier);
+
+
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir);
+
+
+        driver.process(input, "A", "green");
+        driver.process(input, "B", "green");
+        driver.process(input, "A", "blue");
+        driver.process(input, "C", "yellow");
+        driver.process(input, "D", "green");
+
+        final List<String> expected = Arrays.asList("green:1", "green:2", "blue:1", "green:1", "yellow:1", "green:2");
+        final List<String> actual = processorSupplier.processed;
+        assertEquals(expected, actual);
+    }
+}
\ No newline at end of file
