This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 08b3d9f37b0 MINOR: update KStreamAggregationIntegrationTest (#16699)
08b3d9f37b0 is described below
commit 08b3d9f37b0682078ac1d273497fb88cebfc9528
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jul 31 11:27:09 2024 -0700
MINOR: update KStreamAggregationIntegrationTest (#16699)
Refactor test to move off deprecated `transform()` in favor of
`process()`.
Reviewers: Bill Bejeck <[email protected]>
---
.../KStreamAggregationIntegrationTest.java | 781 ++++++++++-----------
1 file changed, 386 insertions(+), 395 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index a9b63f66d29..d4a53f1b222 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -48,14 +48,13 @@ import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -74,7 +73,6 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Arrays;
@@ -99,7 +97,6 @@ import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@SuppressWarnings({"unchecked", "deprecation"})
@Timeout(600)
@Tag("integration")
public class KStreamAggregationIntegrationTest {
@@ -108,7 +105,7 @@ public class KStreamAggregationIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
@BeforeAll
- public static void startCluster() throws IOException {
+ public static void startCluster() throws Exception {
CLUSTER.start();
}
@@ -132,7 +129,7 @@ public class KStreamAggregationIntegrationTest {
private KStream<Integer, String> stream;
@BeforeEach
- public void before(final TestInfo testInfo) throws InterruptedException {
+ public void before(final TestInfo testInfo) throws Exception {
builder = new StreamsBuilder();
final String safeTestName = safeUniqueTestName(testInfo);
createTopics(safeTestName);
@@ -143,8 +140,8 @@ public class KStreamAggregationIntegrationTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
100L);
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
final KeyValueMapper<Integer, String, String> mapper =
MockMapper.selectValueMapper();
stream = builder.stream(streamOneInput,
Consumed.with(Serdes.Integer(), Serdes.String()));
@@ -156,7 +153,7 @@ public class KStreamAggregationIntegrationTest {
}
@AfterEach
- public void whenShuttingDown() throws IOException {
+ public void whenShuttingDown() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
}
@@ -183,21 +180,25 @@ public class KStreamAggregationIntegrationTest {
results.sort(KStreamAggregationIntegrationTest::compare);
- assertThat(results, is(Arrays.asList(
- new KeyValueTimestamp("A", "A", mockTime.milliseconds()),
- new KeyValueTimestamp("A", "A:A", mockTime.milliseconds()),
- new KeyValueTimestamp("B", "B", mockTime.milliseconds()),
- new KeyValueTimestamp("B", "B:B", mockTime.milliseconds()),
- new KeyValueTimestamp("C", "C", mockTime.milliseconds()),
- new KeyValueTimestamp("C", "C:C", mockTime.milliseconds()),
- new KeyValueTimestamp("D", "D", mockTime.milliseconds()),
- new KeyValueTimestamp("D", "D:D", mockTime.milliseconds()),
- new KeyValueTimestamp("E", "E", mockTime.milliseconds()),
- new KeyValueTimestamp("E", "E:E", mockTime.milliseconds()))));
+ assertThat(
+ results,
+ is(Arrays.asList(
+ new KeyValueTimestamp<>("A", "A", mockTime.milliseconds()),
+ new KeyValueTimestamp<>("A", "A:A", mockTime.milliseconds()),
+ new KeyValueTimestamp<>("B", "B", mockTime.milliseconds()),
+ new KeyValueTimestamp<>("B", "B:B", mockTime.milliseconds()),
+ new KeyValueTimestamp<>("C", "C", mockTime.milliseconds()),
+ new KeyValueTimestamp<>("C", "C:C", mockTime.milliseconds()),
+ new KeyValueTimestamp<>("D", "D", mockTime.milliseconds()),
+ new KeyValueTimestamp<>("D", "D:D", mockTime.milliseconds()),
+ new KeyValueTimestamp<>("E", "E", mockTime.milliseconds()),
+ new KeyValueTimestamp<>("E", "E:E", mockTime.milliseconds())
+ ))
+ );
}
- private static <K extends Comparable, V extends Comparable> int compare(final KeyValueTimestamp<K, V> o1,
- final KeyValueTimestamp<K, V> o2) {
+ private static <K extends Comparable<K>, V extends Comparable<V>> int compare(final KeyValueTimestamp<K, V> o1,
+ final KeyValueTimestamp<K, V> o2) {
final int keyComparison = o1.key().compareTo(o2.key());
if (keyComparison == 0) {
final int valueComparison = o1.value().compareTo(o2.value());
@@ -220,12 +221,11 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondBatchTimestamp);
final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
- //noinspection deprecation
groupedStream
- .windowedBy(TimeWindows.of(ofMillis(500L)))
- .reduce(reducer)
- .toStream()
- .to(outputTopic, Produced.with(windowedSerde,
Serdes.String()));
+ .windowedBy(TimeWindows.of(ofMillis(500L)))
+ .reduce(reducer)
+ .toStream()
+ .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
startStreams();
@@ -234,7 +234,8 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
String.class,
15,
- testInfo);
+ testInfo
+ );
// read from ConsoleConsumer
final String resultFromConsoleConsumer =
readWindowedKeyedMessagesViaConsoleConsumer(
@@ -242,7 +243,8 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
String.class,
15,
- true);
+ true
+ );
final Comparator<KeyValueTimestamp<Windowed<String>, String>>
comparator =
Comparator.comparing((KeyValueTimestamp<Windowed<String>, String>
o) -> o.key().key())
@@ -255,21 +257,21 @@ public class KStreamAggregationIntegrationTest {
final long secondBatchWindowEnd = secondBatchWindowStart + 500;
final List<KeyValueTimestamp<Windowed<String>, String>> expectResult =
Arrays.asList(
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "A",
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "A",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "A:A",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "B",
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "B",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "B:B",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "C",
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "C",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "C:C",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "D",
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "D",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "D:D",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "E",
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "E",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "E:E",
secondBatchTimestamp)
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "A",
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "A",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "A:A",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "B",
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "B",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "B:B",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "C",
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "C",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "C:C",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "D",
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "D",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "D:D",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchWindowStart, firstBatchWindowEnd)), "E",
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "E",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondBatchWindowStart, secondBatchWindowEnd)), "E:E",
secondBatchTimestamp)
);
assertThat(windowedOutput, is(expectResult));
@@ -304,22 +306,26 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
new IntegerDeserializer(),
10,
- testInfo);
+ testInfo
+ );
results.sort(KStreamAggregationIntegrationTest::compare);
- assertThat(results, is(Arrays.asList(
- new KeyValueTimestamp("A", 1, mockTime.milliseconds()),
- new KeyValueTimestamp("A", 2, mockTime.milliseconds()),
- new KeyValueTimestamp("B", 1, mockTime.milliseconds()),
- new KeyValueTimestamp("B", 2, mockTime.milliseconds()),
- new KeyValueTimestamp("C", 1, mockTime.milliseconds()),
- new KeyValueTimestamp("C", 2, mockTime.milliseconds()),
- new KeyValueTimestamp("D", 1, mockTime.milliseconds()),
- new KeyValueTimestamp("D", 2, mockTime.milliseconds()),
- new KeyValueTimestamp("E", 1, mockTime.milliseconds()),
- new KeyValueTimestamp("E", 2, mockTime.milliseconds())
- )));
+ assertThat(
+ results,
+ is(Arrays.asList(
+ new KeyValueTimestamp<>("A", 1, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("A", 2, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("B", 1, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("B", 2, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("C", 1, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("C", 2, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("D", 1, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("D", 2, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("E", 1, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("E", 2, mockTime.milliseconds())
+ ))
+ );
}
@SuppressWarnings("deprecation")
@@ -333,15 +339,14 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondTimestamp);
final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
- //noinspection deprecation
groupedStream.windowedBy(TimeWindows.of(ofMillis(500L)))
- .aggregate(
- initializer,
- aggregator,
- Materialized.with(null, Serdes.Integer())
- )
- .toStream()
- .to(outputTopic, Produced.with(windowedSerde,
Serdes.Integer()));
+ .aggregate(
+ initializer,
+ aggregator,
+ Materialized.with(null, Serdes.Integer())
+ )
+ .toStream()
+ .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
startStreams();
@@ -371,21 +376,22 @@ public class KStreamAggregationIntegrationTest {
final long secondWindowEnd = secondWindowStart + 500;
final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult
= Arrays.asList(
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp));
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstWindowStart, firstWindowEnd)), 1, firstTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondWindowStart, secondWindowEnd)), 1, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondWindowStart, secondWindowEnd)), 2, secondTimestamp)
+ );
assertThat(windowedMessages, is(expectResult));
@@ -399,7 +405,6 @@ public class KStreamAggregationIntegrationTest {
for (final String record: allRecords) {
assertTrue(expectResultString.contains(record));
}
-
}
private void shouldCountHelper(final TestInfo testInfo) throws Exception {
@@ -411,21 +416,25 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
new LongDeserializer(),
10,
- testInfo);
+ testInfo
+ );
results.sort(KStreamAggregationIntegrationTest::compare);
- assertThat(results, is(Arrays.asList(
- new KeyValueTimestamp("A", 1L, mockTime.milliseconds()),
- new KeyValueTimestamp("A", 2L, mockTime.milliseconds()),
- new KeyValueTimestamp("B", 1L, mockTime.milliseconds()),
- new KeyValueTimestamp("B", 2L, mockTime.milliseconds()),
- new KeyValueTimestamp("C", 1L, mockTime.milliseconds()),
- new KeyValueTimestamp("C", 2L, mockTime.milliseconds()),
- new KeyValueTimestamp("D", 1L, mockTime.milliseconds()),
- new KeyValueTimestamp("D", 2L, mockTime.milliseconds()),
- new KeyValueTimestamp("E", 1L, mockTime.milliseconds()),
- new KeyValueTimestamp("E", 2L, mockTime.milliseconds())
- )));
+ assertThat(
+ results,
+ is(Arrays.asList(
+ new KeyValueTimestamp<>("A", 1L, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("A", 2L, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("B", 1L, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("B", 2L, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("C", 1L, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("C", 2L, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("D", 1L, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("D", 2L, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("E", 1L, mockTime.milliseconds()),
+ new KeyValueTimestamp<>("E", 2L, mockTime.milliseconds())
+ ))
+ );
}
@Test
@@ -433,8 +442,8 @@ public class KStreamAggregationIntegrationTest {
produceMessages(mockTime.milliseconds());
groupedStream.count(Materialized.as("count-by-key"))
- .toStream()
- .to(outputTopic, Produced.with(Serdes.String(),
Serdes.Long()));
+ .toStream()
+ .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
shouldCountHelper(testInfo);
}
@@ -444,8 +453,8 @@ public class KStreamAggregationIntegrationTest {
produceMessages(mockTime.milliseconds());
groupedStream.count()
- .toStream()
- .to(outputTopic, Produced.with(Serdes.String(),
Serdes.Long()));
+ .toStream()
+ .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
shouldCountHelper(testInfo);
}
@@ -457,11 +466,10 @@ public class KStreamAggregationIntegrationTest {
produceMessages(timestamp);
produceMessages(timestamp);
- //noinspection deprecation
stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(500L)))
- .count()
- .toStream((windowedKey, value) -> windowedKey.key() + "@" +
windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(),
Serdes.Long()));
+ .windowedBy(TimeWindows.of(ofMillis(500L)))
+ .count()
+ .toStream((windowedKey, value) -> windowedKey.key() + "@" +
windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(),
Serdes.Long()));
startStreams();
@@ -469,22 +477,26 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
new LongDeserializer(),
10,
- testInfo);
+ testInfo
+ );
results.sort(KStreamAggregationIntegrationTest::compare);
final long window = timestamp / 500 * 500;
- assertThat(results, is(Arrays.asList(
- new KeyValueTimestamp("1@" + window, 1L, timestamp),
- new KeyValueTimestamp("1@" + window, 2L, timestamp),
- new KeyValueTimestamp("2@" + window, 1L, timestamp),
- new KeyValueTimestamp("2@" + window, 2L, timestamp),
- new KeyValueTimestamp("3@" + window, 1L, timestamp),
- new KeyValueTimestamp("3@" + window, 2L, timestamp),
- new KeyValueTimestamp("4@" + window, 1L, timestamp),
- new KeyValueTimestamp("4@" + window, 2L, timestamp),
- new KeyValueTimestamp("5@" + window, 1L, timestamp),
- new KeyValueTimestamp("5@" + window, 2L, timestamp)
- )));
+ assertThat(
+ results,
+ is(Arrays.asList(
+ new KeyValueTimestamp<>("1@" + window, 1L, timestamp),
+ new KeyValueTimestamp<>("1@" + window, 2L, timestamp),
+ new KeyValueTimestamp<>("2@" + window, 1L, timestamp),
+ new KeyValueTimestamp<>("2@" + window, 2L, timestamp),
+ new KeyValueTimestamp<>("3@" + window, 1L, timestamp),
+ new KeyValueTimestamp<>("3@" + window, 2L, timestamp),
+ new KeyValueTimestamp<>("4@" + window, 1L, timestamp),
+ new KeyValueTimestamp<>("4@" + window, 2L, timestamp),
+ new KeyValueTimestamp<>("5@" + window, 1L, timestamp),
+ new KeyValueTimestamp<>("5@" + window, 2L, timestamp)
+ ))
+ );
}
@SuppressWarnings("deprecation")
@@ -499,32 +511,33 @@ public class KStreamAggregationIntegrationTest {
produceMessages(thirdBatchTimestamp);
final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
- //noinspection deprecation
groupedStream
- .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L)))
- .reduce(reducer)
- .toStream()
- .to(outputTopic, Produced.with(windowedSerde,
Serdes.String()));
+ .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L)))
+ .reduce(reducer)
+ .toStream()
+ .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
startStreams();
final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput
= receiveMessages(
- new TimeWindowedDeserializer<>(new StringDeserializer(), 500L),
- new StringDeserializer(),
- String.class,
- 30,
- testInfo);
+ new TimeWindowedDeserializer<>(new StringDeserializer(), 500L),
+ new StringDeserializer(),
+ String.class,
+ 30,
+ testInfo
+ );
final String resultFromConsoleConsumer =
readWindowedKeyedMessagesViaConsoleConsumer(
- new TimeWindowedDeserializer<String>(),
- new StringDeserializer(),
- String.class,
- 30,
- true);
+ new TimeWindowedDeserializer<String>(),
+ new StringDeserializer(),
+ String.class,
+ 30,
+ true
+ );
final Comparator<KeyValueTimestamp<Windowed<String>, String>>
comparator =
- Comparator.comparing((KeyValueTimestamp<Windowed<String>,
String> o) -> o.key().key())
- .thenComparing(KeyValueTimestamp::value);
+ Comparator.comparing((KeyValueTimestamp<Windowed<String>, String>
o) -> o.key().key())
+ .thenComparing(KeyValueTimestamp::value);
windowedOutput.sort(comparator);
final long firstBatchLeftWindowStart = firstBatchTimestamp -
timeDifference;
@@ -541,43 +554,43 @@ public class KStreamAggregationIntegrationTest {
final long thirdBatchLeftWindowEnd = thirdBatchLeftWindowStart +
timeDifference;
final List<KeyValueTimestamp<Windowed<String>, String>> expectResult =
Arrays.asList(
- // A @ firstBatchTimestamp left window created when A @
firstBatchTimestamp processed
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "A",
firstBatchTimestamp),
- // A @ firstBatchTimestamp right window created when A @
secondBatchTimestamp processed
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "A",
secondBatchTimestamp),
- // A @ secondBatchTimestamp right window created when A @
thirdBatchTimestamp processed
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "A",
thirdBatchTimestamp),
- // A @ secondBatchTimestamp left window created when A @
secondBatchTimestamp processed
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "A:A",
secondBatchTimestamp),
- // A @ firstBatchTimestamp right window updated when A @
thirdBatchTimestamp processed
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "A:A",
thirdBatchTimestamp),
- // A @ thirdBatchTimestamp left window created when A @
thirdBatchTimestamp processed
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "A:A:A",
thirdBatchTimestamp),
-
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "B",
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "B",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "B",
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "B:B",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "B:B",
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "B:B:B",
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "C",
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "C",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "C",
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "C:C",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "C:C",
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "C:C:C",
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "D",
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "D",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "D",
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "D:D",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "D:D",
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "D:D:D",
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "E",
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "E",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "E",
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "E:E",
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "E:E",
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "E:E:E",
thirdBatchTimestamp)
+ // A @ firstBatchTimestamp left window created when A @
firstBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "A",
firstBatchTimestamp),
+ // A @ firstBatchTimestamp right window created when A @
secondBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "A",
secondBatchTimestamp),
+ // A @ secondBatchTimestamp right window created when A @
thirdBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "A",
thirdBatchTimestamp),
+ // A @ secondBatchTimestamp left window created when A @
secondBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "A:A",
secondBatchTimestamp),
+ // A @ firstBatchTimestamp right window updated when A @
thirdBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "A:A",
thirdBatchTimestamp),
+ // A @ thirdBatchTimestamp left window created when A @
thirdBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "A:A:A",
thirdBatchTimestamp),
+
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "B",
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "B",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "B",
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "B:B",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "B:B",
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "B:B:B",
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "C",
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "C",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "C",
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "C:C",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "C:C",
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "C:C:C",
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "D",
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "D",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "D",
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "D:D",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "D:D",
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "D:D:D",
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), "E",
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "E",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), "E",
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), "E:E",
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), "E:E",
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), "E:E:E",
thirdBatchTimestamp)
);
assertThat(windowedOutput, is(expectResult));
@@ -608,34 +621,36 @@ public class KStreamAggregationIntegrationTest {
final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
//noinspection deprecation
groupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L),
ofMinutes(5)))
- .aggregate(
- initializer,
- aggregator,
- Materialized.with(null, Serdes.Integer())
- )
- .toStream()
- .to(outputTopic, Produced.with(windowedSerde,
Serdes.Integer()));
+ .aggregate(
+ initializer,
+ aggregator,
+ Materialized.with(null, Serdes.Integer())
+ )
+ .toStream()
+ .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
startStreams();
final List<KeyValueTimestamp<Windowed<String>, Integer>>
windowedMessages = receiveMessagesWithTimestamp(
- new TimeWindowedDeserializer<>(),
- new IntegerDeserializer(),
- String.class,
- 30,
- testInfo);
+ new TimeWindowedDeserializer<>(),
+ new IntegerDeserializer(),
+ String.class,
+ 30,
+ testInfo
+ );
// read from ConsoleConsumer
final String resultFromConsoleConsumer =
readWindowedKeyedMessagesViaConsoleConsumer(
- new TimeWindowedDeserializer<String>(),
- new IntegerDeserializer(),
- String.class,
- 30,
- true);
+ new TimeWindowedDeserializer<String>(),
+ new IntegerDeserializer(),
+ String.class,
+ 30,
+ true
+ );
final Comparator<KeyValueTimestamp<Windowed<String>, Integer>>
comparator =
- Comparator.comparing((KeyValueTimestamp<Windowed<String>,
Integer> o) -> o.key().key())
- .thenComparingInt(KeyValueTimestamp::value);
+ Comparator.comparing((KeyValueTimestamp<Windowed<String>, Integer>
o) -> o.key().key())
+ .thenComparingInt(KeyValueTimestamp::value);
windowedMessages.sort(comparator);
final long firstBatchLeftWindowStart = firstBatchTimestamp -
timeDifference;
@@ -652,43 +667,43 @@ public class KStreamAggregationIntegrationTest {
final long thirdBatchLeftWindowEnd = thirdBatchLeftWindowStart +
timeDifference;
final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult
= Arrays.asList(
- // A @ firstBatchTimestamp left window created when A @
firstBatchTimestamp processed
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1,
firstBatchTimestamp),
- // A @ firstBatchTimestamp right window created when A @
secondBatchTimestamp processed
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1,
secondBatchTimestamp),
- // A @ secondBatchTimestamp right window created when A @
thirdBatchTimestamp processed
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1,
thirdBatchTimestamp),
- // A @ secondBatchTimestamp left window created when A @
secondBatchTimestamp processed
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2,
secondBatchTimestamp),
- // A @ firstBatchTimestamp right window updated when A @
thirdBatchTimestamp processed
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2,
thirdBatchTimestamp),
- // A @ thirdBatchTimestamp left window created when A @
thirdBatchTimestamp processed
- new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3,
thirdBatchTimestamp),
-
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1,
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1,
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1,
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2,
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2,
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3,
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1,
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1,
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1,
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2,
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2,
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3,
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1,
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1,
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1,
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2,
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2,
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3,
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1,
firstBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1,
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1,
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2,
secondBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2,
thirdBatchTimestamp),
- new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3,
thirdBatchTimestamp)
+ // A @ firstBatchTimestamp left window created when A @
firstBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1,
firstBatchTimestamp),
+ // A @ firstBatchTimestamp right window created when A @
secondBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1,
secondBatchTimestamp),
+ // A @ secondBatchTimestamp right window created when A @
thirdBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1,
thirdBatchTimestamp),
+ // A @ secondBatchTimestamp left window created when A @
secondBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2,
secondBatchTimestamp),
+ // A @ firstBatchTimestamp right window updated when A @
thirdBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2,
thirdBatchTimestamp),
+ // A @ thirdBatchTimestamp left window created when A @
thirdBatchTimestamp processed
+ new KeyValueTimestamp<>(new Windowed<>("A", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3,
thirdBatchTimestamp),
+
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1,
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1,
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1,
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2,
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2,
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3,
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1,
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1,
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1,
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2,
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2,
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3,
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1,
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1,
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1,
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2,
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2,
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3,
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchLeftWindowStart, firstBatchLeftWindowEnd)), 1,
firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 1,
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondBatchRightWindowStart, secondBatchRightWindowEnd)), 1,
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(secondBatchLeftWindowStart, secondBatchLeftWindowEnd)), 2,
secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(firstBatchRightWindowStart, firstBatchRightWindowEnd)), 2,
thirdBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new
TimeWindow(thirdBatchLeftWindowStart, thirdBatchLeftWindowEnd)), 3,
thirdBatchTimestamp)
);
assertThat(windowedMessages, is(expectResult));
@@ -703,66 +718,71 @@ public class KStreamAggregationIntegrationTest {
for (final String record: allRecords) {
assertTrue(expectResultString.contains(record));
}
-
}
@SuppressWarnings("deprecation")
@Test
public void shouldCountSessionWindows() throws Exception {
final long sessionGap = 5 * 60 * 1000L;
- final List<KeyValue<String, String>> t1Messages = Arrays.asList(new
KeyValue<>("bob", "start"),
- new
KeyValue<>("penny", "start"),
- new
KeyValue<>("jo", "pause"),
- new
KeyValue<>("emily", "pause"));
+ final List<KeyValue<String, String>> t1Messages = Arrays.asList(
+ new KeyValue<>("bob", "start"),
+ new KeyValue<>("penny", "start"),
+ new KeyValue<>("jo", "pause"),
+ new KeyValue<>("emily", "pause")
+ );
final long t1 = mockTime.milliseconds() -
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- t1Messages,
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t1);
+ userSessionsStream,
+ t1Messages,
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ t1
+ );
final long t2 = t1 + (sessionGap / 2);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- Collections.singletonList(
- new KeyValue<>("emily", "resume")
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t2);
+ userSessionsStream,
+ Collections.singletonList(
+ new KeyValue<>("emily", "resume")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ t2
+ );
final long t3 = t1 + sessionGap + 1;
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- Arrays.asList(
- new KeyValue<>("bob", "pause"),
- new KeyValue<>("penny", "stop")
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t3);
+ userSessionsStream,
+ Arrays.asList(
+ new KeyValue<>("bob", "pause"),
+ new KeyValue<>("penny", "stop")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ t3
+ );
final long t4 = t3 + (sessionGap / 2);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- Arrays.asList(
- new KeyValue<>("bob", "resume"), // bobs session
continues
- new KeyValue<>("jo", "resume") // jo's starts new
session
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t4);
+ userSessionsStream,
+ Arrays.asList(
+ new KeyValue<>("bob", "resume"), // bobs session continues
+ new KeyValue<>("jo", "resume") // jo's starts new session
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ t4
+ );
final long t5 = t4 - 1;
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
@@ -774,35 +794,22 @@ public class KStreamAggregationIntegrationTest {
StringSerializer.class,
StringSerializer.class,
new Properties()),
- t5);
+ t5
+ );
final Map<Windowed<String>, KeyValue<Long, Long>> results = new
HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
//noinspection deprecation
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
- .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
- .count()
- .toStream()
- .transform(() -> new Transformer<Windowed<String>, Long,
KeyValue<Object, Object>>() {
- private ProcessorContext context;
-
- @Override
- public void init(final ProcessorContext context) {
- this.context = context;
- }
-
- @Override
- public KeyValue<Object, Object> transform(final
Windowed<String> key, final Long value) {
- results.put(key, KeyValue.pair(value,
context.timestamp()));
- latch.countDown();
- return null;
- }
-
- @Override
- public void close() {}
- });
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
+ .count()
+ .toStream()
+ .process(() -> (Processor<Windowed<String>, Long, Object, Object>)
record -> {
+ results.put(record.key(), KeyValue.pair(record.value(),
record.timestamp()));
+ latch.countDown();
+ });
startStreams();
latch.await(30, TimeUnit.SECONDS);
@@ -820,59 +827,65 @@ public class KStreamAggregationIntegrationTest {
@Test
public void shouldReduceSessionWindows() throws Exception {
final long sessionGap = 1000L; // something to do with time
- final List<KeyValue<String, String>> t1Messages = Arrays.asList(new
KeyValue<>("bob", "start"),
- new
KeyValue<>("penny", "start"),
- new
KeyValue<>("jo", "pause"),
- new
KeyValue<>("emily", "pause"));
+ final List<KeyValue<String, String>> t1Messages = Arrays.asList(
+ new KeyValue<>("bob", "start"),
+ new KeyValue<>("penny", "start"),
+ new KeyValue<>("jo", "pause"),
+ new KeyValue<>("emily", "pause")
+ );
final long t1 = mockTime.milliseconds();
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- t1Messages,
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t1);
+ userSessionsStream,
+ t1Messages,
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ t1
+ );
final long t2 = t1 + (sessionGap / 2);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- Collections.singletonList(
- new KeyValue<>("emily", "resume")
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t2);
+ userSessionsStream,
+ Collections.singletonList(
+ new KeyValue<>("emily", "resume")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ t2
+ );
final long t3 = t1 + sessionGap + 1;
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- Arrays.asList(
- new KeyValue<>("bob", "pause"),
- new KeyValue<>("penny", "stop")
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t3);
+ userSessionsStream,
+ Arrays.asList(
+ new KeyValue<>("bob", "pause"),
+ new KeyValue<>("penny", "stop")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ t3
+ );
final long t4 = t3 + (sessionGap / 2);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- userSessionsStream,
- Arrays.asList(
- new KeyValue<>("bob", "resume"), // bobs session
continues
- new KeyValue<>("jo", "resume") // jo's starts new
session
- ),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- t4);
+ userSessionsStream,
+ Arrays.asList(
+ new KeyValue<>("bob", "resume"), // bobs session continues
+ new KeyValue<>("jo", "resume") // jo's starts new session
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ t4
+ );
final long t5 = t4 - 1;
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
@@ -884,34 +897,19 @@ public class KStreamAggregationIntegrationTest {
StringSerializer.class,
StringSerializer.class,
new Properties()),
- t5);
+ t5
+ );
final Map<Windowed<String>, KeyValue<String, Long>> results = new
HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
final String userSessionsStore = "UserSessionsStore";
//noinspection deprecation
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
- .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
- .reduce((value1, value2) -> value1 + ":" + value2,
Materialized.as(userSessionsStore))
- .toStream()
- .transform(() -> new Transformer<Windowed<String>, String,
KeyValue<Object, Object>>() {
- private ProcessorContext context;
-
- @Override
- public void init(final ProcessorContext context) {
- this.context = context;
- }
-
- @Override
- public KeyValue<Object, Object> transform(final
Windowed<String> key, final String value) {
- results.put(key, KeyValue.pair(value,
context.timestamp()));
- latch.countDown();
- return null;
- }
-
- @Override
- public void close() {}
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(ofMillis(sessionGap))) .reduce((value1, value2)
-> value1 + ":" + value2, Materialized.as(userSessionsStore))
+ .toStream()
+ .process(() -> (Processor<Windowed<String>, String, Object,
Object>) record -> {
+ results.put(record.key(), KeyValue.pair(record.value(),
record.timestamp()));
+ latch.countDown();
});
startStreams();
@@ -943,10 +941,12 @@ public class KStreamAggregationIntegrationTest {
final long incrementTime = Duration.ofDays(1).toMillis();
final long t1 = mockTime.milliseconds() -
TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
- final List<KeyValue<String, String>> t1Messages = Arrays.asList(new
KeyValue<>("bob", "start"),
- new
KeyValue<>("penny", "start"),
- new
KeyValue<>("jo", "pause"),
- new
KeyValue<>("emily", "pause"));
+ final List<KeyValue<String, String>> t1Messages = Arrays.asList(
+ new KeyValue<>("bob", "start"),
+ new KeyValue<>("penny", "start"),
+ new KeyValue<>("jo", "pause"),
+ new KeyValue<>("emily", "pause")
+ );
final Properties producerConfig = TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
@@ -959,7 +959,8 @@ public class KStreamAggregationIntegrationTest {
userSessionsStream,
t1Messages,
producerConfig,
- t1);
+ t1
+ );
final long t2 = t1 + incrementTime;
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
@@ -968,7 +969,8 @@ public class KStreamAggregationIntegrationTest {
new KeyValue<>("emily", "resume")
),
producerConfig,
- t2);
+ t2
+ );
final long t3 = t2 + incrementTime;
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
userSessionsStream,
@@ -977,7 +979,8 @@ public class KStreamAggregationIntegrationTest {
new KeyValue<>("penny", "stop")
),
producerConfig,
- t3);
+ t3
+ );
final long t4 = t3 + incrementTime;
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
@@ -987,34 +990,21 @@ public class KStreamAggregationIntegrationTest {
new KeyValue<>("jo", "resume") // jo's starts new session
),
producerConfig,
- t4);
+ t4
+ );
final Map<Windowed<String>, KeyValue<Long, Long>> results = new
HashMap<>();
final CountDownLatch latch = new CountDownLatch(5);
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
- .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-
.windowedBy(UnlimitedWindows.of().startOn(ofEpochMilli(startTime)))
- .count()
- .toStream()
- .transform(() -> new Transformer<Windowed<String>, Long,
KeyValue<Object, Object>>() {
- private ProcessorContext context;
-
- @Override
- public void init(final ProcessorContext context) {
- this.context = context;
- }
-
- @Override
- public KeyValue<Object, Object> transform(final
Windowed<String> key, final Long value) {
- results.put(key, KeyValue.pair(value,
context.timestamp()));
- latch.countDown();
- return null;
- }
-
- @Override
- public void close() {}
- });
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(UnlimitedWindows.of().startOn(ofEpochMilli(startTime)))
+ .count()
+ .toStream()
+ .process(() -> (Processor<Windowed<String>, Long, Object, Object>)
record -> {
+ results.put(record.key(), KeyValue.pair(record.value(),
record.timestamp()));
+ latch.countDown();
+ });
startStreams();
assertTrue(latch.await(30, TimeUnit.SECONDS));
@@ -1025,7 +1015,7 @@ public class KStreamAggregationIntegrationTest {
}
- private void produceMessages(final long timestamp) throws Exception {
+ private void produceMessages(final long timestamp) {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
streamOneInput,
Arrays.asList(
@@ -1039,11 +1029,12 @@ public class KStreamAggregationIntegrationTest {
IntegerSerializer.class,
StringSerializer.class,
new Properties()),
- timestamp);
+ timestamp
+ );
}
- private void createTopics(final String safeTestName) throws
InterruptedException {
+ private void createTopics(final String safeTestName) throws Exception {
streamOneInput = "stream-one-" + safeTestName;
outputTopic = "output-" + safeTestName;
userSessionsStream = "user-sessions-" + safeTestName;
@@ -1067,7 +1058,7 @@ public class KStreamAggregationIntegrationTest {
private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final
Deserializer<K> keyDeserializer,
final
Deserializer<V> valueDeserializer,
- final Class
innerClass,
+ final
Class<?> innerClass,
final int
numMessages,
final
TestInfo testInfo)
throws Exception {
@@ -1093,7 +1084,7 @@ public class KStreamAggregationIntegrationTest {
private <K, V> List<KeyValueTimestamp<K, V>>
receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
-
final Class innerClass,
+
final Class<?> innerClass,
final int numMessages,
final TestInfo testInfo) throws Exception {
final String safeTestName = safeUniqueTestName(testInfo);
@@ -1117,7 +1108,7 @@ public class KStreamAggregationIntegrationTest {
private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final
Deserializer<K> keyDeserializer,
final
Deserializer<V> valueDeserializer,
- final
Class innerClass,
+ final
Class<?> innerClass,
final
int numMessages,
final
boolean printTimestamp) throws Exception {
final ByteArrayOutputStream newConsole = new ByteArrayOutputStream();