This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 08b3d9f37b0 MINOR: update KStreamAggregationIntegrationTest (#16699)
08b3d9f37b0 is described below

commit 08b3d9f37b0682078ac1d273497fb88cebfc9528
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jul 31 11:27:09 2024 -0700

    MINOR: update KStreamAggregationIntegrationTest (#16699)
    
    Refactor test to move off deprecated `transform()` in favor of
    `process()`.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../KStreamAggregationIntegrationTest.java         | 781 ++++++++++-----------
 1 file changed, 386 insertions(+), 395 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index a9b63f66d29..d4a53f1b222 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -48,14 +48,13 @@ import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.UnlimitedWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -74,7 +73,6 @@ import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.PrintStream;
 import java.time.Duration;
 import java.util.Arrays;
@@ -99,7 +97,6 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@SuppressWarnings({"unchecked", "deprecation"})
 @Timeout(600)
 @Tag("integration")
 public class KStreamAggregationIntegrationTest {
@@ -108,7 +105,7 @@ public class KStreamAggregationIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
     @BeforeAll
-    public static void startCluster() throws IOException {
+    public static void startCluster() throws Exception {
         CLUSTER.start();
     }
 
@@ -132,7 +129,7 @@ public class KStreamAggregationIntegrationTest {
     private KStream<Integer, String> stream;
 
     @BeforeEach
-    public void before(final TestInfo testInfo) throws InterruptedException {
+    public void before(final TestInfo testInfo) throws Exception {
         builder = new StreamsBuilder();
         final String safeTestName = safeUniqueTestName(testInfo);
         createTopics(safeTestName);
@@ -143,8 +140,8 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
         
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
100L);
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.IntegerSerde.class);
 
         final KeyValueMapper<Integer, String, String> mapper = 
MockMapper.selectValueMapper();
         stream = builder.stream(streamOneInput, 
Consumed.with(Serdes.Integer(), Serdes.String()));
@@ -156,7 +153,7 @@ public class KStreamAggregationIntegrationTest {
     }
 
     @AfterEach
-    public void whenShuttingDown() throws IOException {
+    public void whenShuttingDown() throws Exception {
         if (kafkaStreams != null) {
             kafkaStreams.close();
         }
@@ -183,21 +180,25 @@ public class KStreamAggregationIntegrationTest {
 
         results.sort(KStreamAggregationIntegrationTest::compare);
 
-        assertThat(results, is(Arrays.asList(
-            new KeyValueTimestamp("A", "A", mockTime.milliseconds()),
-            new KeyValueTimestamp("A", "A:A", mockTime.milliseconds()),
-            new KeyValueTimestamp("B", "B", mockTime.milliseconds()),
-            new KeyValueTimestamp("B", "B:B", mockTime.milliseconds()),
-            new KeyValueTimestamp("C", "C", mockTime.milliseconds()),
-            new KeyValueTimestamp("C", "C:C", mockTime.milliseconds()),
-            new KeyValueTimestamp("D", "D", mockTime.milliseconds()),
-            new KeyValueTimestamp("D", "D:D", mockTime.milliseconds()),
-            new KeyValueTimestamp("E", "E", mockTime.milliseconds()),
-            new KeyValueTimestamp("E", "E:E", mockTime.milliseconds()))));
+        assertThat(
+            results,
+            is(Arrays.asList(
+                new KeyValueTimestamp<>("A", "A", mockTime.milliseconds()),
+                new KeyValueTimestamp<>("A", "A:A", mockTime.milliseconds()),
+                new KeyValueTimestamp<>("B", "B", mockTime.milliseconds()),
+                new KeyValueTimestamp<>("B", "B:B", mockTime.milliseconds()),
+                new KeyValueTimestamp<>("C", "C", mockTime.milliseconds()),
+                new KeyValueTimestamp<>("C", "C:C", mockTime.milliseconds()),
+                new KeyValueTimestamp<>("D", "D", mockTime.milliseconds()),
+                new KeyValueTimestamp<>("D", "D:D", mockTime.milliseconds()),
+                new KeyValueTimestamp<>("E", "E", mockTime.milliseconds()),
+                new KeyValueTimestamp<>("E", "E:E", mockTime.milliseconds())
+            ))
+        );
     }
 
-    private static <K extends Comparable, V extends Comparable> int compare(final KeyValueTimestamp<K, V> o1,
-                                                                            final KeyValueTimestamp<K, V> o2) {
+    private static <K extends Comparable<K>, V extends Comparable<V>> int compare(final KeyValueTimestamp<K, V> o1,
+                                                                                  final KeyValueTimestamp<K, V> o2) {
         final int keyComparison = o1.key().compareTo(o2.key());
         if (keyComparison == 0) {
             final int valueComparison = o1.value().compareTo(o2.value());
@@ -220,12 +221,11 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(secondBatchTimestamp);
 
         final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
-        //noinspection deprecation
         groupedStream
-                .windowedBy(TimeWindows.of(ofMillis(500L)))
-                .reduce(reducer)
-                .toStream()
-                .to(outputTopic, Produced.with(windowedSerde, 
Serdes.String()));
+            .windowedBy(TimeWindows.of(ofMillis(500L)))
+            .reduce(reducer)
+            .toStream()
+            .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
 
         startStreams();
 
@@ -234,7 +234,8 @@ public class KStreamAggregationIntegrationTest {
             new StringDeserializer(),
             String.class,
             15,
-            testInfo);
+            testInfo
+        );
 
         // read from ConsoleConsumer
         final String resultFromConsoleConsumer = 
readWindowedKeyedMessagesViaConsoleConsumer(
@@ -242,7 +243,8 @@ public class KStreamAggregationIntegrationTest {
             new StringDeserializer(),
             String.class,
             15,
-            true);
+            true
+        );
 
         final Comparator<KeyValueTimestamp<Windowed<String>, String>> 
comparator =
             Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> 
o) -> o.key().key())
@@ -255,21 +257,21 @@ public class KStreamAggregationIntegrationTest {
         final long secondBatchWindowEnd = secondBatchWindowStart + 500;
 
         final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = 
Arrays.asList(
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "A", 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "A", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "A:A", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "B", 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "B", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "B:B", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "C", 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "C", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "C:C", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "D", 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "D", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "D:D", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "E", 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "E", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "E:E", 
secondBatchTimestamp)
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "A", 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "A", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "A:A", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "B", 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "B", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "B:B", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "C", 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "C", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "C:C", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "D", 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "D", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "D:D", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "E", 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "E", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "E:E", 
secondBatchTimestamp)
         );
         assertThat(windowedOutput, is(expectResult));
 
@@ -304,22 +306,26 @@ public class KStreamAggregationIntegrationTest {
             new StringDeserializer(),
             new IntegerDeserializer(),
             10,
-            testInfo);
+            testInfo
+        );
 
         results.sort(KStreamAggregationIntegrationTest::compare);
 
-        assertThat(results, is(Arrays.asList(
-            new KeyValueTimestamp("A", 1, mockTime.milliseconds()),
-            new KeyValueTimestamp("A", 2, mockTime.milliseconds()),
-            new KeyValueTimestamp("B", 1, mockTime.milliseconds()),
-            new KeyValueTimestamp("B", 2, mockTime.milliseconds()),
-            new KeyValueTimestamp("C", 1, mockTime.milliseconds()),
-            new KeyValueTimestamp("C", 2, mockTime.milliseconds()),
-            new KeyValueTimestamp("D", 1, mockTime.milliseconds()),
-            new KeyValueTimestamp("D", 2, mockTime.milliseconds()),
-            new KeyValueTimestamp("E", 1, mockTime.milliseconds()),
-            new KeyValueTimestamp("E", 2, mockTime.milliseconds())
-        )));
+        assertThat(
+            results,
+            is(Arrays.asList(
+                new KeyValueTimestamp<>("A", 1, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("A", 2, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("B", 1, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("B", 2, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("C", 1, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("C", 2, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("D", 1, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("D", 2, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("E", 1, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("E", 2, mockTime.milliseconds())
+           ))
+        );
     }
 
     @SuppressWarnings("deprecation")
@@ -333,15 +339,14 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(secondTimestamp);
 
         final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
-        //noinspection deprecation
         groupedStream.windowedBy(TimeWindows.of(ofMillis(500L)))
-                .aggregate(
-                        initializer,
-                        aggregator,
-                        Materialized.with(null, Serdes.Integer())
-                )
-                .toStream()
-                .to(outputTopic, Produced.with(windowedSerde, 
Serdes.Integer()));
+            .aggregate(
+                initializer,
+                aggregator,
+                Materialized.with(null, Serdes.Integer())
+            )
+            .toStream()
+            .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
 
         startStreams();
 
@@ -371,21 +376,22 @@ public class KStreamAggregationIntegrationTest {
         final long secondWindowEnd = secondWindowStart + 500;
 
         final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult 
= Arrays.asList(
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp));
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp)
+        );
 
         assertThat(windowedMessages, is(expectResult));
 
@@ -399,7 +405,6 @@ public class KStreamAggregationIntegrationTest {
         for (final String record: allRecords) {
             assertTrue(expectResultString.contains(record));
         }
-
     }
 
     private void shouldCountHelper(final TestInfo testInfo) throws Exception {
@@ -411,21 +416,25 @@ public class KStreamAggregationIntegrationTest {
             new StringDeserializer(),
             new LongDeserializer(),
             10,
-            testInfo);
+            testInfo
+        );
         results.sort(KStreamAggregationIntegrationTest::compare);
 
-        assertThat(results, is(Arrays.asList(
-            new KeyValueTimestamp("A", 1L, mockTime.milliseconds()),
-            new KeyValueTimestamp("A", 2L, mockTime.milliseconds()),
-            new KeyValueTimestamp("B", 1L, mockTime.milliseconds()),
-            new KeyValueTimestamp("B", 2L, mockTime.milliseconds()),
-            new KeyValueTimestamp("C", 1L, mockTime.milliseconds()),
-            new KeyValueTimestamp("C", 2L, mockTime.milliseconds()),
-            new KeyValueTimestamp("D", 1L, mockTime.milliseconds()),
-            new KeyValueTimestamp("D", 2L, mockTime.milliseconds()),
-            new KeyValueTimestamp("E", 1L, mockTime.milliseconds()),
-            new KeyValueTimestamp("E", 2L, mockTime.milliseconds())
-        )));
+        assertThat(
+            results,
+            is(Arrays.asList(
+                new KeyValueTimestamp<>("A", 1L, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("A", 2L, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("B", 1L, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("B", 2L, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("C", 1L, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("C", 2L, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("D", 1L, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("D", 2L, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("E", 1L, mockTime.milliseconds()),
+                new KeyValueTimestamp<>("E", 2L, mockTime.milliseconds())
+            ))
+        );
     }
 
     @Test
@@ -433,8 +442,8 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(mockTime.milliseconds());
 
         groupedStream.count(Materialized.as("count-by-key"))
-                .toStream()
-                .to(outputTopic, Produced.with(Serdes.String(), 
Serdes.Long()));
+            .toStream()
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         shouldCountHelper(testInfo);
     }
@@ -444,8 +453,8 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(mockTime.milliseconds());
 
         groupedStream.count()
-                .toStream()
-                .to(outputTopic, Produced.with(Serdes.String(), 
Serdes.Long()));
+            .toStream()
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         shouldCountHelper(testInfo);
     }
@@ -457,11 +466,10 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(timestamp);
         produceMessages(timestamp);
 
-        //noinspection deprecation
         stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
-                .windowedBy(TimeWindows.of(ofMillis(500L)))
-                .count()
-                .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), 
Serdes.Long()));
+            .windowedBy(TimeWindows.of(ofMillis(500L)))
+            .count()
+            .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), 
Serdes.Long()));
 
         startStreams();
 
@@ -469,22 +477,26 @@ public class KStreamAggregationIntegrationTest {
             new StringDeserializer(),
             new LongDeserializer(),
             10,
-            testInfo);
+            testInfo
+        );
         results.sort(KStreamAggregationIntegrationTest::compare);
 
         final long window = timestamp / 500 * 500;
-        assertThat(results, is(Arrays.asList(
-            new KeyValueTimestamp("1@" + window, 1L, timestamp),
-            new KeyValueTimestamp("1@" + window, 2L, timestamp),
-            new KeyValueTimestamp("2@" + window, 1L, timestamp),
-            new KeyValueTimestamp("2@" + window, 2L, timestamp),
-            new KeyValueTimestamp("3@" + window, 1L, timestamp),
-            new KeyValueTimestamp("3@" + window, 2L, timestamp),
-            new KeyValueTimestamp("4@" + window, 1L, timestamp),
-            new KeyValueTimestamp("4@" + window, 2L, timestamp),
-            new KeyValueTimestamp("5@" + window, 1L, timestamp),
-            new KeyValueTimestamp("5@" + window, 2L, timestamp)
-        )));
+        assertThat(
+            results,
+            is(Arrays.asList(
+                new KeyValueTimestamp<>("1@" + window, 1L, timestamp),
+                new KeyValueTimestamp<>("1@" + window, 2L, timestamp),
+                new KeyValueTimestamp<>("2@" + window, 1L, timestamp),
+                new KeyValueTimestamp<>("2@" + window, 2L, timestamp),
+                new KeyValueTimestamp<>("3@" + window, 1L, timestamp),
+                new KeyValueTimestamp<>("3@" + window, 2L, timestamp),
+                new KeyValueTimestamp<>("4@" + window, 1L, timestamp),
+                new KeyValueTimestamp<>("4@" + window, 2L, timestamp),
+                new KeyValueTimestamp<>("5@" + window, 1L, timestamp),
+                new KeyValueTimestamp<>("5@" + window, 2L, timestamp)
+            ))
+        );
     }
 
     @SuppressWarnings("deprecation")
@@ -499,32 +511,33 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(thirdBatchTimestamp);
 
         final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
-        //noinspection deprecation
         groupedStream
-                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), 
ofMillis(2000L)))
-                .reduce(reducer)
-                .toStream()
-                .to(outputTopic, Produced.with(windowedSerde, 
Serdes.String()));
+            
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), 
ofMillis(2000L)))
+            .reduce(reducer)
+            .toStream()
+            .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
 
         startStreams();
 
         final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput 
= receiveMessages(
-                new TimeWindowedDeserializer<>(new StringDeserializer(), 500L),
-                new StringDeserializer(),
-                String.class,
-                30,
-                testInfo);
+            new TimeWindowedDeserializer<>(new StringDeserializer(), 500L),
+            new StringDeserializer(),
+            String.class,
+            30,
+            testInfo
+        );
 
         final String resultFromConsoleConsumer = 
readWindowedKeyedMessagesViaConsoleConsumer(
-                new TimeWindowedDeserializer<String>(),
-                new StringDeserializer(),
-                String.class,
-                30,
-                true);
+            new TimeWindowedDeserializer<String>(),
+            new StringDeserializer(),
+            String.class,
+            30,
+            true
+        );
 
         final Comparator<KeyValueTimestamp<Windowed<String>, String>> 
comparator =
-                Comparator.comparing((KeyValueTimestamp<Windowed<String>, 
String> o) -> o.key().key())
-                        .thenComparing(KeyValueTimestamp::value);
+            Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> 
o) -> o.key().key())
+                .thenComparing(KeyValueTimestamp::value);
 
         windowedOutput.sort(comparator);
         final long firstBatchLeftWindowStart = firstBatchTimestamp - 
timeDifference;
@@ -541,43 +554,43 @@ public class KStreamAggregationIntegrationTest {
         final long thirdBatchLeftWindowEnd = thirdBatchLeftWindowStart + 
timeDifference;
 
         final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = 
Arrays.asList(
-                // A @ firstBatchTimestamp left window created when A @ 
firstBatchTimestamp processed
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "A", 
firstBatchTimestamp),
-                // A @ firstBatchTimestamp right window created when A @ 
secondBatchTimestamp processed
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "A", 
secondBatchTimestamp),
-                // A @ secondBatchTimestamp right window created when A @ 
thirdBatchTimestamp processed
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "A", 
thirdBatchTimestamp),
-                // A @ secondBatchTimestamp left window created when A @ 
secondBatchTimestamp processed
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "A:A", 
secondBatchTimestamp),
-                // A @ firstBatchTimestamp right window updated when A @ 
thirdBatchTimestamp processed
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "A:A", 
thirdBatchTimestamp),
-                // A @ thirdBatchTimestamp left window created when A @ 
thirdBatchTimestamp processed
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "A:A:A", 
thirdBatchTimestamp),
-
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "B", 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "B", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "B", 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "B:B", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "B:B", 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "B:B:B", 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "C", 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "C", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "C", 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "C:C", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "C:C", 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "C:C:C", 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "D", 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "D", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "D", 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "D:D", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "D:D", 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "D:D:D", 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "E", 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "E", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "E", 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "E:E", 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "E:E", 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "E:E:E", 
thirdBatchTimestamp)
+            // A @ firstBatchTimestamp left window created when A @ 
firstBatchTimestamp processed
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "A", 
firstBatchTimestamp),
+            // A @ firstBatchTimestamp right window created when A @ 
secondBatchTimestamp processed
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "A", 
secondBatchTimestamp),
+            // A @ secondBatchTimestamp right window created when A @ 
thirdBatchTimestamp processed
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "A", 
thirdBatchTimestamp),
+            // A @ secondBatchTimestamp left window created when A @ 
secondBatchTimestamp processed
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "A:A", 
secondBatchTimestamp),
+            // A @ firstBatchTimestamp right window updated when A @ 
thirdBatchTimestamp processed
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "A:A", 
thirdBatchTimestamp),
+            // A @ thirdBatchTimestamp left window created when A @ 
thirdBatchTimestamp processed
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "A:A:A", 
thirdBatchTimestamp),
+
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "B", 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "B", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "B", 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "B:B", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "B:B", 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "B:B:B", 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "C", 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "C", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "C", 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "C:C", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "C:C", 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "C:C:C", 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "D", 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "D", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "D", 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "D:D", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "D:D", 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "D:D:D", 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "E", 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "E", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "E", 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "E:E", 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "E:E", 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "E:E:E", 
thirdBatchTimestamp)
         );
         assertThat(windowedOutput, is(expectResult));
 
@@ -608,34 +621,36 @@ public class KStreamAggregationIntegrationTest {
         final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
         //noinspection deprecation
         
groupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L),
 ofMinutes(5)))
-                .aggregate(
-                        initializer,
-                        aggregator,
-                        Materialized.with(null, Serdes.Integer())
-                )
-                .toStream()
-                .to(outputTopic, Produced.with(windowedSerde, 
Serdes.Integer()));
+            .aggregate(
+                initializer,
+                aggregator,
+                Materialized.with(null, Serdes.Integer())
+            )
+            .toStream()
+            .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
 
         startStreams();
 
         final List<KeyValueTimestamp<Windowed<String>, Integer>> 
windowedMessages = receiveMessagesWithTimestamp(
-                new TimeWindowedDeserializer<>(),
-                new IntegerDeserializer(),
-                String.class,
-                30,
-                testInfo);
+            new TimeWindowedDeserializer<>(),
+            new IntegerDeserializer(),
+            String.class,
+            30,
+            testInfo
+        );
 
         // read from ConsoleConsumer
         final String resultFromConsoleConsumer = 
readWindowedKeyedMessagesViaConsoleConsumer(
-                new TimeWindowedDeserializer<String>(),
-                new IntegerDeserializer(),
-                String.class,
-                30,
-                true);
+            new TimeWindowedDeserializer<String>(),
+            new IntegerDeserializer(),
+            String.class,
+            30,
+            true
+        );
 
         final Comparator<KeyValueTimestamp<Windowed<String>, Integer>> 
comparator =
-                Comparator.comparing((KeyValueTimestamp<Windowed<String>, 
Integer> o) -> o.key().key())
-                        .thenComparingInt(KeyValueTimestamp::value);
+            Comparator.comparing((KeyValueTimestamp<Windowed<String>, Integer> 
o) -> o.key().key())
+                .thenComparingInt(KeyValueTimestamp::value);
         windowedMessages.sort(comparator);
 
         final long firstBatchLeftWindowStart = firstBatchTimestamp - 
timeDifference;
@@ -652,43 +667,43 @@ public class KStreamAggregationIntegrationTest {
         final long thirdBatchLeftWindowEnd = thirdBatchLeftWindowStart + 
timeDifference;
 
         final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult 
= Arrays.asList(
-                // A @ firstBatchTimestamp left window created when A @ 
firstBatchTimestamp processed
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, 
firstBatchTimestamp),
-                // A @ firstBatchTimestamp right window created when A @ 
secondBatchTimestamp processed
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, 
secondBatchTimestamp),
-                // A @ secondBatchTimestamp right window created when A @ 
thirdBatchTimestamp processed
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, 
thirdBatchTimestamp),
-                // A @ secondBatchTimestamp left window created when A @ 
secondBatchTimestamp processed
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, 
secondBatchTimestamp),
-                // A @ firstBatchTimestamp right window updated when A @ 
thirdBatchTimestamp processed
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, 
thirdBatchTimestamp),
-                // A @ thirdBatchTimestamp left window created when A @ 
thirdBatchTimestamp processed
-                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, 
thirdBatchTimestamp),
-
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, 
firstBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, 
secondBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, 
thirdBatchTimestamp),
-                new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, 
thirdBatchTimestamp)
+            // A @ firstBatchTimestamp left window created when A @ 
firstBatchTimestamp processed
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, 
firstBatchTimestamp),
+            // A @ firstBatchTimestamp right window created when A @ 
secondBatchTimestamp processed
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, 
secondBatchTimestamp),
+            // A @ secondBatchTimestamp right window created when A @ 
thirdBatchTimestamp processed
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, 
thirdBatchTimestamp),
+            // A @ secondBatchTimestamp left window created when A @ 
secondBatchTimestamp processed
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, 
secondBatchTimestamp),
+            // A @ firstBatchTimestamp right window updated when A @ 
thirdBatchTimestamp processed
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, 
thirdBatchTimestamp),
+            // A @ thirdBatchTimestamp left window created when A @ 
thirdBatchTimestamp processed
+            new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, 
thirdBatchTimestamp),
+
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("D", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1, 
firstBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1, 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1, 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2, 
secondBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2, 
thirdBatchTimestamp),
+            new KeyValueTimestamp<>(new Windowed<>("E", new 
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3, 
thirdBatchTimestamp)
         );
 
         assertThat(windowedMessages, is(expectResult));
@@ -703,66 +718,71 @@ public class KStreamAggregationIntegrationTest {
         for (final String record: allRecords) {
             assertTrue(expectResultString.contains(record));
         }
-
     }
 
     @SuppressWarnings("deprecation")
     @Test
     public void shouldCountSessionWindows() throws Exception {
         final long sessionGap = 5 * 60 * 1000L;
-        final List<KeyValue<String, String>> t1Messages = Arrays.asList(new 
KeyValue<>("bob", "start"),
-                                                                        new 
KeyValue<>("penny", "start"),
-                                                                        new 
KeyValue<>("jo", "pause"),
-                                                                        new 
KeyValue<>("emily", "pause"));
+        final List<KeyValue<String, String>> t1Messages = Arrays.asList(
+            new KeyValue<>("bob", "start"),
+            new KeyValue<>("penny", "start"),
+            new KeyValue<>("jo", "pause"),
+            new KeyValue<>("emily", "pause")
+        );
 
         final long t1 = mockTime.milliseconds() - 
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                userSessionsStream,
-                t1Messages,
-                TestUtils.producerConfig(
-                        CLUSTER.bootstrapServers(),
-                        StringSerializer.class,
-                        StringSerializer.class,
-                        new Properties()),
-                t1);
+            userSessionsStream,
+            t1Messages,
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            t1
+        );
         final long t2 = t1 + (sessionGap / 2);
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                userSessionsStream,
-                Collections.singletonList(
-                        new KeyValue<>("emily", "resume")
-                ),
-                TestUtils.producerConfig(
-                        CLUSTER.bootstrapServers(),
-                        StringSerializer.class,
-                        StringSerializer.class,
-                        new Properties()),
-                t2);
+            userSessionsStream,
+            Collections.singletonList(
+                new KeyValue<>("emily", "resume")
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            t2
+        );
         final long t3 = t1 + sessionGap + 1;
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                userSessionsStream,
-                Arrays.asList(
-                        new KeyValue<>("bob", "pause"),
-                        new KeyValue<>("penny", "stop")
-                ),
-                TestUtils.producerConfig(
-                        CLUSTER.bootstrapServers(),
-                        StringSerializer.class,
-                        StringSerializer.class,
-                        new Properties()),
-                t3);
+            userSessionsStream,
+            Arrays.asList(
+                new KeyValue<>("bob", "pause"),
+                new KeyValue<>("penny", "stop")
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            t3
+        );
         final long t4 = t3 + (sessionGap / 2);
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                userSessionsStream,
-                Arrays.asList(
-                        new KeyValue<>("bob", "resume"), // bobs session 
continues
-                        new KeyValue<>("jo", "resume")   // jo's starts new 
session
-                ),
-                TestUtils.producerConfig(
-                        CLUSTER.bootstrapServers(),
-                        StringSerializer.class,
-                        StringSerializer.class,
-                        new Properties()),
-                t4);
+            userSessionsStream,
+            Arrays.asList(
+                new KeyValue<>("bob", "resume"), // bob's session continues
+                new KeyValue<>("jo", "resume")   // jo starts a new session
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            t4
+        );
         final long t5 = t4 - 1;
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             userSessionsStream,
@@ -774,35 +794,22 @@ public class KStreamAggregationIntegrationTest {
                 StringSerializer.class,
                 StringSerializer.class,
                 new Properties()),
-            t5);
+            t5
+        );
 
         final Map<Windowed<String>, KeyValue<Long, Long>> results = new 
HashMap<>();
         final CountDownLatch latch = new CountDownLatch(13);
 
         //noinspection deprecation
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
-                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-                .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
-                .count()
-                .toStream()
-                .transform(() -> new Transformer<Windowed<String>, Long, 
KeyValue<Object, Object>>() {
-                        private ProcessorContext context;
-
-                        @Override
-                        public void init(final ProcessorContext context) {
-                            this.context = context;
-                        }
-
-                        @Override
-                        public KeyValue<Object, Object> transform(final 
Windowed<String> key, final Long value) {
-                            results.put(key, KeyValue.pair(value, 
context.timestamp()));
-                            latch.countDown();
-                            return null;
-                        }
-
-                        @Override
-                        public void close() {}
-                    });
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
+            .count()
+            .toStream()
+            .process(() -> (Processor<Windowed<String>, Long, Object, Object>) 
record -> {
+                results.put(record.key(), KeyValue.pair(record.value(), 
record.timestamp()));
+                latch.countDown();
+            });
 
         startStreams();
         latch.await(30, TimeUnit.SECONDS);
@@ -820,59 +827,65 @@ public class KStreamAggregationIntegrationTest {
     @Test
     public void shouldReduceSessionWindows() throws Exception {
         final long sessionGap = 1000L; // something to do with time
-        final List<KeyValue<String, String>> t1Messages = Arrays.asList(new 
KeyValue<>("bob", "start"),
-                                                                        new 
KeyValue<>("penny", "start"),
-                                                                        new 
KeyValue<>("jo", "pause"),
-                                                                        new 
KeyValue<>("emily", "pause"));
+        final List<KeyValue<String, String>> t1Messages = Arrays.asList(
+            new KeyValue<>("bob", "start"),
+            new KeyValue<>("penny", "start"),
+            new KeyValue<>("jo", "pause"),
+            new KeyValue<>("emily", "pause")
+        );
 
         final long t1 = mockTime.milliseconds();
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                userSessionsStream,
-                t1Messages,
-                TestUtils.producerConfig(
-                        CLUSTER.bootstrapServers(),
-                        StringSerializer.class,
-                        StringSerializer.class,
-                        new Properties()),
-                t1);
+            userSessionsStream,
+            t1Messages,
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            t1
+        );
         final long t2 = t1 + (sessionGap / 2);
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                userSessionsStream,
-                Collections.singletonList(
-                        new KeyValue<>("emily", "resume")
-                ),
-                TestUtils.producerConfig(
-                        CLUSTER.bootstrapServers(),
-                        StringSerializer.class,
-                        StringSerializer.class,
-                        new Properties()),
-                t2);
+            userSessionsStream,
+            Collections.singletonList(
+                new KeyValue<>("emily", "resume")
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            t2
+        );
         final long t3 = t1 + sessionGap + 1;
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                userSessionsStream,
-                Arrays.asList(
-                        new KeyValue<>("bob", "pause"),
-                        new KeyValue<>("penny", "stop")
-                ),
-                TestUtils.producerConfig(
-                        CLUSTER.bootstrapServers(),
-                        StringSerializer.class,
-                        StringSerializer.class,
-                        new Properties()),
-                t3);
+            userSessionsStream,
+            Arrays.asList(
+                new KeyValue<>("bob", "pause"),
+                new KeyValue<>("penny", "stop")
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            t3
+        );
         final long t4 = t3 + (sessionGap / 2);
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                userSessionsStream,
-                Arrays.asList(
-                        new KeyValue<>("bob", "resume"), // bobs session 
continues
-                        new KeyValue<>("jo", "resume")   // jo's starts new 
session
-                ),
-                TestUtils.producerConfig(
-                        CLUSTER.bootstrapServers(),
-                        StringSerializer.class,
-                        StringSerializer.class,
-                        new Properties()),
-                t4);
+            userSessionsStream,
+            Arrays.asList(
+                new KeyValue<>("bob", "resume"), // bobs session continues
+                new KeyValue<>("jo", "resume")   // jo's starts new session
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            t4
+        );
         final long t5 = t4 - 1;
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             userSessionsStream,
@@ -884,34 +897,19 @@ public class KStreamAggregationIntegrationTest {
                 StringSerializer.class,
                 StringSerializer.class,
                 new Properties()),
-            t5);
+            t5
+        );
 
         final Map<Windowed<String>, KeyValue<String, Long>> results = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(13);
         final String userSessionsStore = "UserSessionsStore";
         //noinspection deprecation
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
-                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-                .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
-                .reduce((value1, value2) -> value1 + ":" + value2, 
Materialized.as(userSessionsStore))
-                .toStream()
-            .transform(() -> new Transformer<Windowed<String>, String, 
KeyValue<Object, Object>>() {
-                private ProcessorContext context;
-
-                @Override
-                public void init(final ProcessorContext context) {
-                    this.context = context;
-                }
-
-                @Override
-                public KeyValue<Object, Object> transform(final 
Windowed<String> key, final String value) {
-                    results.put(key, KeyValue.pair(value, 
context.timestamp()));
-                    latch.countDown();
-                    return null;
-                }
-
-                @Override
-                public void close() {}
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(SessionWindows.with(ofMillis(sessionGap))) .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
+            .toStream()
+            .process(() -> (Processor<Windowed<String>, String, Object, Object>) record -> {
+                results.put(record.key(), KeyValue.pair(record.value(), record.timestamp()));
+                latch.countDown();
             });
 
         startStreams();
@@ -943,10 +941,12 @@ public class KStreamAggregationIntegrationTest {
         final long incrementTime = Duration.ofDays(1).toMillis();
 
         final long t1 = mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
-        final List<KeyValue<String, String>> t1Messages = Arrays.asList(new 
KeyValue<>("bob", "start"),
-                                                                        new 
KeyValue<>("penny", "start"),
-                                                                        new 
KeyValue<>("jo", "pause"),
-                                                                        new 
KeyValue<>("emily", "pause"));
+        final List<KeyValue<String, String>> t1Messages = Arrays.asList(
+            new KeyValue<>("bob", "start"),
+            new KeyValue<>("penny", "start"),
+            new KeyValue<>("jo", "pause"),
+            new KeyValue<>("emily", "pause")
+        );
 
         final Properties producerConfig = TestUtils.producerConfig(
             CLUSTER.bootstrapServers(),
@@ -959,7 +959,8 @@ public class KStreamAggregationIntegrationTest {
             userSessionsStream,
             t1Messages,
             producerConfig,
-            t1);
+            t1
+        );
 
         final long t2 = t1 + incrementTime;
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
@@ -968,7 +969,8 @@ public class KStreamAggregationIntegrationTest {
                 new KeyValue<>("emily", "resume")
             ),
             producerConfig,
-            t2);
+            t2
+        );
         final long t3 = t2 + incrementTime;
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             userSessionsStream,
@@ -977,7 +979,8 @@ public class KStreamAggregationIntegrationTest {
                 new KeyValue<>("penny", "stop")
             ),
             producerConfig,
-            t3);
+            t3
+        );
 
         final long t4 = t3 + incrementTime;
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
@@ -987,34 +990,21 @@ public class KStreamAggregationIntegrationTest {
                 new KeyValue<>("jo", "resume")   // jo's starts new session
             ),
             producerConfig,
-            t4);
+            t4
+        );
 
         final Map<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(5);
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
-               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-               
.windowedBy(UnlimitedWindows.of().startOn(ofEpochMilli(startTime)))
-               .count()
-               .toStream()
-               .transform(() -> new Transformer<Windowed<String>, Long, 
KeyValue<Object, Object>>() {
-                   private ProcessorContext context;
-
-                   @Override
-                   public void init(final ProcessorContext context) {
-                       this.context = context;
-                   }
-
-                   @Override
-                   public KeyValue<Object, Object> transform(final 
Windowed<String> key, final Long value) {
-                       results.put(key, KeyValue.pair(value, 
context.timestamp()));
-                       latch.countDown();
-                       return null;
-                   }
-
-                   @Override
-                   public void close() {}
-               });
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(UnlimitedWindows.of().startOn(ofEpochMilli(startTime)))
+            .count()
+            .toStream()
+            .process(() -> (Processor<Windowed<String>, Long, Object, Object>) record -> {
+                results.put(record.key(), KeyValue.pair(record.value(), record.timestamp()));
+                latch.countDown();
+            });
         startStreams();
         assertTrue(latch.await(30, TimeUnit.SECONDS));
 
@@ -1025,7 +1015,7 @@ public class KStreamAggregationIntegrationTest {
     }
 
 
-    private void produceMessages(final long timestamp) throws Exception {
+    private void produceMessages(final long timestamp) {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             streamOneInput,
             Arrays.asList(
@@ -1039,11 +1029,12 @@ public class KStreamAggregationIntegrationTest {
                 IntegerSerializer.class,
                 StringSerializer.class,
                 new Properties()),
-            timestamp);
+            timestamp
+        );
     }
 
 
-    private void createTopics(final String safeTestName) throws InterruptedException {
+    private void createTopics(final String safeTestName) throws Exception {
         streamOneInput = "stream-one-" + safeTestName;
         outputTopic = "output-" + safeTestName;
         userSessionsStream = "user-sessions-" + safeTestName;
@@ -1067,7 +1058,7 @@ public class KStreamAggregationIntegrationTest {
 
     private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final 
Deserializer<K> keyDeserializer,
                                                                  final 
Deserializer<V> valueDeserializer,
-                                                                 final Class 
innerClass,
+                                                                 final 
Class<?> innerClass,
                                                                  final int 
numMessages,
                                                                  final 
TestInfo testInfo)
             throws Exception {
@@ -1093,7 +1084,7 @@ public class KStreamAggregationIntegrationTest {
 
     private <K, V> List<KeyValueTimestamp<K, V>> 
receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
                                                                               
final Deserializer<V> valueDeserializer,
-                                                                              
final Class innerClass,
+                                                                              
final Class<?> innerClass,
                                                                               
final int numMessages,
                                                                               
final TestInfo testInfo) throws Exception {
         final String safeTestName = safeUniqueTestName(testInfo);
@@ -1117,7 +1108,7 @@ public class KStreamAggregationIntegrationTest {
 
     private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final 
Deserializer<K> keyDeserializer,
                                                                       final 
Deserializer<V> valueDeserializer,
-                                                                      final 
Class innerClass,
+                                                                      final 
Class<?> innerClass,
                                                                       final 
int numMessages,
                                                                       final 
boolean printTimestamp) throws Exception {
         final ByteArrayOutputStream newConsole = new ByteArrayOutputStream();

Reply via email to