http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java index 5f19b9e..1bd870e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.After; import org.junit.Test; import java.util.HashMap; @@ -40,6 +41,16 @@ public class KStreamSelectKeyTest { final private Serde<Integer> integerSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); + private KStreamTestDriver driver; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } + @Test public void testSelectKey() { KStreamBuilder builder = new KStreamBuilder(); @@ -66,7 +77,7 @@ public class KStreamSelectKeyTest { stream.selectKey(selector).process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (int expectedValue : expectedValues) { driver.process(topicName, null, expectedValue);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java index a0a61f2..e0bdfbc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.After; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -37,6 +38,16 @@ public class KStreamTransformTest { final private Serde<Integer> intSerde = Serdes.Integer(); + private KStreamTestDriver driver; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } + @Test public void testTransform() { KStreamBuilder builder = new KStreamBuilder(); @@ -76,7 +87,7 @@ public class KStreamTransformTest { KStream<Integer, Integer> stream = builder.stream(intSerde, intSerde, topicName); stream.transform(transformerSupplier).process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java index f5f9698..aebcc76 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.After; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -36,6 +37,16 @@ public class KStreamTransformValuesTest { final private Serde<Integer> intSerde = Serdes.Integer(); + private KStreamTestDriver driver; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } + @Test public void testTransform() { KStreamBuilder builder = new KStreamBuilder(); @@ -76,7 +87,7 @@ public class KStreamTransformValuesTest { stream = builder.stream(intSerde, intSerde, topicName); stream.transformValues(valueTransformerSupplier).process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 3c7a1bd..828103a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -20,266 +20,257 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.HoppingWindows; -import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.File; -import java.nio.file.Files; +import java.io.IOException; import static org.junit.Assert.assertEquals; public class KStreamWindowAggregateTest { - final private Serde<String> strSerde = new Serdes.StringSerde(); + final private Serde<String> strSerde = Serdes.String(); - private class StringAdd implements Aggregator<String, String, String> { + private KStreamTestDriver driver = null; + private File stateDir = null; - @Override - public String apply(String aggKey, String value, String aggregate) { - return aggregate + "+" + value; + @After + public void tearDown() { + if (driver != null) { + driver.close(); } + driver = null; } - private class StringInit implements Initializer<String> { - - @Override - public String apply() { - return "0"; - } + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); } @Test public void testAggBasic() throws Exception { - final File baseDir = Files.createTempDirectory("test").toFile(); - - try { - final KStreamBuilder builder = new KStreamBuilder(); - String topic1 = "topic1"; - - KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1); - KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(), - HoppingWindows.of("topic1-Canonized").with(10L).every(5L), - strSerde, - strSerde); - - MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); - - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); - - driver.setTime(0L); - driver.process(topic1, "A", "1"); - driver.setTime(1L); - driver.process(topic1, "B", "2"); - driver.setTime(2L); - driver.process(topic1, "C", "3"); - driver.setTime(3L); - driver.process(topic1, "D", "4"); - driver.setTime(4L); - driver.process(topic1, "A", "1"); - - driver.setTime(5L); - driver.process(topic1, "A", "1"); - driver.setTime(6L); - driver.process(topic1, "B", "2"); - driver.setTime(7L); - driver.process(topic1, "D", "4"); - driver.setTime(8L); - driver.process(topic1, "B", "2"); - driver.setTime(9L); - driver.process(topic1, "C", "3"); - - driver.setTime(10L); - driver.process(topic1, "A", "1"); - driver.setTime(11L); - driver.process(topic1, "B", "2"); - driver.setTime(12L); - driver.process(topic1, "D", "4"); - driver.setTime(13L); - driver.process(topic1, "B", "2"); - driver.setTime(14L); - driver.process(topic1, "C", "3"); - - assertEquals(Utils.mkList( - "[A@0]:0+1", - "[B@0]:0+2", - "[C@0]:0+3", - "[D@0]:0+4", - "[A@0]:0+1+1", - - "[A@0]:0+1+1+1", "[A@5]:0+1", - "[B@0]:0+2+2", "[B@5]:0+2", - "[D@0]:0+4+4", "[D@5]:0+4", - "[B@0]:0+2+2+2", "[B@5]:0+2+2", - "[C@0]:0+3+3", "[C@5]:0+3", - - "[A@5]:0+1+1", "[A@10]:0+1", - "[B@5]:0+2+2+2", "[B@10]:0+2", - "[D@5]:0+4+4", "[D@10]:0+4", - "[B@5]:0+2+2+2+2", "[B@10]:0+2+2", - "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed); - - } finally { - Utils.delete(baseDir); - } + final KStreamBuilder builder = new KStreamBuilder(); + + String topic1 = "topic1"; + + KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1); + KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, + HoppingWindows.of("topic1-Canonized").with(10L).every(5L), + strSerde, + strSerde); + + MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); + table2.toStream().process(proc2); + + driver = new KStreamTestDriver(builder, stateDir); + + driver.setTime(0L); + driver.process(topic1, "A", "1"); + driver.setTime(1L); + driver.process(topic1, "B", "2"); + driver.setTime(2L); + driver.process(topic1, "C", "3"); + driver.setTime(3L); + driver.process(topic1, "D", "4"); + driver.setTime(4L); + driver.process(topic1, "A", "1"); + + driver.setTime(5L); + driver.process(topic1, "A", "1"); + driver.setTime(6L); + driver.process(topic1, "B", "2"); + driver.setTime(7L); + driver.process(topic1, "D", "4"); + driver.setTime(8L); + driver.process(topic1, "B", "2"); + driver.setTime(9L); + driver.process(topic1, "C", "3"); + + driver.setTime(10L); + driver.process(topic1, "A", "1"); + driver.setTime(11L); + driver.process(topic1, "B", "2"); + driver.setTime(12L); + driver.process(topic1, "D", "4"); + driver.setTime(13L); + driver.process(topic1, "B", "2"); + driver.setTime(14L); + driver.process(topic1, "C", "3"); + + assertEquals(Utils.mkList( + "[A@0]:0+1", + "[B@0]:0+2", + "[C@0]:0+3", + "[D@0]:0+4", + "[A@0]:0+1+1", + + "[A@0]:0+1+1+1", "[A@5]:0+1", + "[B@0]:0+2+2", "[B@5]:0+2", + "[D@0]:0+4+4", "[D@5]:0+4", + "[B@0]:0+2+2+2", "[B@5]:0+2+2", + "[C@0]:0+3+3", "[C@5]:0+3", + + "[A@5]:0+1+1", "[A@10]:0+1", + "[B@5]:0+2+2+2", "[B@10]:0+2", + "[D@5]:0+4+4", "[D@10]:0+4", + "[B@5]:0+2+2+2+2", "[B@10]:0+2+2", + "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed); } @Test public void testJoin() throws Exception { - final File baseDir = Files.createTempDirectory("test").toFile(); - - try { - final KStreamBuilder builder = new KStreamBuilder(); - String topic1 = "topic1"; - String topic2 = "topic2"; - - KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1); - KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(), - HoppingWindows.of("topic1-Canonized").with(10L).every(5L), - strSerde, - strSerde); - - MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>(); - table1.toStream().process(proc1); - - KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2); - KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(), - HoppingWindows.of("topic2-Canonized").with(10L).every(5L), - strSerde, - strSerde); - - MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); - - - MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>(); - table1.join(table2, new ValueJoiner<String, String, String>() { - @Override - public String apply(String p1, String p2) { - return p1 + "%" + p2; - } - }).toStream().process(proc3); - - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); - - driver.setTime(0L); - driver.process(topic1, "A", "1"); - driver.setTime(1L); - driver.process(topic1, "B", "2"); - driver.setTime(2L); - driver.process(topic1, "C", "3"); - driver.setTime(3L); - driver.process(topic1, "D", "4"); - driver.setTime(4L); - driver.process(topic1, "A", "1"); - - proc1.checkAndClearProcessResult( - "[A@0]:0+1", - "[B@0]:0+2", - "[C@0]:0+3", - "[D@0]:0+4", - "[A@0]:0+1+1" - ); - proc2.checkAndClearProcessResult(); - proc3.checkAndClearProcessResult( - "[A@0]:null", - "[B@0]:null", - "[C@0]:null", - "[D@0]:null", - "[A@0]:null" - ); - - driver.setTime(5L); - driver.process(topic1, "A", "1"); - driver.setTime(6L); - driver.process(topic1, "B", "2"); - driver.setTime(7L); - driver.process(topic1, "D", "4"); - driver.setTime(8L); - driver.process(topic1, "B", "2"); - driver.setTime(9L); - driver.process(topic1, "C", "3"); - - proc1.checkAndClearProcessResult( - "[A@0]:0+1+1+1", "[A@5]:0+1", - "[B@0]:0+2+2", "[B@5]:0+2", - "[D@0]:0+4+4", "[D@5]:0+4", - "[B@0]:0+2+2+2", "[B@5]:0+2+2", - "[C@0]:0+3+3", "[C@5]:0+3" - ); - proc2.checkAndClearProcessResult(); - proc3.checkAndClearProcessResult( - "[A@0]:null", "[A@5]:null", - "[B@0]:null", "[B@5]:null", - "[D@0]:null", "[D@5]:null", - "[B@0]:null", "[B@5]:null", - "[C@0]:null", "[C@5]:null" - ); - - driver.setTime(0L); - driver.process(topic2, "A", "a"); - driver.setTime(1L); - driver.process(topic2, "B", "b"); - driver.setTime(2L); - driver.process(topic2, "C", "c"); - driver.setTime(3L); - driver.process(topic2, "D", "d"); - driver.setTime(4L); - driver.process(topic2, "A", "a"); - - proc1.checkAndClearProcessResult(); - proc2.checkAndClearProcessResult( - "[A@0]:0+a", - "[B@0]:0+b", - "[C@0]:0+c", - "[D@0]:0+d", - "[A@0]:0+a+a" - ); - proc3.checkAndClearProcessResult( - "[A@0]:0+1+1+1%0+a", - "[B@0]:0+2+2+2%0+b", - "[C@0]:0+3+3%0+c", - "[D@0]:0+4+4%0+d", - "[A@0]:0+1+1+1%0+a+a"); - - driver.setTime(5L); - driver.process(topic2, "A", "a"); - driver.setTime(6L); - driver.process(topic2, "B", "b"); - driver.setTime(7L); - driver.process(topic2, "D", "d"); - driver.setTime(8L); - driver.process(topic2, "B", "b"); - driver.setTime(9L); - driver.process(topic2, "C", "c"); - - proc1.checkAndClearProcessResult(); - proc2.checkAndClearProcessResult( - "[A@0]:0+a+a+a", "[A@5]:0+a", - "[B@0]:0+b+b", "[B@5]:0+b", - "[D@0]:0+d+d", "[D@5]:0+d", - "[B@0]:0+b+b+b", "[B@5]:0+b+b", - "[C@0]:0+c+c", "[C@5]:0+c" - ); - proc3.checkAndClearProcessResult( - "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a", - "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b", - "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d", - "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b", - "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c" - ); - - } finally { - Utils.delete(baseDir); - } + final KStreamBuilder builder = new KStreamBuilder(); + + String topic1 = "topic1"; + String topic2 = "topic2"; + + KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1); + KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, + HoppingWindows.of("topic1-Canonized").with(10L).every(5L), + strSerde, + strSerde); + + MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>(); + table1.toStream().process(proc1); + + KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2); + KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, + HoppingWindows.of("topic2-Canonized").with(10L).every(5L), + strSerde, + strSerde); + + MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); + table2.toStream().process(proc2); + + + MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>(); + table1.join(table2, new ValueJoiner<String, String, String>() { + @Override + public String apply(String p1, String p2) { + return p1 + "%" + p2; + } + }).toStream().process(proc3); + + driver = new KStreamTestDriver(builder, stateDir); + + driver.setTime(0L); + driver.process(topic1, "A", "1"); + driver.setTime(1L); + driver.process(topic1, "B", "2"); + driver.setTime(2L); + driver.process(topic1, "C", "3"); + driver.setTime(3L); + driver.process(topic1, "D", "4"); + driver.setTime(4L); + driver.process(topic1, "A", "1"); + + proc1.checkAndClearProcessResult( + "[A@0]:0+1", + "[B@0]:0+2", + "[C@0]:0+3", + "[D@0]:0+4", + "[A@0]:0+1+1" + ); + proc2.checkAndClearProcessResult(); + proc3.checkAndClearProcessResult( + "[A@0]:null", + "[B@0]:null", + "[C@0]:null", + "[D@0]:null", + "[A@0]:null" + ); + + driver.setTime(5L); + driver.process(topic1, "A", "1"); + driver.setTime(6L); + driver.process(topic1, "B", "2"); + driver.setTime(7L); + driver.process(topic1, "D", "4"); + driver.setTime(8L); + driver.process(topic1, "B", "2"); + driver.setTime(9L); + driver.process(topic1, "C", "3"); + + proc1.checkAndClearProcessResult( + "[A@0]:0+1+1+1", "[A@5]:0+1", + "[B@0]:0+2+2", "[B@5]:0+2", + "[D@0]:0+4+4", "[D@5]:0+4", + "[B@0]:0+2+2+2", "[B@5]:0+2+2", + "[C@0]:0+3+3", "[C@5]:0+3" + ); + proc2.checkAndClearProcessResult(); + proc3.checkAndClearProcessResult( + "[A@0]:null", "[A@5]:null", + "[B@0]:null", "[B@5]:null", + "[D@0]:null", "[D@5]:null", + "[B@0]:null", "[B@5]:null", + "[C@0]:null", "[C@5]:null" + ); + + driver.setTime(0L); + driver.process(topic2, "A", "a"); + driver.setTime(1L); + driver.process(topic2, "B", "b"); + driver.setTime(2L); + driver.process(topic2, "C", "c"); + driver.setTime(3L); + driver.process(topic2, "D", "d"); + driver.setTime(4L); + driver.process(topic2, "A", "a"); + + proc1.checkAndClearProcessResult(); + proc2.checkAndClearProcessResult( + "[A@0]:0+a", + "[B@0]:0+b", + "[C@0]:0+c", + "[D@0]:0+d", + "[A@0]:0+a+a" + ); + proc3.checkAndClearProcessResult( + "[A@0]:0+1+1+1%0+a", + "[B@0]:0+2+2+2%0+b", + "[C@0]:0+3+3%0+c", + "[D@0]:0+4+4%0+d", + "[A@0]:0+1+1+1%0+a+a"); + + driver.setTime(5L); + driver.process(topic2, "A", "a"); + driver.setTime(6L); + driver.process(topic2, "B", "b"); + driver.setTime(7L); + driver.process(topic2, "D", "d"); + driver.setTime(8L); + driver.process(topic2, "B", "b"); + driver.setTime(9L); + driver.process(topic2, "C", "c"); + + proc1.checkAndClearProcessResult(); + proc2.checkAndClearProcessResult( + "[A@0]:0+a+a+a", "[A@5]:0+a", + "[B@0]:0+b+b", "[B@5]:0+b", + "[D@0]:0+d+d", "[D@5]:0+d", + "[B@0]:0+b+b+b", "[B@5]:0+b+b", + "[C@0]:0+c+c", "[C@5]:0+c" + ); + proc3.checkAndClearProcessResult( + "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a", + "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b", + "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d", + "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b", + "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c" + ); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index be0ec19..a614479 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -27,61 +27,73 @@ import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.File; -import java.nio.file.Files; +import java.io.IOException; import static org.junit.Assert.assertEquals; public class KTableAggregateTest { - final private Serde<String> stringSerde = new Serdes.StringSerde(); + final private Serde<String> stringSerde = Serdes.String(); - @Test - public void testAggBasic() throws Exception { - final File baseDir = Files.createTempDirectory("test").toFile(); + private KStreamTestDriver driver = null; + private File stateDir = null; - try { - final KStreamBuilder builder = new KStreamBuilder(); - String topic1 = "topic1"; + @After + public void tearDown() { + if (driver != null) { + driver.close(); + } + driver = null; + } - KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1); - KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(), - stringSerde, - stringSerde - ).aggregate(MockInitializer.STRING_INIT, - MockAggregator.STRING_ADDER, - MockAggregator.STRING_REMOVER, - stringSerde, - "topic1-Canonized"); + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); + } - MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); + @Test + public void testAggBasic() throws Exception { + final KStreamBuilder builder = new KStreamBuilder(); + String topic1 = "topic1"; - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1); + KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(), + stringSerde, + stringSerde + ).aggregate(MockInitializer.STRING_INIT, + MockAggregator.STRING_ADDER, + MockAggregator.STRING_REMOVER, + stringSerde, + "topic1-Canonized"); - driver.process(topic1, "A", "1"); - driver.process(topic1, "B", "2"); - driver.process(topic1, "A", "3"); - driver.process(topic1, "B", "4"); - driver.process(topic1, "C", "5"); - driver.process(topic1, "D", "6"); - driver.process(topic1, "B", "7"); - driver.process(topic1, "C", "8"); + MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>(); + table2.toStream().process(proc2); - assertEquals(Utils.mkList( - "A:0+1", - "B:0+2", - "A:0+1+3", "A:0+1+3-1", - "B:0+2+4", "B:0+2+4-2", - "C:0+5", - "D:0+6", - "B:0+2+4-2+7", "B:0+2+4-2+7-4", - "C:0+5+8", "C:0+5+8-5"), proc2.processed); + driver = new KStreamTestDriver(builder, stateDir); - } finally { - Utils.delete(baseDir); - } + driver.process(topic1, "A", "1"); + driver.process(topic1, "B", "2"); + driver.process(topic1, "A", "3"); + driver.process(topic1, "B", "4"); + driver.process(topic1, "C", "5"); + driver.process(topic1, "D", "6"); + driver.process(topic1, "B", "7"); + driver.process(topic1, "C", "8"); + + assertEquals(Utils.mkList( + "A:0+1", + "B:0+2", + "A:0+1+3", "A:0+1+3-1", + "B:0+2+4", "B:0+2+4-2", + "C:0+5", + "D:0+6", + "B:0+2+4-2+7", "B:0+2+4-2+7-4", + "C:0+5+8", "C:0+5+8-5"), proc2.processed); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index ee26058..a3af133 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -19,25 +19,42 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.IOException; -import java.nio.file.Files; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; public class KTableFilterTest { - final private Serde<Integer> intSerde = new Serdes.IntegerSerde(); - final private Serde<String> stringSerde = new Serdes.StringSerde(); + final private Serde<Integer> intSerde = Serdes.Integer(); + final private Serde<String> stringSerde = Serdes.String(); + + private KStreamTestDriver driver = null; + private File stateDir = null; + + @After + public void tearDown() { + if (driver != null) { + driver.close(); + } + driver = null; + } + + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); + } @Test public void testKTable() { @@ -65,7 +82,7 @@ public class KTableFilterTest { table2.toStream().process(proc2); table3.toStream().process(proc3); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); driver.process(topic1, "A", 1); driver.process(topic1, "B", 2); @@ -80,199 +97,181 @@ public class KTableFilterTest { @Test public void testValueGetter() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; - - KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1); - KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return (value % 2) == 0; - } - }); - KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot( - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return (value % 2) == 0; - } - }); - - KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); - KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier(); - - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); - - KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); - KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); - - getter2.init(driver.context()); - getter3.init(driver.context()); - - driver.process(topic1, "A", 1); - driver.process(topic1, "B", 1); - driver.process(topic1, "C", 1); - - assertNull(getter2.get("A")); - assertNull(getter2.get("B")); - assertNull(getter2.get("C")); - - assertEquals(1, (int) getter3.get("A")); - assertEquals(1, (int) getter3.get("B")); - assertEquals(1, (int) getter3.get("C")); - - driver.process(topic1, "A", 2); - driver.process(topic1, "B", 2); - - assertEquals(2, (int) getter2.get("A")); - assertEquals(2, (int) getter2.get("B")); - assertNull(getter2.get("C")); - - assertNull(getter3.get("A")); - assertNull(getter3.get("B")); - assertEquals(1, (int) getter3.get("C")); - - driver.process(topic1, "A", 3); - - assertNull(getter2.get("A")); - assertEquals(2, (int) getter2.get("B")); - assertNull(getter2.get("C")); - - assertEquals(3, (int) getter3.get("A")); - assertNull(getter3.get("B")); - assertEquals(1, (int) getter3.get("C")); - - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); - - assertNull(getter2.get("A")); - assertNull(getter2.get("B")); - assertNull(getter2.get("C")); - - assertNull(getter3.get("A")); - assertNull(getter3.get("B")); - assertEquals(1, (int) getter3.get("C")); - - } finally { - Utils.delete(stateDir); - } + KStreamBuilder builder = new KStreamBuilder(); + + String topic1 = "topic1"; + + KTableImpl<String, Integer, Integer> table1 = + (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1); + KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }); + KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot( + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }); + + KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); + KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier(); + + driver = new KStreamTestDriver(builder, stateDir, null, null); + + KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); + KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); + + getter2.init(driver.context()); + getter3.init(driver.context()); + + driver.process(topic1, "A", 1); + driver.process(topic1, "B", 1); + driver.process(topic1, "C", 1); + + assertNull(getter2.get("A")); + assertNull(getter2.get("B")); + assertNull(getter2.get("C")); + + assertEquals(1, (int) getter3.get("A")); + assertEquals(1, (int) getter3.get("B")); + assertEquals(1, (int) getter3.get("C")); + + driver.process(topic1, "A", 2); + driver.process(topic1, "B", 2); + + assertEquals(2, (int) getter2.get("A")); + assertEquals(2, (int) getter2.get("B")); + assertNull(getter2.get("C")); + + assertNull(getter3.get("A")); + assertNull(getter3.get("B")); + assertEquals(1, (int) getter3.get("C")); + + driver.process(topic1, "A", 3); + + assertNull(getter2.get("A")); + assertEquals(2, (int) getter2.get("B")); + assertNull(getter2.get("C")); + + assertEquals(3, (int) getter3.get("A")); + assertNull(getter3.get("B")); + assertEquals(1, (int) getter3.get("C")); + + driver.process(topic1, "A", null); + driver.process(topic1, "B", null); + + assertNull(getter2.get("A")); + assertNull(getter2.get("B")); + assertNull(getter2.get("C")); + + assertNull(getter3.get("A")); + assertNull(getter3.get("B")); + assertEquals(1, (int) getter3.get("C")); } @Test public void testNotSendingOldValue() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; - - KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1); - KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return (value % 2) == 0; - } - }); + KStreamBuilder builder = new KStreamBuilder(); - MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); - MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>(); + String topic1 = "topic1"; - builder.addProcessor("proc1", proc1, table1.name); - builder.addProcessor("proc2", proc2, table2.name); + KTableImpl<String, Integer, Integer> table1 = + (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1); + KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }); + + MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); + MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>(); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); + builder.addProcessor("proc1", proc1, table1.name); + builder.addProcessor("proc2", proc2, table2.name); - driver.process(topic1, "A", 1); - driver.process(topic1, "B", 1); - driver.process(topic1, "C", 1); + driver = new KStreamTestDriver(builder, stateDir, null, null); - proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); + driver.process(topic1, "A", 1); + driver.process(topic1, "B", 1); + driver.process(topic1, "C", 1); - driver.process(topic1, "A", 2); - driver.process(topic1, "B", 2); + proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); - proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); - proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); + driver.process(topic1, "A", 2); + driver.process(topic1, "B", 2); - driver.process(topic1, "A", 3); + proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); + proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); - proc1.checkAndClearProcessResult("A:(3<-null)"); - proc2.checkAndClearProcessResult("A:(null<-null)"); + driver.process(topic1, "A", 3); - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); + proc1.checkAndClearProcessResult("A:(3<-null)"); + proc2.checkAndClearProcessResult("A:(null<-null)"); - proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); - proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); + driver.process(topic1, "A", null); + driver.process(topic1, "B", null); - } finally { - Utils.delete(stateDir); - } + proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); + proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); } @Test public void testSendingOldValue() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; + KStreamBuilder builder = new KStreamBuilder(); - KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1); - KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return (value % 2) == 0; - } - }); + String topic1 = "topic1"; - table2.enableSendingOldValues(); + KTableImpl<String, Integer, Integer> table1 = + (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1); + KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }); - MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); - MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>(); + table2.enableSendingOldValues(); - builder.addProcessor("proc1", proc1, table1.name); - builder.addProcessor("proc2", proc2, table2.name); + MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); + MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>(); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); + builder.addProcessor("proc1", proc1, table1.name); + builder.addProcessor("proc2", proc2, table2.name); - driver.process(topic1, "A", 1); - driver.process(topic1, "B", 1); - driver.process(topic1, "C", 1); + driver = new KStreamTestDriver(builder, stateDir, null, null); - proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); + driver.process(topic1, "A", 1); + driver.process(topic1, "B", 1); + driver.process(topic1, "C", 1); - driver.process(topic1, "A", 2); - driver.process(topic1, "B", 2); + proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); - proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); - proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); + driver.process(topic1, "A", 2); + driver.process(topic1, "B", 2); - driver.process(topic1, "A", 3); + proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); + proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); - proc1.checkAndClearProcessResult("A:(3<-2)"); - proc2.checkAndClearProcessResult("A:(null<-2)"); + driver.process(topic1, "A", 3); - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); + proc1.checkAndClearProcessResult("A:(3<-2)"); + proc2.checkAndClearProcessResult("A:(null<-2)"); - proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)"); - proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)"); + driver.process(topic1, "A", null); + driver.process(topic1, "B", null); - } finally { - Utils.delete(stateDir); - } + proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)"); + proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)"); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java index 27a5114..af131c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.test.KStreamTestDriver; +import org.junit.After; import org.junit.Test; import java.util.List; import java.util.Locale; @@ -39,6 +40,16 @@ public class KTableForeachTest { final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); + private KStreamTestDriver driver; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } + @Test public void testForeach() { // Given @@ -71,7 +82,7 @@ public class KTableForeachTest { table.foreach(action); // Then - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (KeyValue<Integer, String> record: inputRecords) { driver.process(topicName, record.key, record.value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 8a13e9a..ca3bbe1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -33,11 +33,13 @@ import org.apache.kafka.test.MockInitializer; import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.IOException; -import java.nio.file.Files; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -46,7 +48,23 @@ import static org.junit.Assert.assertTrue; public class KTableImplTest { - final private Serde<String> stringSerde = new Serdes.StringSerde(); + final private Serde<String> stringSerde = Serdes.String(); + + private KStreamTestDriver driver = null; + private File stateDir = null; + + @After + public void tearDown() { + if (driver != null) { + driver.close(); + } + driver = null; + } + + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); + } @Test public void testKTable() { @@ -85,7 +103,7 @@ public class KTableImplTest { MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>(); table4.toStream().process(proc4); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); driver.process(topic1, "A", "01"); driver.process(topic1, "B", "02"); @@ -100,129 +118,157 @@ public class KTableImplTest { @Test public void testValueGetter() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; - String topic2 = "topic2"; - - KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); - KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( - new ValueMapper<String, Integer>() { - @Override - public Integer apply(String value) { - return new Integer(value); - } - }); - KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter( - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return (value % 2) == 0; - } - }); - KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) - table1.through(stringSerde, stringSerde, topic2); - - KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); - KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); - KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier(); - KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier(); - - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); - - // two state store should be created - assertEquals(2, driver.allStateStores().size()); - - KTableValueGetter<String, String> getter1 = getterSupplier1.get(); - getter1.init(driver.context()); - KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); - getter2.init(driver.context()); - KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); - getter3.init(driver.context()); - KTableValueGetter<String, String> getter4 = getterSupplier4.get(); - getter4.init(driver.context()); - - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); - - assertEquals("01", getter1.get("A")); - assertEquals("01", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertEquals(new Integer(1), getter2.get("A")); - assertEquals(new Integer(1), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertNull(getter3.get("A")); - assertNull(getter3.get("B")); - assertNull(getter3.get("C")); - - assertEquals("01", getter4.get("A")); - assertEquals("01", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); - - assertEquals("02", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertEquals(new Integer(2), getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertEquals(new Integer(2), getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); - - assertEquals("02", getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - driver.process(topic1, "A", "03"); - - assertEquals("03", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertEquals(new Integer(3), getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertNull(getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); - - assertEquals("03", getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - driver.process(topic1, "A", null); - - assertNull(getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertNull(getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertNull(getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); - - assertNull(getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); + final KStreamBuilder builder = new KStreamBuilder(); - } finally { - Utils.delete(stateDir); - } + String topic1 = "topic1"; + String topic2 = "topic2"; + + KTableImpl<String, String, String> table1 = + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( + new ValueMapper<String, Integer>() { + @Override + public Integer apply(String value) { + return new Integer(value); + } + }); + KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter( + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }); + KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) + table1.through(stringSerde, stringSerde, topic2); + + KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); + KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); + KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier(); + KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier(); + + driver = new KStreamTestDriver(builder, stateDir, null, null); + + // two state store should be created + assertEquals(2, driver.allStateStores().size()); + + KTableValueGetter<String, String> getter1 = getterSupplier1.get(); + getter1.init(driver.context()); + KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); + getter2.init(driver.context()); + KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); + getter3.init(driver.context()); + KTableValueGetter<String, String> getter4 = getterSupplier4.get(); + getter4.init(driver.context()); + + driver.process(topic1, "A", "01"); + driver.process(topic1, "B", "01"); + driver.process(topic1, "C", "01"); + + assertEquals("01", getter1.get("A")); + assertEquals("01", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertEquals(new Integer(1), getter2.get("A")); + assertEquals(new Integer(1), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertNull(getter3.get("A")); + assertNull(getter3.get("B")); + assertNull(getter3.get("C")); + + assertEquals("01", getter4.get("A")); + assertEquals("01", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + driver.process(topic1, "A", "02"); + driver.process(topic1, "B", "02"); + + assertEquals("02", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertEquals(new Integer(2), getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertEquals(new Integer(2), getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + assertEquals("02", getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + driver.process(topic1, "A", "03"); + + assertEquals("03", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertEquals(new Integer(3), getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertNull(getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + assertEquals("03", getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + driver.process(topic1, "A", null); + + assertNull(getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertNull(getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertNull(getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + assertNull(getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); + } + + @Test + public void testStateStoreLazyEval() throws IOException { + String topic1 = "topic1"; + String topic2 = "topic2"; + + final KStreamBuilder builder = new KStreamBuilder(); + + KTableImpl<String, String, String> table1 = + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + KTableImpl<String, String, String> table2 = + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2); + + KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues( + new ValueMapper<String, Integer>() { + @Override + public Integer apply(String value) { + return new Integer(value); + } + }); + KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter( + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }); + + driver = new KStreamTestDriver(builder, stateDir, null, null); + driver.setTime(0L); + + // no state store should be created + assertEquals(0, driver.allStateStores().size()); } @Test @@ -230,120 +276,75 @@ public class KTableImplTest { String topic1 = "topic1"; String topic2 = "topic2"; - File stateDir = Files.createTempDirectory("test").toFile(); - try { - KStreamBuilder builder = new KStreamBuilder(); - - KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); - KTableImpl<String, String, String> table2 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2); - - KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues( - new ValueMapper<String, Integer>() { - @Override - public Integer apply(String value) { - return new Integer(value); - } - }); - KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter( - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return (value % 2) == 0; - } - }); - - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); - driver.setTime(0L); - - // no state store should be created - assertEquals(0, driver.allStateStores().size()); - - } finally { - Utils.delete(stateDir); - } + final KStreamBuilder builder = new KStreamBuilder(); - try { - KStreamBuilder builder = new KStreamBuilder(); - - KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); - KTableImpl<String, String, String> table2 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2); - - KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues( - new ValueMapper<String, Integer>() { - @Override - public Integer apply(String value) { - return new Integer(value); - } - }); - KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter( - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return (value % 2) == 0; - } - }); - table2.join(table1MappedFiltered, - new ValueJoiner<String, Integer, String>() { - @Override - public String apply(String v1, Integer v2) { - return v1 + v2; - } - }); - - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); - driver.setTime(0L); - - // two state store should be created - assertEquals(2, driver.allStateStores().size()); - - } finally { - Utils.delete(stateDir); - } + KTableImpl<String, String, String> table1 = + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + KTableImpl<String, String, String> table2 = + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2); + + KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues( + new ValueMapper<String, Integer>() { + @Override + public Integer apply(String value) { + return new Integer(value); + } + }); + KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter( + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }); + table2.join(table1MappedFiltered, + new ValueJoiner<String, Integer, String>() { + @Override + public String apply(String v1, Integer v2) { + return v1 + v2; + } + }); + + driver = new KStreamTestDriver(builder, stateDir, null, null); + driver.setTime(0L); + + // two state store should be created + assertEquals(2, driver.allStateStores().size()); } @Test public void testRepartition() throws IOException { String topic1 = "topic1"; - File stateDir = Files.createTempDirectory("test").toFile(); - try { - KStreamBuilder builder = new KStreamBuilder(); - - KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + final KStreamBuilder builder = new KStreamBuilder(); - KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1 - .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper()) - .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "mock-result1"); + KTableImpl<String, String, String> table1 = + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1 + .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper()) + .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "mock-result1"); - KTableImpl<String, String, String> table1Reduced = (KTableImpl<String, String, String>) table1 - .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper()) - .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2"); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde); - driver.setTime(0L); + KTableImpl<String, String, String> table1Reduced = (KTableImpl<String, String, String>) table1 + .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper()) + .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2"); - // three state store should be created, one for source, one for aggregate and one for reduce - assertEquals(3, driver.allStateStores().size()); + driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde); + driver.setTime(0L); - // contains the corresponding repartition source / sink nodes - assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003")); - assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004")); - assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007")); - assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008")); + // three state store should be created, one for source, one for aggregate and one for reduce + assertEquals(3, driver.allStateStores().size()); - assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner()); - assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner()); - assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner()); - assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner()); + // contains the corresponding repartition source / sink nodes + assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003")); + assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004")); + assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007")); + assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008")); - } finally { - Utils.delete(stateDir); - } + assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner()); + assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner()); + assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner()); + assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner()); } }
