[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16484168#comment-16484168
 ] 

ASF GitHub Bot commented on KAFKA-6474:
---------------------------------------

guozhangwang closed pull request #5052: KAFKA-6474: Rewrite tests to use new 
public TopologyTestDriver [part 3]
URL: https://github.com/apache/kafka/pull/5052
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index aa2397170f6..4ae2f76698b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -17,25 +17,32 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
+import java.util.Properties;
+
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamWindowReduceTest {
+
+    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+
     @Test
     public void shouldLogAndMeterOnNullKey() {
-        final KStreamTestDriver driver = new KStreamTestDriver();
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder
@@ -49,14 +56,14 @@ public String apply(final String value1, final String 
value2) {
                 }
             });
 
-        driver.setUp(builder, TestUtils.tempDirectory(), 0);
 
-        final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister();
-        driver.process("TOPIC", null, "asdf");
-        driver.flushState();
-        LogCaptureAppender.unregister(appender);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister();
+            driver.pipeInput(recordFactory.create("TOPIC", null, "asdf"));
+            LogCaptureAppender.unregister(appender);
 
-        assertEquals(1.0, 
getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", 
"stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to 
null key. value=[asdf] topic=[TOPIC] partition=[-1] offset=[-1]"));
+            assertEquals(1.0, getMetricByName(driver.metrics(), 
"skipped-records-total", "stream-metrics").metricValue());
+            assertThat(appender.getMessages(), hasItem("Skipping record due to 
null key. value=[asdf] topic=[TOPIC] partition=[0] offset=[0]"));
+        }
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 14552d6b325..081c6a069aa 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -17,40 +17,30 @@
 package org.apache.kafka.streams.kstream.internals;
 
 
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 
 public class KTableMapKeysTest {
 
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
-    final private Serde<Integer>  integerSerde = new Serdes.IntegerSerde();
-    private File stateDir = null;
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-
-    
-    @Before
-     public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testMapKeysConvertingToStream() {
@@ -58,7 +48,7 @@ public void testMapKeysConvertingToStream() {
 
         String topic1 = "topic_map_keys";
 
-        KTable<Integer, String> table1 = builder.table(topic1, 
Consumed.with(integerSerde, stringSerde));
+        KTable<Integer, String> table1 = builder.table(topic1, 
Consumed.with(Serdes.Integer(), Serdes.String()));
 
         final Map<Integer, String> keyMap = new HashMap<>();
         keyMap.put(1, "ONE");
@@ -82,11 +72,11 @@ public  String apply(Integer key, String value) {
 
         convertedStream.process(supplier);
 
-        driver.setUp(builder, stateDir);
-        for (int i = 0;  i < originalKeys.length; i++) {
-            driver.process(topic1, originalKeys[i], values[i]);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            for (int i = 0; i < originalKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic1, originalKeys[i], 
values[i]));
+            }
         }
-        driver.flushState();
 
         assertEquals(3, supplier.theCapturedProcessor().processed.size());
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 08fa65c2ad9..825edb3eb37 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -18,10 +18,12 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -31,20 +33,19 @@
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -53,9 +54,9 @@
 
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
+    private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
     private final Merger<String, String> sessionMerger = new Merger<String, 
String>() {
         @Override
         public String apply(final String aggKey, final String aggOne, final 
String aggTwo) {
@@ -83,7 +84,9 @@ public void apply(final Windowed<String> key, final Long 
value) {
                     }
                 });
 
-        processData();
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 
15))), equalTo(2L));
         assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 
600))), equalTo(1L));
         assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 
600))), equalTo(1L));
@@ -101,7 +104,9 @@ public void apply(final Windowed<String> key, final String 
value) {
                     }
                 });
 
-        processData();
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 
15))), equalTo("1+2"));
         assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 
600))), equalTo("1"));
         assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 
600))), equalTo("3"));
@@ -121,42 +126,45 @@ public void apply(final Windowed<String> key, final 
String value) {
                         results.put(key, value);
                     }
                 });
-        processData();
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 
15))), equalTo("0+0+1+2"));
         assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 
600))), equalTo("0+1"));
         assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 
600))), equalTo("0+3"));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeCount() {
         stream.count(Materialized.<String, Long, SessionStore<Bytes, 
byte[]>>as("count-store"));
 
-        processData();
-        final SessionStore<String, Long> store = (SessionStore<String, Long>) 
driver.allStateStores().get("count-store");
-        final List<KeyValue<Windowed<String>, Long>> data = 
StreamsTestUtils.toList(store.fetch("1", "2"));
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 
2L),
-                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 
600)), 1L),
-                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 
600)), 1L))));
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final SessionStore<String, Long> store = 
driver.getSessionStore("count-store");
+            final List<KeyValue<Windowed<String>, Long>> data = 
StreamsTestUtils.toList(store.fetch("1", "2"));
+            assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 
15)), 2L),
+                    KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 
600)), 1L),
+                    KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 
600)), 1L))));
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeReduced() {
         stream.reduce(MockReducer.STRING_ADDER, Materialized.<String, String, 
SessionStore<Bytes, byte[]>>as("reduced"));
 
-        processData();
-        final SessionStore<String, String> sessionStore = 
(SessionStore<String, String>) driver.allStateStores().get("reduced");
-        final List<KeyValue<Windowed<String>, String>> data = 
StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final SessionStore<String, String> sessionStore = 
driver.getSessionStore("reduced");
+            final List<KeyValue<Windowed<String>, String>> data = 
StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
 
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 
"1+2"),
-                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 
600)), "3"),
-                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 
600)), "1"))));
+            assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 
15)), "1+2"),
+                    KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 
600)), "3"),
+                    KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 
600)), "1"))));
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeAggregated() {
         stream.aggregate(MockInitializer.STRING_INIT,
@@ -164,13 +172,15 @@ public void shouldMaterializeAggregated() {
                          sessionMerger,
                          Materialized.<String, String, SessionStore<Bytes, 
byte[]>>as("aggregated").withValueSerde(Serdes.String()));
 
-        processData();
-        final SessionStore<String, String> sessionStore = 
(SessionStore<String, String>) driver.allStateStores().get("aggregated");
-        final List<KeyValue<Windowed<String>, String>> data = 
StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 
"0+0+1+2"),
-                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 
600)), "0+3"),
-                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 
600)), "0+1"))));
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final SessionStore<String, String> sessionStore = 
driver.getSessionStore("aggregated");
+            final List<KeyValue<Windowed<String>, String>> data = 
StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
+            assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 
15)), "0+0+1+2"),
+                    KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 
600)), "0+3"),
+                    KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 
600)), "0+1"))));
+        }
     }
 
     @Test(expected = NullPointerException.class)
@@ -243,16 +253,11 @@ public void 
shouldThrowNullPointerOnCountIfMaterializedIsNull() {
         stream.count(null);
     }
 
-    private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), 0);
-        driver.setTime(10);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(15);
-        driver.process(TOPIC, "1", "2");
-        driver.setTime(600);
-        driver.process(TOPIC, "1", "3");
-        driver.process(TOPIC, "2", "1");
-        driver.flushState();
+    private void processData(final TopologyTestDriver driver) {
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 600));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 600));
     }
 
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 610e52f2ed6..7b885b23bf2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -18,32 +18,33 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -52,9 +53,8 @@
 
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
-
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
     private TimeWindowedKStream<String, String> windowedStream;
 
     @Before
@@ -76,7 +76,9 @@ public void apply(final Windowed<String> key, final Long 
value) {
                     }
                 });
 
-        processData();
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), 
equalTo(2L));
         assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 
1000))), equalTo(1L));
         assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 
1000))), equalTo(1L));
@@ -95,7 +97,9 @@ public void apply(final Windowed<String> key, final String 
value) {
                     }
                 });
 
-        processData();
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), 
equalTo("1+2"));
         assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 
1000))), equalTo("1"));
         assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 
1000))), equalTo("3"));
@@ -115,29 +119,32 @@ public void apply(final Windowed<String> key, final 
String value) {
                         results.put(key, value);
                     }
                 });
-        processData();
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), 
equalTo("0+1+2"));
         assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 
1000))), equalTo("0+1"));
         assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 
1000))), equalTo("0+3"));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeCount() {
         windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, 
byte[]>>as("count-store")
                                      .withKeySerde(Serdes.String())
                                      .withValueSerde(Serdes.Long()));
 
-        processData();
-        final WindowStore<String, Long> windowStore = (WindowStore<String, 
Long>) driver.allStateStores().get("count-store");
-        final List<KeyValue<Windowed<String>, Long>> data = 
StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 
1L),
-                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 
1L))));
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final WindowStore<String, Long> windowStore = 
driver.getWindowStore("count-store");
+            final List<KeyValue<Windowed<String>, Long>> data = 
StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+
+            assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 
2L),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 
1000)), 1L),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 
1000)), 1L))));
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeReduced() {
         windowedStream.reduce(MockReducer.STRING_ADDER,
@@ -145,17 +152,18 @@ public void shouldMaterializeReduced() {
                                       .withKeySerde(Serdes.String())
                                       .withValueSerde(Serdes.String()));
 
-        processData();
-        final WindowStore<String, String> windowStore = (WindowStore<String, 
String>) driver.allStateStores().get("reduced");
-        final List<KeyValue<Windowed<String>, String>> data = 
StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final WindowStore<String, String> windowStore = 
driver.getWindowStore("reduced");
+            final List<KeyValue<Windowed<String>, String>> data = 
StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
 
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 
"1+2"),
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 
"3"),
-                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 
"1"))));
+            assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 
"1+2"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 
1000)), "3"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 
1000)), "1"))));
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeAggregated() {
         windowedStream.aggregate(MockInitializer.STRING_INIT,
@@ -164,13 +172,15 @@ public void shouldMaterializeAggregated() {
                                          .withKeySerde(Serdes.String())
                                          .withValueSerde(Serdes.String()));
 
-        processData();
-        final WindowStore<String, String> windowStore = (WindowStore<String, 
String>) driver.allStateStores().get("aggregated");
-        final List<KeyValue<Windowed<String>, String>> data = 
StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-        assertThat(data, equalTo(Arrays.asList(
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 
"0+1+2"),
-                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 
"0+3"),
-                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 
"0+1"))));
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final WindowStore<String, String> windowStore = 
driver.getWindowStore("aggregated");
+            final List<KeyValue<Windowed<String>, String>> data = 
StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+            assertThat(data, equalTo(Arrays.asList(
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 
"0+1+2"),
+                    KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 
1000)), "0+3"),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 
1000)), "0+1"))));
+        }
     }
 
     @Test(expected = NullPointerException.class)
@@ -227,16 +237,11 @@ public void 
shouldThrowNullPointerOnCountIfMaterializedIsNull() {
         windowedStream.count(null);
     }
 
-    private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), 0);
-        driver.setTime(10);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(15);
-        driver.process(TOPIC, "1", "2");
-        driver.setTime(500);
-        driver.process(TOPIC, "1", "3");
-        driver.process(TOPIC, "2", "1");
-        driver.flushState();
+    private void processData(final TopologyTestDriver driver) {
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10L));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15L));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 500L));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 500L));
     }
 
 }
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Rewrite test to use new public TopologyTestDriver
> -------------------------------------------------
>
>                 Key: KAFKA-6474
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6474
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams, unit tests
>    Affects Versions: 1.1.0
>            Reporter: Matthias J. Sax
>            Assignee: Filipe Agapito
>            Priority: Major
>              Labels: beginner, newbie
>
> With KIP-247 we added public TopologyTestDriver. We should rewrite our own 
> tests to use this new test driver and remove the two classes 
> ProcessorTopologyTestDriver and KStreamTestDriver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to