KAFKA-3607: Close KStreamTestDriver upon completing; follow-up fixes to be tracked in KAFKA-3623
Author: Guozhang Wang <[email protected]> Reviewers: Eno Thereska, Michael G. Noll, Ismael Juma Closes #1258 from guozhangwang/K3607 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a73629b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a73629b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a73629b Branch: refs/heads/trunk Commit: 1a73629bb43bbc781e5a968a61f6079365bc75b7 Parents: f60a3fa Author: Guozhang Wang <[email protected]> Authored: Tue Apr 26 11:39:49 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Apr 26 11:39:49 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/test/TestUtils.java | 37 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 9 +- .../streams/kstream/KStreamBuilderTest.java | 13 +- .../internals/KGroupedTableImplTest.java | 11 +- .../kstream/internals/KStreamBranchTest.java | 13 +- .../kstream/internals/KStreamFilterTest.java | 15 +- .../kstream/internals/KStreamFlatMapTest.java | 13 +- .../internals/KStreamFlatMapValuesTest.java | 13 +- .../kstream/internals/KStreamForeachTest.java | 13 +- .../internals/KStreamKStreamJoinTest.java | 651 +++++++++---------- .../internals/KStreamKStreamLeftJoinTest.java | 341 +++++----- .../internals/KStreamKTableLeftJoinTest.java | 153 +++-- .../kstream/internals/KStreamMapTest.java | 13 +- .../kstream/internals/KStreamMapValuesTest.java | 13 +- .../kstream/internals/KStreamSelectKeyTest.java | 13 +- .../kstream/internals/KStreamTransformTest.java | 13 +- .../internals/KStreamTransformValuesTest.java | 13 +- .../internals/KStreamWindowAggregateTest.java | 455 +++++++------ .../kstream/internals/KTableAggregateTest.java | 92 +-- .../kstream/internals/KTableFilterTest.java | 327 +++++----- .../kstream/internals/KTableForeachTest.java | 13 +- .../kstream/internals/KTableImplTest.java | 451 ++++++------- .../kstream/internals/KTableKTableJoinTest.java | 394 ++++++----- .../internals/KTableKTableLeftJoinTest.java | 397 ++++++----- .../internals/KTableKTableOuterJoinTest.java | 426 ++++++------ .../kstream/internals/KTableMapKeysTest.java | 13 +- .../kstream/internals/KTableMapValuesTest.java | 381 ++++++----- .../kstream/internals/KTableSourceTest.java | 172 ++--- .../internals/KeyValuePrinterProcessorTest.java | 15 +- .../apache/kafka/test/KStreamTestDriver.java | 28 +- .../apache/kafka/test/MockKeyValueMapper.java | 2 +- .../org/apache/kafka/test/MockValueJoiner.java | 33 + 32 files changed, 2349 insertions(+), 2197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/clients/src/test/java/org/apache/kafka/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 027221e..1bfe578 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -20,6 +20,8 @@ import static java.util.Arrays.asList; import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -29,6 +31,7 @@ import java.util.Random; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.Utils; /** @@ -97,12 +100,44 @@ public class TestUtils { } /** - * Creates an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the + * Create an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the * suffix to generate its name. */ public static File tempFile() throws IOException { File file = File.createTempFile("kafka", ".tmp"); file.deleteOnExit(); + + return file; + } + + /** + * Create a temporary relative directory in the default temporary-file directory with the given prefix. + * + * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix + */ + public static File tempDirectory(String prefix) throws IOException { + return tempDirectory(null, prefix); + } + + /** + * Create a temporary relative directory in the specified parent directory with the given prefix. + * + * @param parent The parent folder path name, if null using the default temporary-file directory + * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix + */ + public static File tempDirectory(Path parent, String prefix) throws IOException { + final File file = parent == null ? + Files.createTempDirectory(prefix == null ? "kafka-" : prefix).toFile() : + Files.createTempDirectory(parent, prefix == null ? "kafka-" : prefix).toFile(); + file.deleteOnExit(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + Utils.delete(file); + } + }); + return file; } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index be7741d..6bd6c63 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -95,15 +95,8 @@ object TestUtils extends Logging { def tempRelativeDir(parent: String): File = { val parentFile = new File(parent) parentFile.mkdirs() - val f = Files.createTempDirectory(parentFile.toPath, "kafka-").toFile - f.deleteOnExit() - Runtime.getRuntime().addShutdownHook(new Thread() { - override def run() = { - Utils.delete(f) - } - }) - f + org.apache.kafka.test.TestUtils.tempDirectory(parentFile.toPath, "kafka-"); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index e75b595..cdf28db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -22,12 +22,23 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.After; import org.junit.Test; import static org.junit.Assert.assertEquals; public class KStreamBuilderTest { + private KStreamTestDriver driver = null; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } + @Test(expected = TopologyBuilderException.class) public void testFrom() { final KStreamBuilder builder = new KStreamBuilder(); @@ -66,7 +77,7 @@ public class KStreamBuilderTest { MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>(); merged.process(processorSupplier); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); driver.setTime(0L); driver.process(topic1, "A", "aa"); http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java index 9eeea20..fc0451a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java @@ -18,19 +18,17 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; -import org.junit.After; +import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.IOException; -import java.nio.file.Files; import java.util.Arrays; import java.util.List; @@ -43,12 +41,7 @@ public class KGroupedTableImplTest { @Before public void setUp() throws IOException { - stateDir = Files.createTempDirectory("test").toFile(); - } - - @After - public void tearDown() throws IOException { - Utils.delete(stateDir); + stateDir = TestUtils.tempDirectory("kafka-test"); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java index e04a273..0650b95 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.After; import org.junit.Test; import java.lang.reflect.Array; @@ -33,6 +34,16 @@ public class KStreamBranchTest { private String topicName = "topic"; + private KStreamTestDriver driver = null; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } + @SuppressWarnings("unchecked") @Test public void testKStreamBranch() { @@ -74,7 +85,7 @@ public class KStreamBranchTest { branches[i].process(processors[i]); } - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java index 75465c8..4be8513 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.After; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -32,6 +33,16 @@ public class KStreamFilterTest { private String topicName = "topic"; + private KStreamTestDriver driver = null; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } + private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() { @Override public boolean test(Integer key, String value) { @@ -51,7 +62,7 @@ public class KStreamFilterTest { stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName); stream.filter(isMultipleOfThree).process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); } @@ -71,7 +82,7 @@ public class KStreamFilterTest { stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName); stream.filterNot(isMultipleOfThree).process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java index bc85757..da57d4b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.After; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -34,6 +35,16 @@ public class KStreamFlatMapTest { private String topicName = "topic"; + private KStreamTestDriver driver = null; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } + @Test public void testFlatMap() { KStreamBuilder builder = new KStreamBuilder(); @@ -59,7 +70,7 @@ public class KStreamFlatMapTest { stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName); stream.flatMap(mapper).process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java index 63f5636..9d1141b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.After; import org.junit.Test; import java.util.ArrayList; @@ -34,6 +35,16 @@ public class KStreamFlatMapValuesTest { private String topicName = "topic"; + private KStreamTestDriver driver = null; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } + @Test public void testFlatMapValues() { KStreamBuilder builder = new KStreamBuilder(); @@ -58,7 +69,7 @@ public class KStreamFlatMapValuesTest { stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName); stream.flatMapValues(mapper).process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java index d0a182d..0bc5e77 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.test.KStreamTestDriver; +import org.junit.After; import org.junit.Test; import java.util.List; import java.util.Locale; @@ -39,6 +40,16 @@ public class KStreamForeachTest { final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); + private KStreamTestDriver driver = null; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } + @Test public void testForeach() { // Given @@ -71,7 +82,7 @@ public class KStreamForeachTest { stream.foreach(action); // Then - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (KeyValue<Integer, String> record: inputRecords) { driver.process(topicName, record.key, record.value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 19a9411..6b0828a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -19,17 +19,19 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.File; -import java.nio.file.Files; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; @@ -39,460 +41,447 @@ import static org.junit.Assert.assertEquals; public class KStreamKStreamJoinTest { - private String topic1 = "topic1"; - private String topic2 = "topic2"; + final private String topic1 = "topic1"; + final private String topic2 = "topic2"; final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); - private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() { - @Override - public String apply(String value1, String value2) { - return value1 + "+" + value2; + private KStreamTestDriver driver = null; + private File stateDir = null; + + @After + public void tearDown() { + if (driver != null) { + driver.close(); } - }; + driver = null; + } + + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); + } @Test public void testJoin() throws Exception { - File baseDir = Files.createTempDirectory("test").toFile(); - try { - - KStreamBuilder builder = new KStreamBuilder(); - - final int[] expectedKeys = new int[]{0, 1, 2, 3}; - - KStream<Integer, String> stream1; - KStream<Integer, String> stream2; - KStream<Integer, String> joined; - MockProcessorSupplier<Integer, String> processor; + KStreamBuilder builder = new KStreamBuilder(); - processor = new MockProcessorSupplier<>(); - stream1 = builder.stream(intSerde, stringSerde, topic1); - stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde); - joined.process(processor); + final int[] expectedKeys = new int[]{0, 1, 2, 3}; - Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + KStream<Integer, String> stream1; + KStream<Integer, String> stream2; + KStream<Integer, String> joined; + MockProcessorSupplier<Integer, String> processor; - assertEquals(1, copartitionGroups.size()); - assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + processor = new MockProcessorSupplier<>(); + stream1 = builder.stream(intSerde, stringSerde, topic1); + stream2 = builder.stream(intSerde, stringSerde, topic2); + joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde); + joined.process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); - driver.setTime(0L); + Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); - // push two items to the primary stream. the other window is empty - // w1 = {} - // w2 = {} - // --> w1 = { 0:X1, 1:X1 } - // w2 = {} + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } + driver = new KStreamTestDriver(builder, stateDir); + driver.setTime(0L); - processor.checkAndClearProcessResult(); + // push two items to the primary stream. the other window is empty + // w1 = {} + // w2 = {} + // --> w1 = { 0:X1, 1:X1 } + // w2 = {} - // push two items to the other stream. this should produce two items. - // w1 = { 0:X0, 1:X1 } - // w2 = {} - // --> w1 = { 0:X1, 1:X1 } - // w2 = { 0:Y0, 1:Y1 } + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } + processor.checkAndClearProcessResult(); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); + // push two items to the other stream. this should produce two items. + // w1 = { 0:X0, 1:X1 } + // w2 = {} + // --> w1 = { 0:X1, 1:X1 } + // w2 = { 0:Y0, 1:Y1 } - // push all four items to the primary stream. this should produce two items. - // w1 = { 0:X0, 1:X1 } - // w2 = { 0:Y0, 1:Y1 } - // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } - // w2 = { 0:Y0, 1:Y1 } + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); + // push all four items to the primary stream. this should produce two items. + // w1 = { 0:X0, 1:X1 } + // w2 = { 0:Y0, 1:Y1 } + // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1 } - // push all items to the other stream. this should produce six items. - // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } - // w2 = { 0:Y0, 1:Y1 } - // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); - processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + // push all items to the other stream. this should produce six items. + // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1 } + // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } - // push all four items to the primary stream. this should produce six items. - // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 - // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); + // push all four items to the primary stream. this should produce six items. + // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 + // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } - // push two items to the other stream. this should produce six item. - // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 - // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 } + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); - processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1"); + // push two items to the other stream. this should produce six item. + // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 + // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 } - } finally { - Utils.delete(baseDir); + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]); } + + processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1"); } @Test public void testOuterJoin() throws Exception { - File baseDir = Files.createTempDirectory("test").toFile(); - try { + KStreamBuilder builder = new KStreamBuilder(); - KStreamBuilder builder = new KStreamBuilder(); + final int[] expectedKeys = new int[]{0, 1, 2, 3}; - final int[] expectedKeys = new int[]{0, 1, 2, 3}; + KStream<Integer, String> stream1; + KStream<Integer, String> stream2; + KStream<Integer, String> joined; + MockProcessorSupplier<Integer, String> processor; - KStream<Integer, String> stream1; - KStream<Integer, String> stream2; - KStream<Integer, String> joined; - MockProcessorSupplier<Integer, String> processor; + processor = new MockProcessorSupplier<>(); + stream1 = builder.stream(intSerde, stringSerde, topic1); + stream2 = builder.stream(intSerde, stringSerde, topic2); + joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde); + joined.process(processor); - processor = new MockProcessorSupplier<>(); - stream1 = builder.stream(intSerde, stringSerde, topic1); - stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.outerJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde); - joined.process(processor); + Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); - Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - assertEquals(1, copartitionGroups.size()); - assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + driver = new KStreamTestDriver(builder, stateDir); + driver.setTime(0L); - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); - driver.setTime(0L); + // push two items to the primary stream. the other window is empty.this should produce two items + // w1 = {} + // w2 = {} + // --> w1 = { 0:X1, 1:X1 } + // w2 = {} - // push two items to the primary stream. the other window is empty.this should produce two items - // w1 = {} - // w2 = {} - // --> w1 = { 0:X1, 1:X1 } - // w2 = {} - - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } - - processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); - - // push two items to the other stream. this should produce two items. - // w1 = { 0:X0, 1:X1 } - // w2 = {} - // --> w1 = { 0:X1, 1:X1 } - // w2 = { 0:Y0, 1:Y1 } + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); + // push two items to the other stream. this should produce two items. + // w1 = { 0:X0, 1:X1 } + // w2 = {} + // --> w1 = { 0:X1, 1:X1 } + // w2 = { 0:Y0, 1:Y1 } - // push all four items to the primary stream. this should produce four items. - // w1 = { 0:X0, 1:X1 } - // w2 = { 0:Y0, 1:Y1 } - // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } - // w2 = { 0:Y0, 1:Y1 } + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); + // push all four items to the primary stream. this should produce four items. + // w1 = { 0:X0, 1:X1 } + // w2 = { 0:Y0, 1:Y1 } + // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1 } - // push all items to the other stream. this should produce six items. - // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } - // w2 = { 0:Y0, 1:Y1 } - // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); - processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + // push all items to the other stream. this should produce six items. + // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1 } + // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } - // push all four items to the primary stream. this should produce six items. - // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 - // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); + // push all four items to the primary stream. this should produce six items. + // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 + // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } - // push two items to the other stream. this should produce six item. - // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 - // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } - // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 } + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); - processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1"); + // push two items to the other stream. this should produce six item. + // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 + // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 } - } finally { - Utils.delete(baseDir); + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]); } + + processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1"); } @Test public void testWindowing() throws Exception { - File baseDir = Files.createTempDirectory("test").toFile(); - try { - - long time = 0L; - - KStreamBuilder builder = new KStreamBuilder(); - - final int[] expectedKeys = new int[]{0, 1, 2, 3}; + long time = 0L; - KStream<Integer, String> stream1; - KStream<Integer, String> stream2; - KStream<Integer, String> joined; - MockProcessorSupplier<Integer, String> processor; + KStreamBuilder builder = new KStreamBuilder(); - processor = new MockProcessorSupplier<>(); - stream1 = builder.stream(intSerde, stringSerde, topic1); - stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde); - joined.process(processor); + final int[] expectedKeys = new int[]{0, 1, 2, 3}; - Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + KStream<Integer, String> stream1; + KStream<Integer, String> stream2; + KStream<Integer, String> joined; + MockProcessorSupplier<Integer, String> processor; - assertEquals(1, copartitionGroups.size()); - assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + processor = new MockProcessorSupplier<>(); + stream1 = builder.stream(intSerde, stringSerde, topic1); + stream2 = builder.stream(intSerde, stringSerde, topic2); + joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde); + joined.process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); - driver.setTime(time); + Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); - // push two items to the primary stream. the other window is empty. this should produce no items. - // w1 = {} - // w2 = {} - // --> w1 = { 0:X1, 1:X1 } - // w2 = {} + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } + driver = new KStreamTestDriver(builder, stateDir); + driver.setTime(time); - processor.checkAndClearProcessResult(); + // push two items to the primary stream. the other window is empty. this should produce no items. + // w1 = {} + // w2 = {} + // --> w1 = { 0:X1, 1:X1 } + // w2 = {} - // push two items to the other stream. this should produce two items. - // w1 = { 0:X0, 1:X1 } - // w2 = {} - // --> w1 = { 0:X1, 1:X1 } - // w2 = { 0:Y0, 1:Y1 } + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } + processor.checkAndClearProcessResult(); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); + // push two items to the other stream. this should produce two items. + // w1 = { 0:X0, 1:X1 } + // w2 = {} + // --> w1 = { 0:X1, 1:X1 } + // w2 = { 0:Y0, 1:Y1 } - // clear logically - time = 1000L; + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.setTime(time + i); - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); - // gradually expires items in w1 - // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 } + // clear logically + time = 1000L; - time = 1000 + 100L; - driver.setTime(time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.setTime(time + i); + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + processor.checkAndClearProcessResult(); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + // gradually expires items in w1 + // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 } - processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + time = 1000 + 100L; + driver.setTime(time); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("3:X3+YY3"); + processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult("3:X3+YY3"); - // go back to the time before expiration + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - time = 1000L - 100L - 1L; - driver.setTime(time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + processor.checkAndClearProcessResult(); - processor.checkAndClearProcessResult(); + // go back to the time before expiration - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + time = 1000L - 100L - 1L; + driver.setTime(time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:X0+YY0"); + processor.checkAndClearProcessResult(); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1"); + processor.checkAndClearProcessResult("0:X0+YY0"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2"); - // clear (logically) - time = 2000L; + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.setTime(time + i); - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - processor.checkAndClearProcessResult(); + // clear (logically) + time = 2000L; - // gradually expires items in w2 - // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } + for (int i = 0; i < expectedKeys.length; i++) { + driver.setTime(time + i); + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } - time = 2000L + 100L; - driver.setTime(time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + processor.checkAndClearProcessResult(); - processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + // gradually expires items in w2 + // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + time = 2000L + 100L; + driver.setTime(time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("3:XX3+Y3"); + processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult("3:XX3+Y3"); - // go back to the time before expiration + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - time = 2000L - 100L - 1L; - driver.setTime(time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + processor.checkAndClearProcessResult(); - processor.checkAndClearProcessResult(); + // go back to the time before expiration - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + time = 2000L - 100L - 1L; + driver.setTime(time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:XX0+Y0"); + processor.checkAndClearProcessResult(); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1"); + processor.checkAndClearProcessResult("0:XX0+Y0"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2"); - } finally { - Utils.delete(baseDir); + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - } + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 65226d3..65a4b54 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -19,17 +19,19 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.File; -import java.nio.file.Files; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; @@ -39,245 +41,240 @@ import static org.junit.Assert.assertEquals; public class KStreamKStreamLeftJoinTest { - private String topic1 = "topic1"; - private String topic2 = "topic2"; + final private String topic1 = "topic1"; + final private String topic2 = "topic2"; final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); - private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() { - @Override - public String apply(String value1, String value2) { - return value1 + "+" + value2; - } - }; + private KStreamTestDriver driver = null; + private File stateDir = null; - @Test - public void testLeftJoin() throws Exception { - File baseDir = Files.createTempDirectory("test").toFile(); - try { + @After + public void tearDown() { + if (driver != null) { + driver.close(); + } + driver = null; + } - KStreamBuilder builder = new KStreamBuilder(); + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); + } - final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KStream<Integer, String> stream1; - KStream<Integer, String> stream2; - KStream<Integer, String> joined; - MockProcessorSupplier<Integer, String> processor; + @Test + public void testLeftJoin() throws Exception { + final KStreamBuilder builder = new KStreamBuilder(); - processor = new MockProcessorSupplier<>(); - stream1 = builder.stream(intSerde, stringSerde, topic1); - stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde); - joined.process(processor); + final int[] expectedKeys = new int[]{0, 1, 2, 3}; - Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + KStream<Integer, String> stream1; + KStream<Integer, String> stream2; + KStream<Integer, String> joined; + MockProcessorSupplier<Integer, String> processor; - assertEquals(1, copartitionGroups.size()); - assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + processor = new MockProcessorSupplier<>(); + stream1 = builder.stream(intSerde, stringSerde, topic1); + stream2 = builder.stream(intSerde, stringSerde, topic2); + joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde); + joined.process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); - driver.setTime(0L); + Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); - // push two items to the primary stream. the other window is empty - // w {} - // --> w = {} + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } + driver = new KStreamTestDriver(builder, stateDir); + driver.setTime(0L); - processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); + // push two items to the primary stream. the other window is empty + // w {} + // --> w = {} - // push two items to the other stream. this should produce two items. - // w {} - // --> w = { 0:Y0, 1:Y1 } + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); - processor.checkAndClearProcessResult(); + // push two items to the other stream. this should produce two items. + // w {} + // --> w = { 0:Y0, 1:Y1 } - // push all four items to the primary stream. this should produce four items. - // w = { 0:Y0, 1:Y1 } - // --> w = { 0:Y0, 1:Y1 } + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } + processor.checkAndClearProcessResult(); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); + // push all four items to the primary stream. this should produce four items. + // w = { 0:Y0, 1:Y1 } + // --> w = { 0:Y0, 1:Y1 } - // push all items to the other stream. this should produce no items. - // w = { 0:Y0, 1:Y1 } - // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); - processor.checkAndClearProcessResult(); + // push all items to the other stream. this should produce no items. + // w = { 0:Y0, 1:Y1 } + // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } - // push all four items to the primary stream. this should produce four items. - // w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 - // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + processor.checkAndClearProcessResult(); - processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); + // push all four items to the primary stream. this should produce four items. + // w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 + // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } - } finally { - Utils.delete(baseDir); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } + + processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); } @Test public void testWindowing() throws Exception { - File baseDir = Files.createTempDirectory("test").toFile(); - try { - - long time = 0L; - - KStreamBuilder builder = new KStreamBuilder(); + final KStreamBuilder builder = new KStreamBuilder(); - final int[] expectedKeys = new int[]{0, 1, 2, 3}; + final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KStream<Integer, String> stream1; - KStream<Integer, String> stream2; - KStream<Integer, String> joined; - MockProcessorSupplier<Integer, String> processor; + long time = 0L; - processor = new MockProcessorSupplier<>(); - stream1 = builder.stream(intSerde, stringSerde, topic1); - stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde); - joined.process(processor); + KStream<Integer, String> stream1; + KStream<Integer, String> stream2; + KStream<Integer, String> joined; + MockProcessorSupplier<Integer, String> processor; - Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + processor = new MockProcessorSupplier<>(); + stream1 = builder.stream(intSerde, stringSerde, topic1); + stream2 = builder.stream(intSerde, stringSerde, topic2); + joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde); + joined.process(processor); - assertEquals(1, copartitionGroups.size()); - assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); - driver.setTime(time); + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - // push two items to the primary stream. the other window is empty. this should produce two items - // w = {} - // --> w = {} + driver = new KStreamTestDriver(builder, stateDir); + driver.setTime(time); - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } + // push two items to the primary stream. the other window is empty. this should produce two items + // w = {} + // --> w = {} - processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); - - // push two items to the other stream. this should produce no items. - // w = {} - // --> w = { 0:Y0, 1:Y1 } + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); - processor.checkAndClearProcessResult(); + // push two items to the other stream. this should produce no items. + // w = {} + // --> w = { 0:Y0, 1:Y1 } - // clear logically - time = 1000L; + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } - // push all items to the other stream. this should produce no items. - // w = {} - // --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } - for (int i = 0; i < expectedKeys.length; i++) { - driver.setTime(time + i); - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } + processor.checkAndClearProcessResult(); - processor.checkAndClearProcessResult(); + // clear logically + time = 1000L; - // gradually expire items in window. - // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } + // push all items to the other stream. this should produce no items. + // w = {} + // --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } + for (int i = 0; i < expectedKeys.length; i++) { + driver.setTime(time + i); + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } - time = 1000L + 100L; - driver.setTime(time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + processor.checkAndClearProcessResult(); - processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + // gradually expire items in window. + // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + time = 1000L + 100L; + driver.setTime(time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null"); + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3"); - // go back to the time before expiration + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - time = 1000L - 100L - 1L; - driver.setTime(time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null"); - processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null"); + // go back to the time before expiration - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + time = 1000L - 100L - 1L; + driver.setTime(time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null"); + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null"); - driver.setTime(++time); - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } - processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null"); - } finally { - Utils.delete(baseDir); + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - } + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java index 3acb59a..2c6108b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java @@ -20,19 +20,20 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.File; -import java.nio.file.Files; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; @@ -42,111 +43,105 @@ import static org.junit.Assert.assertEquals; public class KStreamKTableLeftJoinTest { - private String topic1 = "topic1"; - private String topic2 = "topic2"; + final private String topic1 = "topic1"; + final private String topic2 = "topic2"; - final private Serde<Integer> intSerde = new Serdes.IntegerSerde(); - final private Serde<String> stringSerde = new Serdes.StringSerde(); + final private Serde<Integer> intSerde = Serdes.Integer(); + final private Serde<String> stringSerde = Serdes.String(); - private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() { - @Override - public String apply(String value1, String value2) { - return value1 + "+" + value2; + private KStreamTestDriver driver = null; + private File stateDir = null; + + @After + public void tearDown() { + if (driver != null) { + driver.close(); } - }; + driver = null; + } - private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper = - new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() { - @Override - public KeyValue<Integer, String> apply(Integer key, String value) { - return KeyValue.pair(key, value); - } - }; + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); + } @Test public void testJoin() throws Exception { - File baseDir = Files.createTempDirectory("test").toFile(); - try { - - KStreamBuilder builder = new KStreamBuilder(); - - final int[] expectedKeys = new int[]{0, 1, 2, 3}; - - KStream<Integer, String> stream; - KTable<Integer, String> table; - MockProcessorSupplier<Integer, String> processor; + KStreamBuilder builder = new KStreamBuilder(); - processor = new MockProcessorSupplier<>(); - stream = builder.stream(intSerde, stringSerde, topic1); - table = builder.table(intSerde, stringSerde, topic2); - stream.leftJoin(table, joiner).process(processor); + final int[] expectedKeys = new int[]{0, 1, 2, 3}; - Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + KStream<Integer, String> stream; + KTable<Integer, String> table; + MockProcessorSupplier<Integer, String> processor; - assertEquals(1, copartitionGroups.size()); - assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + processor = new MockProcessorSupplier<>(); + stream = builder.stream(intSerde, stringSerde, topic1); + table = builder.table(intSerde, stringSerde, topic2); + stream.leftJoin(table, MockValueJoiner.STRING_JOINER).process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); - driver.setTime(0L); + Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); - // push two items to the primary stream. the other table is empty + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } + driver = new KStreamTestDriver(builder, stateDir); + driver.setTime(0L); - processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); + // push two items to the primary stream. the other table is empty - // push two items to the other stream. this should not produce any item. + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } + processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); - processor.checkAndClearProcessResult(); + // push two items to the other stream. this should not produce any item. - // push all four items to the primary stream. this should produce four items. + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } + processor.checkAndClearProcessResult(); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); + // push all four items to the primary stream. this should produce four items. - // push all items to the other stream. this should not produce any item - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); - } + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); - // push all four items to the primary stream. this should produce four items. + // push all items to the other stream. this should not produce any item + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } + processor.checkAndClearProcessResult(); - processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + // push all four items to the primary stream. this should produce four items. - // push two items with null to the other stream as deletes. this should not produce any item. + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], null); - } + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - processor.checkAndClearProcessResult(); + // push two items with null to the other stream as deletes. this should not produce any item. - // push all four items to the primary stream. this should produce four items. + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], null); + } - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); - } + processor.checkAndClearProcessResult(); - processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); + // push all four items to the primary stream. this should produce four items. - } finally { - Utils.delete(baseDir); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } + + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); } @Test(expected = KafkaException.class) @@ -158,10 +153,10 @@ public class KStreamKTableLeftJoinTest { MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); - stream = builder.stream(intSerde, stringSerde, topic1).map(keyValueMapper); + stream = builder.stream(intSerde, stringSerde, topic1).map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper()); table = builder.table(intSerde, stringSerde, topic2); - stream.leftJoin(table, joiner).process(processor); + stream.leftJoin(table, MockValueJoiner.STRING_JOINER).process(processor); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java index 68fa656..00e5d70 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.After; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -36,6 +37,16 @@ public class KStreamMapTest { final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); + private KStreamTestDriver driver = null; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } + @Test public void testMap() { KStreamBuilder builder = new KStreamBuilder(); @@ -56,7 +67,7 @@ public class KStreamMapTest { processor = new MockProcessorSupplier<>(); stream.map(mapper).process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java index e671aab..e48b677 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.After; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -35,6 +36,16 @@ public class KStreamMapValuesTest { final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); + private KStreamTestDriver driver; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } + @Test public void testFlatMapValues() { KStreamBuilder builder = new KStreamBuilder(); @@ -54,7 +65,7 @@ public class KStreamMapValuesTest { stream = builder.stream(intSerde, stringSerde, topicName); stream.mapValues(mapper).process(processor); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topicName, expectedKeys[i], Integer.toString(expectedKeys[i])); }
