Repository: kafka Updated Branches: refs/heads/trunk f60a3fad3 -> 1a73629bb
http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java index 9cafe8b..efb17fc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java @@ -26,11 +26,13 @@ import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.IOException; -import java.nio.file.Files; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -39,7 +41,23 @@ import static org.junit.Assert.assertTrue; public class KTableMapValuesTest { - final private Serde<String> stringSerde = new Serdes.StringSerde(); + final private Serde<String> stringSerde = Serdes.String(); + + private KStreamTestDriver driver = null; + private File stateDir = null; + + @After + public void tearDown() { + if (driver != null) { + driver.close(); + } + driver = null; + } + + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); + } @Test public void testKTable() { @@ -58,7 +76,7 @@ public class KTableMapValuesTest { MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>(); table2.toStream().process(proc2); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); driver.process(topic1, "A", "01"); driver.process(topic1, "B", "02"); @@ -70,230 +88,211 @@ public class KTableMapValuesTest { @Test public void testValueGetter() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; - String topic2 = "topic2"; - - KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); - KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( - new ValueMapper<String, Integer>() { - @Override - public Integer apply(String value) { - return new Integer(value); - } - }); - KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter( - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return (value % 2) == 0; - } - }); - KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) - table1.through(stringSerde, stringSerde, topic2); - - KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); - KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); - KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier(); - KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier(); - - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); - - KTableValueGetter<String, String> getter1 = getterSupplier1.get(); - getter1.init(driver.context()); - KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); - getter2.init(driver.context()); - KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); - getter3.init(driver.context()); - KTableValueGetter<String, String> getter4 = getterSupplier4.get(); - getter4.init(driver.context()); - - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); - - assertEquals("01", getter1.get("A")); - assertEquals("01", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertEquals(new Integer(1), getter2.get("A")); - assertEquals(new Integer(1), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertNull(getter3.get("A")); - assertNull(getter3.get("B")); - assertNull(getter3.get("C")); - - assertEquals("01", getter4.get("A")); - assertEquals("01", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); - - assertEquals("02", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertEquals(new Integer(2), getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertEquals(new Integer(2), getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); - - assertEquals("02", getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - driver.process(topic1, "A", "03"); - - assertEquals("03", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertEquals(new Integer(3), getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertNull(getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); - - assertEquals("03", getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - driver.process(topic1, "A", null); - - assertNull(getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); - - assertNull(getter2.get("A")); - assertEquals(new Integer(2), getter2.get("B")); - assertEquals(new Integer(1), getter2.get("C")); - - assertNull(getter3.get("A")); - assertEquals(new Integer(2), getter3.get("B")); - assertNull(getter3.get("C")); - - assertNull(getter4.get("A")); - assertEquals("02", getter4.get("B")); - assertEquals("01", getter4.get("C")); - - } finally { - Utils.delete(stateDir); - } + KStreamBuilder builder = new KStreamBuilder(); + + String topic1 = "topic1"; + String topic2 = "topic2"; + + KTableImpl<String, String, String> table1 = + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( + new ValueMapper<String, Integer>() { + @Override + public Integer apply(String value) { + return new Integer(value); + } + }); + KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter( + new Predicate<String, Integer>() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }); + KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) + table1.through(stringSerde, stringSerde, topic2); + + KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); + KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); + KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier(); + KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier(); + + driver = new KStreamTestDriver(builder, stateDir, null, null); + + KTableValueGetter<String, String> getter1 = getterSupplier1.get(); + getter1.init(driver.context()); + KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); + getter2.init(driver.context()); + KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); + getter3.init(driver.context()); + KTableValueGetter<String, String> getter4 = getterSupplier4.get(); + getter4.init(driver.context()); + + driver.process(topic1, "A", "01"); + driver.process(topic1, "B", "01"); + driver.process(topic1, "C", "01"); + + assertEquals("01", getter1.get("A")); + assertEquals("01", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertEquals(new Integer(1), getter2.get("A")); + assertEquals(new Integer(1), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertNull(getter3.get("A")); + assertNull(getter3.get("B")); + assertNull(getter3.get("C")); + + assertEquals("01", getter4.get("A")); + assertEquals("01", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + driver.process(topic1, "A", "02"); + driver.process(topic1, "B", "02"); + + assertEquals("02", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertEquals(new Integer(2), getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertEquals(new Integer(2), getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + assertEquals("02", getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + driver.process(topic1, "A", "03"); + + assertEquals("03", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertEquals(new Integer(3), getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertNull(getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + assertEquals("03", getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); + + driver.process(topic1, "A", null); + + assertNull(getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); + + assertNull(getter2.get("A")); + assertEquals(new Integer(2), getter2.get("B")); + assertEquals(new Integer(1), getter2.get("C")); + + assertNull(getter3.get("A")); + assertEquals(new Integer(2), getter3.get("B")); + assertNull(getter3.get("C")); + + assertNull(getter4.get("A")); + assertEquals("02", getter4.get("B")); + assertEquals("01", getter4.get("C")); } @Test public void testNotSendingOldValue() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; + KStreamBuilder builder = new KStreamBuilder(); - KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); - KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( - new ValueMapper<String, Integer>() { - @Override - public Integer apply(String value) { - return new Integer(value); - } - }); + String topic1 = "topic1"; - MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>(); + KTableImpl<String, String, String> table1 = + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( + new ValueMapper<String, Integer>() { + @Override + public Integer apply(String value) { + return new Integer(value); + } + }); - builder.addProcessor("proc", proc, table2.name); + MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>(); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); + builder.addProcessor("proc", proc, table2.name); - assertFalse(table1.sendingOldValueEnabled()); - assertFalse(table2.sendingOldValueEnabled()); + driver = new KStreamTestDriver(builder, stateDir, null, null); - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); + assertFalse(table1.sendingOldValueEnabled()); + assertFalse(table2.sendingOldValueEnabled()); - proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + driver.process(topic1, "A", "01"); + driver.process(topic1, "B", "01"); + driver.process(topic1, "C", "01"); - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); + proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); + driver.process(topic1, "A", "02"); + driver.process(topic1, "B", "02"); - driver.process(topic1, "A", "03"); + proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); - proc.checkAndClearProcessResult("A:(3<-null)"); + driver.process(topic1, "A", "03"); - driver.process(topic1, "A", null); + proc.checkAndClearProcessResult("A:(3<-null)"); - proc.checkAndClearProcessResult("A:(null<-null)"); + driver.process(topic1, "A", null); - } finally { - Utils.delete(stateDir); - } + proc.checkAndClearProcessResult("A:(null<-null)"); } @Test public void testSendingOldValue() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; + KStreamBuilder builder = new KStreamBuilder(); - KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); - KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( - new ValueMapper<String, Integer>() { - @Override - public Integer apply(String value) { - return new Integer(value); - } - }); + String topic1 = "topic1"; - table2.enableSendingOldValues(); + KTableImpl<String, String, String> table1 = + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( + new ValueMapper<String, Integer>() { + @Override + public Integer apply(String value) { + return new Integer(value); + } + }); - MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>(); + table2.enableSendingOldValues(); - builder.addProcessor("proc", proc, table2.name); + MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>(); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); + builder.addProcessor("proc", proc, table2.name); - assertTrue(table1.sendingOldValueEnabled()); - assertTrue(table2.sendingOldValueEnabled()); + driver = new KStreamTestDriver(builder, stateDir, null, null); - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); + assertTrue(table1.sendingOldValueEnabled()); + assertTrue(table2.sendingOldValueEnabled()); - proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + driver.process(topic1, "A", "01"); + driver.process(topic1, "B", "01"); + driver.process(topic1, "C", "01"); - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); + proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); + driver.process(topic1, "A", "02"); + driver.process(topic1, "B", "02"); - driver.process(topic1, "A", "03"); + proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); - proc.checkAndClearProcessResult("A:(3<-2)"); + driver.process(topic1, "A", "03"); - driver.process(topic1, "A", null); + proc.checkAndClearProcessResult("A:(3<-2)"); - proc.checkAndClearProcessResult("A:(null<-3)"); + driver.process(topic1, "A", null); - } finally { - Utils.delete(stateDir); - } + proc.checkAndClearProcessResult("A:(null<-3)"); } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java index 7c158e2..aaa6cc7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java @@ -24,11 +24,13 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.IOException; -import java.nio.file.Files; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -36,7 +38,23 @@ import static org.junit.Assert.assertTrue; public class KTableSourceTest { - final private Serde<String> stringSerde = new Serdes.StringSerde(); + final private Serde<String> stringSerde = Serdes.String(); + + private KStreamTestDriver driver = null; + private File stateDir = null; + + @After + public void tearDown() { + if (driver != null) { + driver.close(); + } + driver = null; + } + + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); + } @Test public void testKTable() { @@ -49,7 +67,7 @@ public class KTableSourceTest { MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>(); table1.toStream().process(proc1); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); driver.process(topic1, "A", 1); driver.process(topic1, "B", 2); @@ -63,138 +81,120 @@ public class KTableSourceTest { @Test public void testValueGetter() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final KStreamBuilder builder = new KStreamBuilder(); + final KStreamBuilder builder = new KStreamBuilder(); - String topic1 = "topic1"; + String topic1 = "topic1"; - KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); - KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); + KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); + driver = new KStreamTestDriver(builder, stateDir, null, null); - KTableValueGetter<String, String> getter1 = getterSupplier1.get(); - getter1.init(driver.context()); + KTableValueGetter<String, String> getter1 = getterSupplier1.get(); + getter1.init(driver.context()); - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); + driver.process(topic1, "A", "01"); + driver.process(topic1, "B", "01"); + driver.process(topic1, "C", "01"); - assertEquals("01", getter1.get("A")); - assertEquals("01", getter1.get("B")); - assertEquals("01", getter1.get("C")); + assertEquals("01", getter1.get("A")); + assertEquals("01", getter1.get("B")); + assertEquals("01", getter1.get("C")); - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); + driver.process(topic1, "A", "02"); + driver.process(topic1, "B", "02"); - assertEquals("02", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); + assertEquals("02", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); - driver.process(topic1, "A", "03"); + driver.process(topic1, "A", "03"); - assertEquals("03", getter1.get("A")); - assertEquals("02", getter1.get("B")); - assertEquals("01", getter1.get("C")); + assertEquals("03", getter1.get("A")); + assertEquals("02", getter1.get("B")); + assertEquals("01", getter1.get("C")); - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); + driver.process(topic1, "A", null); + driver.process(topic1, "B", null); - assertNull(getter1.get("A")); - assertNull(getter1.get("B")); - assertEquals("01", getter1.get("C")); + assertNull(getter1.get("A")); + assertNull(getter1.get("B")); + assertEquals("01", getter1.get("C")); - } finally { - Utils.delete(stateDir); - } } @Test public void testNotSedingOldValue() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; + final KStreamBuilder builder = new KStreamBuilder(); - KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + String topic1 = "topic1"; - MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); + KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); - builder.addProcessor("proc1", proc1, table1.name); + MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); + builder.addProcessor("proc1", proc1, table1.name); - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); + driver = new KStreamTestDriver(builder, stateDir, null, null); - proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); + driver.process(topic1, "A", "01"); + driver.process(topic1, "B", "01"); + driver.process(topic1, "C", "01"); - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); + proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); - proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)"); + driver.process(topic1, "A", "02"); + driver.process(topic1, "B", "02"); - driver.process(topic1, "A", "03"); + proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)"); - proc1.checkAndClearProcessResult("A:(03<-null)"); + driver.process(topic1, "A", "03"); - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); + proc1.checkAndClearProcessResult("A:(03<-null)"); - proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); + driver.process(topic1, "A", null); + driver.process(topic1, "B", null); - } finally { - Utils.delete(stateDir); - } + proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); } @Test public void testSedingOldValue() throws IOException { - File stateDir = Files.createTempDirectory("test").toFile(); - try { - final KStreamBuilder builder = new KStreamBuilder(); - - String topic1 = "topic1"; + final KStreamBuilder builder = new KStreamBuilder(); - KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); + String topic1 = "topic1"; - table1.enableSendingOldValues(); + KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); - assertTrue(table1.sendingOldValueEnabled()); + table1.enableSendingOldValues(); - MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); + assertTrue(table1.sendingOldValueEnabled()); - builder.addProcessor("proc1", proc1, table1.name); + MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>(); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); + builder.addProcessor("proc1", proc1, table1.name); - driver.process(topic1, "A", "01"); - driver.process(topic1, "B", "01"); - driver.process(topic1, "C", "01"); + driver = new KStreamTestDriver(builder, stateDir, null, null); - proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); + driver.process(topic1, "A", "01"); + driver.process(topic1, "B", "01"); + driver.process(topic1, "C", "01"); - driver.process(topic1, "A", "02"); - driver.process(topic1, "B", "02"); + proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); - proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)"); + driver.process(topic1, "A", "02"); + driver.process(topic1, "B", "02"); - driver.process(topic1, "A", "03"); + proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)"); - proc1.checkAndClearProcessResult("A:(03<-02)"); + driver.process(topic1, "A", "03"); - driver.process(topic1, "A", null); - driver.process(topic1, "B", null); + proc1.checkAndClearProcessResult("A:(03<-02)"); - proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)"); + driver.process(topic1, "A", null); + driver.process(topic1, "B", null); - } finally { - Utils.delete(stateDir); - } + proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)"); } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java index 22948ab..c8707af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.test.KStreamTestDriver; +import org.junit.After; import org.junit.Test; import java.io.ByteArrayOutputStream; @@ -39,11 +40,19 @@ public class KeyValuePrinterProcessorTest { private String topicName = "topic"; private Serde<String> stringSerde = Serdes.String(); - private Serde<byte[]> bytesSerde = Serdes.ByteArray(); private ByteArrayOutputStream baos = new ByteArrayOutputStream(); private KStreamBuilder builder = new KStreamBuilder(); private PrintStream printStream = new PrintStream(baos); + private KStreamTestDriver driver = null; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + } @Test public void testPrintKeyValueDefaultSerde() throws Exception { @@ -57,7 +66,7 @@ public class KeyValuePrinterProcessorTest { KStream<String, String> stream = builder.stream(stringSerde, stringSerde, topicName); stream.process(keyValuePrinter); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); for (int i = 0; i < suppliedKeys.length; i++) { driver.process(topicName, suppliedKeys[i], suppliedValues[i]); } @@ -79,7 +88,7 @@ public class KeyValuePrinterProcessorTest { stream.process(keyValuePrinter); - KStreamTestDriver driver = new KStreamTestDriver(builder); + driver = new KStreamTestDriver(builder); String suppliedKey = null; byte[] suppliedValue = "{\"name\":\"print\", \"label\":\"test\"}".getBytes(Charset.forName("UTF-8")); http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index d738794..7316804 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.RecordCollector; @@ -82,6 +83,12 @@ public class KStreamTestDriver { public void process(String topicName, Object key, Object value) { currNode = topology.source(topicName); + + // if currNode is null, check if this topic is a changelog topic; + // if yes, skip + if (topicName.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) + return; + try { forward(key, value); } finally { @@ -108,10 +115,6 @@ public class KStreamTestDriver { context.setTime(timestamp); } - public StateStore getStateStore(String name) { - return context.getStateStore(name); - } - @SuppressWarnings("unchecked") public <K, V> void forward(K key, V value) { ProcessorNode thisNode = currNode; @@ -153,6 +156,23 @@ public class KStreamTestDriver { } } + public void close() { + // close all processors + for (ProcessorNode node : topology.processors()) { + currNode = node; + try { + node.close(); + } finally { + currNode = null; + } + } + + // close all state stores + for (StateStore store : context.allStateStores().values()) { + store.close(); + } + } + public Set<String> allProcessorNames() { Set<String> names = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java index ae8c2fd..769ee71 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java +++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java @@ -26,7 +26,7 @@ public class MockKeyValueMapper { @Override public KeyValue<K, V> apply(K key, V value) { - return new KeyValue<>(key, value); + return KeyValue.pair(key, value); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1a73629b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java new file mode 100644 index 0000000..4d44166 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockValueJoiner.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.test; + +import org.apache.kafka.streams.kstream.ValueJoiner; + +public class MockValueJoiner { + + private static class StringJoin implements ValueJoiner<String, String, String> { + + @Override + public String apply(String value1, String value2) { + return value1 + "+" + value2; + } + }; + + public final static ValueJoiner<String, String, String> STRING_JOINER = new StringJoin(); +} \ No newline at end of file
