[ https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16484168#comment-16484168 ]
ASF GitHub Bot commented on KAFKA-6474: --------------------------------------- guozhangwang closed pull request #5052: KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 3] URL: https://github.com/apache/kafka/pull/5052 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java index aa2397170f6..4ae2f76698b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java @@ -17,25 +17,32 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; -import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.TestUtils; +import org.apache.kafka.streams.test.ConsumerRecordFactory; +import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; +import java.util.Properties; + import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; 
import static org.junit.Assert.assertEquals; public class KStreamWindowReduceTest { + + private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); + private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); + @Test public void shouldLogAndMeterOnNullKey() { - final KStreamTestDriver driver = new KStreamTestDriver(); final StreamsBuilder builder = new StreamsBuilder(); builder @@ -49,14 +56,14 @@ public String apply(final String value1, final String value2) { } }); - driver.setUp(builder, TestUtils.tempDirectory(), 0); - final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); - driver.process("TOPIC", null, "asdf"); - driver.flushState(); - LogCaptureAppender.unregister(appender); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + driver.pipeInput(recordFactory.create("TOPIC", null, "asdf")); + LogCaptureAppender.unregister(appender); - assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue()); - assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[asdf] topic=[TOPIC] partition=[-1] offset=[-1]")); + assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue()); + assertThat(appender.getMessages(), hasItem("Skipping record due to null key. 
value=[asdf] topic=[TOPIC] partition=[0] offset=[0]")); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java index 14552d6b325..081c6a069aa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java @@ -17,40 +17,30 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.TestUtils; - -import org.junit.Before; -import org.junit.Rule; +import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; -import java.io.File; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import static org.junit.Assert.assertEquals; public class KTableMapKeysTest { - final private Serde<String> stringSerde = new Serdes.StringSerde(); - final private Serde<Integer> integerSerde = new Serdes.IntegerSerde(); - private File stateDir = null; - @Rule - public final KStreamTestDriver driver = new KStreamTestDriver(); - - - @Before - public void setUp() { - stateDir = TestUtils.tempDirectory("kafka-test"); - } + 
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); + private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String()); @Test public void testMapKeysConvertingToStream() { @@ -58,7 +48,7 @@ public void testMapKeysConvertingToStream() { String topic1 = "topic_map_keys"; - KTable<Integer, String> table1 = builder.table(topic1, Consumed.with(integerSerde, stringSerde)); + KTable<Integer, String> table1 = builder.table(topic1, Consumed.with(Serdes.Integer(), Serdes.String())); final Map<Integer, String> keyMap = new HashMap<>(); keyMap.put(1, "ONE"); @@ -82,11 +72,11 @@ public String apply(Integer key, String value) { convertedStream.process(supplier); - driver.setUp(builder, stateDir); - for (int i = 0; i < originalKeys.length; i++) { - driver.process(topic1, originalKeys[i], values[i]); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + for (int i = 0; i < originalKeys.length; i++) { + driver.pipeInput(recordFactory.create(topic1, originalKeys[i], values[i])); + } } - driver.flushState(); assertEquals(3, supplier.theCapturedProcessor().processed.size()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java index 08fa65c2ad9..825edb3eb37 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java @@ -18,10 +18,12 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Consumed; 
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; @@ -31,20 +33,19 @@ import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.SessionStore; -import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.StreamsTestUtils; -import org.apache.kafka.test.TestUtils; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -53,9 +54,9 @@ private static final String TOPIC = "input"; private final StreamsBuilder builder = new StreamsBuilder(); + private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); + private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); - @Rule - public final KStreamTestDriver driver = new KStreamTestDriver(); private final Merger<String, String> sessionMerger = new Merger<String, String>() { @Override public String apply(final String aggKey, final String aggOne, final String aggTwo) { @@ -83,7 +84,9 @@ public void apply(final Windowed<String> key, final Long value) { } }); - processData(); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { + 
processData(driver); + } assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo(2L)); assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo(1L)); assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo(1L)); @@ -101,7 +104,9 @@ public void apply(final Windowed<String> key, final String value) { } }); - processData(); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { + processData(driver); + } assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("1+2")); assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("1")); assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("3")); @@ -121,42 +126,45 @@ public void apply(final Windowed<String> key, final String value) { results.put(key, value); } }); - processData(); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { + processData(driver); + } assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("0+0+1+2")); assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("0+1")); assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("0+3")); } - @SuppressWarnings("unchecked") @Test public void shouldMaterializeCount() { stream.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("count-store")); - processData(); - final SessionStore<String, Long> store = (SessionStore<String, Long>) driver.allStateStores().get("count-store"); - final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2")); - assertThat(data, equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L), - KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L), - KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L)))); + try (final 
TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { + processData(driver); + final SessionStore<String, Long> store = driver.getSessionStore("count-store"); + final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2")); + assertThat(data, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L), + KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L), + KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L)))); + } } - @SuppressWarnings("unchecked") @Test public void shouldMaterializeReduced() { stream.reduce(MockReducer.STRING_ADDER, Materialized.<String, String, SessionStore<Bytes, byte[]>>as("reduced")); - processData(); - final SessionStore<String, String> sessionStore = (SessionStore<String, String>) driver.allStateStores().get("reduced"); - final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2")); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { + processData(driver); + final SessionStore<String, String> sessionStore = driver.getSessionStore("reduced"); + final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2")); - assertThat(data, equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"), - KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"), - KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1")))); + assertThat(data, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"), + KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"), + KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1")))); + } } - @SuppressWarnings("unchecked") @Test public void shouldMaterializeAggregated() { stream.aggregate(MockInitializer.STRING_INIT, @@ -164,13 +172,15 @@ public 
void shouldMaterializeAggregated() { sessionMerger, Materialized.<String, String, SessionStore<Bytes, byte[]>>as("aggregated").withValueSerde(Serdes.String())); - processData(); - final SessionStore<String, String> sessionStore = (SessionStore<String, String>) driver.allStateStores().get("aggregated"); - final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2")); - assertThat(data, equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"), - KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"), - KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1")))); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { + processData(driver); + final SessionStore<String, String> sessionStore = driver.getSessionStore("aggregated"); + final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2")); + assertThat(data, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"), + KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"), + KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1")))); + } } @Test(expected = NullPointerException.class) @@ -243,16 +253,11 @@ public void shouldThrowNullPointerOnCountIfMaterializedIsNull() { stream.count(null); } - private void processData() { - driver.setUp(builder, TestUtils.tempDirectory(), 0); - driver.setTime(10); - driver.process(TOPIC, "1", "1"); - driver.setTime(15); - driver.process(TOPIC, "1", "2"); - driver.setTime(600); - driver.process(TOPIC, "1", "3"); - driver.process(TOPIC, "2", "1"); - driver.flushState(); + private void processData(final TopologyTestDriver driver) { + driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10)); + driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15)); + driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 
600)); + driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 600)); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java index 610e52f2ed6..7b885b23bf2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java @@ -18,32 +18,33 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.StreamsTestUtils; -import org.apache.kafka.test.TestUtils; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; import java.util.Arrays; import java.util.HashMap; import 
java.util.List; import java.util.Map; +import java.util.Properties; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -52,9 +53,8 @@ private static final String TOPIC = "input"; private final StreamsBuilder builder = new StreamsBuilder(); - - @Rule - public final KStreamTestDriver driver = new KStreamTestDriver(); + private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); + private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); private TimeWindowedKStream<String, String> windowedStream; @Before @@ -76,7 +76,9 @@ public void apply(final Windowed<String> key, final Long value) { } }); - processData(); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { + processData(driver); + } assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L)); assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L)); assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L)); @@ -95,7 +97,9 @@ public void apply(final Windowed<String> key, final String value) { } }); - processData(); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { + processData(driver); + } assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2")); assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1")); assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3")); @@ -115,29 +119,32 @@ public void apply(final Windowed<String> key, final String value) { results.put(key, value); } }); - processData(); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { + processData(driver); + } assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), 
equalTo("0+1+2")); assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1")); assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3")); } - @SuppressWarnings("unchecked") @Test public void shouldMaterializeCount() { windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store") .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long())); - processData(); - final WindowStore<String, Long> windowStore = (WindowStore<String, Long>) driver.allStateStores().get("count-store"); - final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000)); - assertThat(data, equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L), - KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L), - KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L)))); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { + processData(driver); + final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store"); + final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000)); + + assertThat(data, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L), + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L)))); + } } - @SuppressWarnings("unchecked") @Test public void shouldMaterializeReduced() { windowedStream.reduce(MockReducer.STRING_ADDER, @@ -145,17 +152,18 @@ public void shouldMaterializeReduced() { .withKeySerde(Serdes.String()) .withValueSerde(Serdes.String())); - processData(); - final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("reduced"); - final List<KeyValue<Windowed<String>, String>> data = 
StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000)); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { + processData(driver); + final WindowStore<String, String> windowStore = driver.getWindowStore("reduced"); + final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000)); - assertThat(data, equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"), - KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"), - KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1")))); + assertThat(data, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"), + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1")))); + } } - @SuppressWarnings("unchecked") @Test public void shouldMaterializeAggregated() { windowedStream.aggregate(MockInitializer.STRING_INIT, @@ -164,13 +172,15 @@ public void shouldMaterializeAggregated() { .withKeySerde(Serdes.String()) .withValueSerde(Serdes.String())); - processData(); - final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("aggregated"); - final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000)); - assertThat(data, equalTo(Arrays.asList( - KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"), - KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"), - KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1")))); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { + processData(driver); + final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated"); + final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 
0, 1000)); + assertThat(data, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"), + KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"), + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1")))); + } } @Test(expected = NullPointerException.class) @@ -227,16 +237,11 @@ public void shouldThrowNullPointerOnCountIfMaterializedIsNull() { windowedStream.count(null); } - private void processData() { - driver.setUp(builder, TestUtils.tempDirectory(), 0); - driver.setTime(10); - driver.process(TOPIC, "1", "1"); - driver.setTime(15); - driver.process(TOPIC, "1", "2"); - driver.setTime(500); - driver.process(TOPIC, "1", "3"); - driver.process(TOPIC, "2", "1"); - driver.flushState(); + private void processData(final TopologyTestDriver driver) { + driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10L)); + driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15L)); + driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 500L)); + driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 500L)); } } \ No newline at end of file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Rewrite test to use new public TopologyTestDriver > ------------------------------------------------- > > Key: KAFKA-6474 > URL: https://issues.apache.org/jira/browse/KAFKA-6474 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests > Affects Versions: 1.1.0 > Reporter: Matthias J. Sax > Assignee: Filipe Agapito > Priority: Major > Labels: beginner, newbie > > With KIP-247 we added public TopologyTestDriver. We should rewrite our own > tests to use this new test driver and remove the two classes > ProcessorTopologyTestDriver and KStreamTestDriver. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)